Milk's development review

ForkJoinPool과 ParallelStream 본문

Language/JAVA

ForkJoinPool과 ParallelStream

YooMilk 2019. 9. 17. 12:08
반응형

개발 중 다음과 같은 코드를 만났습니다.

new ForkJoinPool(10).submit(() -> {   
    integerList.parallelStream().forEach((integer) -> { ...

parallelStream이 내부적으로 ForkJoinPool을 사용하는데 어째서 위와 같은 코드가 작성되었는지 의문이 들었습니다.
먼저 ForkJoinPool에 대해서 간단하게 정리해보겠습니다. 아래 내용은 java docs에서 발췌했습니다.


An ExecutorService for running ForkJoinTasks. A ForkJoinPool provides the entry point for submissions from non-ForkJoinTask clients, as well as management and monitoring operations.

A ForkJoinPool differs from other kinds of ExecutorService mainly by virtue of employing work-stealing: all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html

주의 깊게 봐야할 부분은

  • 다른 종류의 ExecutorService와는 다르게 Work-Stealing 매커니즘을 사용한다.
  • 때문에 대부분의 task가 하위 task를 생성하는 경우, 외부 클라이언트에 의한 small task가 많을 경우 효과적일 수 있다.

정도입니다.

하지만 크게 와닿지는 않습니다. 조금 더 쉽게 풀어서 예를 들면 아래와 같습니다.

  1. 1부터 1000까지 더해야하는 task가 있다.

  2. Fork - Join 을 위해 아래 작업을 수행한다.

    2-1. task를 가능한 잘게 쪼갠다. (Fork)

    2-2. ForkJoinPool에 있는 Thread들은 각각의 task를 처리하며 그 과정은 아래와 같다. (Join)

    ​ 2-2-1. ForkJoinPool 내부에는 inbound queue가 존재하며 inbound queue에는 task가 쌓인다.

    ​ 2-2-2. 각각의 Thread 들은 쌓여있는 task를 자신에게 개별 할당 된 queue에 적재해가며 처리한다.

    ​ 2-2-3. 만약 자신의 queue에 task가 더 남아 있지 않으면 다른 Thread들의 queue에 남아있는 task를 steal 한다.

그림으로는 아래와 같습니다.

https://hamait.tistory.com/612

결국 ForkJoinPool은 Work Steal을 통해 Thread가 멍청하게 놀고 있는 상황을 최대한 방지합니다.


다시 본론으로 돌아와서, parallelStream은 내부적으로 ForkJoinPool을 사용합니다. 헌데 왜 ForkJoinPool을 명시적으로 선언하여 사용하였지가 의문이었습니다. 의문에 대한 해답은 아래 강조처리 해두었습니다.


최상단에 있던 코드를 다시 보겠습니다.

new ForkJoinPool(10).submit(() -> {   
    integerList.parallelStream().forEach((integer) -> {

ForkJoinPool 생성자에서 전달하는 인자는 int parallelism으로 병렬지수를 의미합니다.
즉, task를 얼마나 병렬하게 수행하겠는가에 대한 수치입니다. 만약 인자를 생략할 경우 Default parallelism값이 셋업되며 PC의 CPU core수와 같습니다.
parallelStream의 경우 내부적으로 ForkJoinPool.commonPool을 참조하게 되며 이때도 마찬가지로 Default parallelism값이 적용됩니다.


헌데 만약 parallelStream을 사용하면서 Default parallelism값을 사용하지 않고 parallelism값을 직접 설정하고 싶다면 위의 코드와 같이 ForkJoinPool 객체를 생성하면서 parallelism값을 생성자 인자로 전달하면 됩니다.
궁금증은 어느정도 해소되었으나 여러가지 상황에 대한 Thread 상태가 궁금하여 Test Case 별로 테스트를 해 보았습니다.

  • Case 1 : ForkJoinPool을 선언한 경우 (paralleism : 5)

  • Case 2 : ForkJoinPool을 선언한 경우 (Default paralleism)

  • Case 3 : parallelStream만을 수행하는 경우

Initial Thread Active Count는 병렬수행 전 초기 활성화 된 Thread 수를 의미합니다.

availableProcessors는 PC의 CPU core수를 의미합니다.

  • Case 1의 경우 availableProcessors는 8이지만 직접 생성자 인자로 paralleism값을 5로 셋업 했습니다.
    따라서 최대 활성화 Thread Count는 Initial Thread Active Count + paralleism(5)가 되어 8이 됩니다.
  • Case 2의 경우 paralleism값은 Default parallelism로 설정되었고 이 경우 availableProcessors와 같습니다.
    따라서 최대 활성화 Thread Count는 Initial Thread Active Count + availableProcessors(8) = 11이 됩니다.
  • Case 3의 경우 ForkJoinPool을 명시적으로 선언하지 않았기 때문에 parallelStream이 내부적으로 ForkJoinPool.commonPool을 참조하게 됩니다.(Case 1,2와 Thread Name 다른것을 볼 수 있습니다.)
    이 경우 paralleism값은 Default parallelism으로 셋업됩니다.
    따라서 최대 활성화 Thread Count는 Case 2와 동일할 것으로 생각되지만 결과는 1이 적은 10입니다. 이유는 parallelStream을 사용할 경우, 이미 존재하는 main thread가 availableProcessors로 간주되어 작업을 함께 수행 하게 되기 때문입니다.

위 세가지 Case 말고도

  1. ForkJoin안에서 다시 ForkJoin을 수행 할 경우.
  2. parallelStream안에서 다시 parallelStream을 수행할 경우.
  3. (1), (2) 를 합쳐놓은 경우.

와 같은 경우가 있을 수 있습니다. 위와 같은경우 병렬수행이 중첩되어 Thread Count는 상황에 따라 기하 급수적으로 증가할 수 있으며 이는 성능에 영향을 미칠 수 있습니다. 또한, 병렬수행을 중첩할 경우 때에 따라서 Deadlock 이슈가 발생할 수 있습니다.


그렇다면 ''언제 parallelStream을 사용하는 것이 좋을까?'' 에 대해서 좀 더 찾아보게 되었습니다.
명쾌한 정답은 없지만, 분할이 잘 이루어 질 수 있는 데이터 구조를 가지거나 혹은 CPU사용이 높고 독립적인 작업을 수행 할 때 비교적 적합할수 있겠다고 생각했습니다.


좀 더 자세한 판단 기준은 아래 링크를 참조하시면 될 것 같습니다.
http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html


[참조]

반응형

'Language > JAVA' 카테고리의 다른 글

Class정보를 이용하여 Generic 타입을 return하는 method  (0) 2017.07.20
Comments