SKILL/JAVA

์ž๋ฐ” : Fork/Join framework

Jedy_Kim 2021. 9. 9. 18:28
728x90

๐Ÿš€ Fork/Join ์ •์˜ 

์—ฌ๊ธฐ๋ฅผ ๊ณต๋ถ€ํ•˜๊ธฐ ์œ„ํ•ด์„œ ์šด์˜์ฒด์ œ์˜ ํ”„๋กœ์„ธ์Šค์™€ ์Šค๋ ˆ๋“œ ํŒŒํŠธ๋ฅผ ๊ณต๋ถ€ํ•˜์˜€๊ณ , ์ž๋ฐ”์˜ ์Šค๋ ˆ๋“œ๋ฅผ ๊ฑธ์ณ Fork/Join๊นŒ์ง€ ๋‚˜๋ฆ„ ๋”ฅํ•˜๊ฒŒ ์ดํ•ดํ•˜๋ ค๊ณ  ๊ณต๋ถ€๋ฅผ ํ•ด์™”๋‹ค. ์šฐ์„  ์ •์˜๋ฅผ ํ•œ๋ฒˆ ์‚ดํŽด๋ณด์ž.
Fork/Join ํ”„๋ ˆ์ž„์›Œํฌ๋Š” ๋‹ค์ค‘ ํ”„๋กœ์„ธ์„œ์˜ ์ด์ ์„ ์ด์šฉํ•˜๊ธฐ ์œ„ํ•ด์„œ ExecutorService ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•œ๋‹ค. ๋‚ด๋ถ€์ ์ธ ๋™์ž‘ ๋ฐฉ์‹์€ ์ตœ๋Œ€ํ•œ ์ชผ๊ฐœ์งˆ ์ˆ˜ ์žˆ์„ ๋•Œ๊นŒ์ง€ ์ชผ๊ฐœ์ ธ์„œ ์ด๋Ÿฐ ์ž‘์€ ๋‹จ์œ„์˜ ์กฐ๊ฐ๋“ค์ด ํšŒ๊ท€์ ์œผ๋กœ ๋™์ž‘ํ•˜๊ฒŒ ๋˜์–ด์žˆ๋‹ค. ์•Œ๊ณ ๋ฆฌ์ฆ˜์—์„œ ์šฐ๋ฆฌ๊ฐ€ ํ”ํžˆ "๋ถ„ํ•  ์ •๋ณต"์ด๋ผ๊ณ  ๋ถ€๋ฅด๋Š” ๊ฐœ๋…๊ณผ ๊ฐ™๋‹ค๊ณ  ์ดํ•ด๋ฅผ ํ•˜๋ฉด ์ข‹์„ ๊ฒƒ ๊ฐ™๋‹ค. 
Fork/Join์˜ ๋ชฉ์ ์€ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ๋ชจ๋“  ์ฒ˜๋ฆฌ๋Šฅ๋ ฅ์„ ์‚ฌ์šฉํ•˜์—ฌ ํ”„๋กœ๊ทธ๋žจ์˜ ์ˆ˜ํ–‰๋Šฅ๋ ฅ์„ ํ–ฅ์ƒํ•˜๋Š” ๋ฐ์— ์žˆ๋‹ค.

์ฆ‰ Fork/Join์€ CPU๋ฅผ ๋” ์‰ฝ๊ฒŒ, ํšจ์œจ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด์„œ ๋งŒ๋“ค์–ด์กŒ๋‹ค.

๐Ÿš€ ๋™์ž‘ ๋ฐฉ์‹

ExecutorService ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•œ ๋ชจ๋“  ๊ฒƒ๊ณผ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ, ์Šค๋ ˆ๋“œ ํ’€ ์•ˆ์—์„œ ๊ฐœ๋ณ„ ์Šค๋ ˆ๋“œ๋“คํ•œํ…Œ ์—…๋ฌด๋ฅผ ๋ถ„๋ฐฐํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค. ํ•˜์ง€๋งŒ Work Stealing ์•Œ๊ณ ๋ฆฌ์ฆ˜์„ ์ ์šฉ์‹œํ‚จ Fork/Join์€ ๊ธฐ์กด์— ExecutorService ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๋‹จ์ˆœ ๊ตฌํ˜„ํ•œ ๊ฒƒ๊ณผ๋Š” ๋‹ค๋ฅด๊ฒŒ ๋™์ž‘ํ•˜๊ฒŒ ๋œ๋‹ค. ๊ฐœ๋ณ„ ์Šค๋ ˆ๋“œ๋“ค์ด ์ข€ ๋” ๋ฐ”์œ ์Šค๋ ˆ๋“œ์˜ ์—…๋ฌด๋ฅผ ์œ ํœด ์Šค๋ ˆ๋“œ๊ฐ€ ์ผ์„ ๊ฐ€์ ธ๊ฐ€๋ฉด์„œ ๋ชจ๋“  ์Šค๋ ˆ๋“œ๊ฐ€ ๋ฐ”์˜๊ฒŒ ๋™์ž‘ํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค.

๐Ÿš€ Work stealing?

์–ด๋–ค ์ž‘์—…์ด ๋Œ€๊ธฐํ•˜๊ณ  ์žˆ๋Š” ํ(Queue)๊ฐ€ ์žˆ๋‹ค๊ณ  ํ•  ๋•Œ, ํ•œ์ชฝ๋งŒ ๋์ด ์•„๋‹ˆ๋ผ ์–‘์ชฝ์ด ๋์œผ๋กœ ์ธ์‹๋˜๋Š” ํ๋ฅผ Dequeue๋ผ๊ณ  ํ•œ๋‹ค. ์—ฌ๋Ÿฌ ๊ฐœ์˜ Dequeue์— ์ž‘์—…์ด ๋‚˜๋‰˜์–ด ์–ด๋–ค ์ผ์ด ์ง„ํ–‰๋  ๋•Œ ๋งŒ์•ฝ ํ•˜๋‚˜์˜ Dequeue๋Š” ๋งค์šฐ ๋ฐ”์˜๊ณ , ๋‹ค๋ฅธ Dequeue๋Š” ๋ฐ”์˜์ง€ ์•Š์„ ๋•Œ๊ฐ€ ์ฝ์„ ๊ฒƒ๋„ ์ด๋Ÿฐ ์ƒํ™ฉ์—์„œ ํ•  ์ผ์ด ์—†๋Š” Deque๊ฐ€ ๋ฐ”์œ Dequeue์— ๋Œ€๊ธฐํ•˜๊ณ  ์žˆ๋Š” ์ผ์„ ๊ฐ€์ ธ๊ฐ€์„œ ํ•ด ์ฃผ๋Š” ๊ฒƒ์ด๋ผ๊ณ  ์ƒ๊ฐํ•˜๋ฉด ๋œ๋‹ค. ์•„๋ž˜์˜ ๊ทธ๋ฆผ์„ ๋ณด๋ฉด ์ดํ•ด๊ฐ€ ์ข€ ๋” ์‰ฌ์šธ ๊ฒƒ์ด๋‹ค.
์ด๋Ÿฌํ•œ Work streal์ด๋ผ๋Š” ๊ฐœ๋…์ด Fork/Join์ด๋ผ๋Š” ๊ฒƒ์—๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ํฌํ•จ๋˜์–ด ์žˆ๋‹ค. ์ฆ‰ Fork/Join์„ ์‚ฌ์šฉํ•˜๋ฉด ๋ณ„๋„๋กœ ๊ตฌํ˜„ํ•˜์ง€ ์•Š์•„๋„ ๋œ๋‹ค๋Š” ๋œป์ด๋‹ค.

๐Ÿš€ ๋ฉ”์„œ๋“œ

Fork/Join ๊ธฐ๋Šฅ์€ java.util.concurrent ํŒจํ‚ค์ง€์˜ RecursiveAction๊ณผ RecursiveTask๋ผ๋Š” abstract ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.
๊ฒฐ๋ก ์ ์œผ๋กœ ๋งํ•˜๋ฉด "~Action์€ ๊ฒฐ๊ณผ ๋ฆฌํ„ด๊ณผ, Generic์ด ์—†๋‹ค." ์ •๋„์˜ ์ฐจ์ด๊ฐ€ ์žˆ๋‹ค. RecursiveTask ํด๋ž˜์Šค๋Š” <V>๋ผ๋Š” ํƒ€์ž…์œผ๋กœ ๋ฆฌํ„ด๋œ๋‹ค. 

๋‘ ํด๋ž˜์Šค ๋ชจ๋‘ ForkJoinTask๋ฅผ ์ƒ์†๋ฐ›๊ณ  ์žˆ๋Š”๋ฐ ํ•ด๋‹น ํด๋ž˜์Šค๋Š” Serializable, Future <V>๋ผ๋Š” ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜์˜€๋‹ค. ์—ฌ๊ธฐ์„œ Future๋ผ๋Š” ์ธํ„ฐํŽ˜์ด์Šค๋Š” "๋น„๋™๊ธฐ์ ์ธ ์š”์ฒญ์„ ํ•˜๊ณ  ์‘๋‹ต์„ ๊ธฐ๋‹ค๋ฆด ๋•Œ ์‚ฌ์šฉ"๋œ๋‹ค.

Fork/Join ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋ ค๋ฉด RecursiveTask ํด๋ž˜์Šค๋‚˜ RecursiveAction ํด๋ž˜์Šค๋ฅผ ํ™•์žฅํ•˜์—ฌ ๊ฐœ๋ฐœํ•˜๋ฉด ๋œ๋‹ค. ๋‘ ํด๋ž˜์Šค ๋ชจ๋‘ compute()๋ผ๋Š” ๋ฉ”์„œ๋“œ๊ฐ€ ์žˆ๊ณ , ์ด ๋ฉ”์„œ๋“œ๊ฐ€ ์žฌ๊ท€ ํ˜ธ์ถœ๋˜๊ณ , ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.
์›ํ˜• ์„ค๋ช…
public abstract class RecursiveAction extends ForkJoinTask<Void> A recursive resultless ForkJoinTask. This class establishes conventions to parameterize resultless actions as Void ForkJoinTasks. Because null is the only valid value of type Void, methods such as join always return null upon completion.
public abstract class RecursiveTask<V> extends ForkJoinTask<V> A recursive result-bearing ForkJoinTask.
 ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š” ํด๋ž˜์Šค๋ฅผ ๋งŒ๋“  ํ›„์—๋Š” ForkJoinPool ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ž‘์—…์„ ์‹œ์ž‘ํ•œ๋‹ค.
  Call from non-fork/join clients
(Fork/Join ํด๋ผ์ด์–ธํŠธ ๋ฐ–์—์„œ ํ˜ธ์ถœ)
Call from within fork/join computations
(Fork/Join ํด๋ผ์ด์–ธํŠธ ๋‚ด์—์„œ ํ˜ธ์ถœ)
Arrange async execution
(๋น„ ๋™๊ธฐ์  ํ˜ธ์ถœ ์ˆ˜ํ–‰์‹œ)
execute(ForkJoinTask)  ForkJoinTask.fork()
Await and obtain result
(ํ˜ธ์ถœ ํ›„ ๊ฒฐ๊ณผ ๋Œ€๊ธฐ)
invoke(ForkJoinTask)  ForkJoinTask.invoke()
Arrange exec and obtain Future
(ํ˜ธ์ถœ ํ›„ Future ๊ฐ์ฒด ์ˆ˜์‹ )
submit(ForkJoinTask)  ForkJoinTask.fork() (ForkJoinTasks are Futures)
Fork/Join์€ ๋ณต์žกํ•œ ์—ฐ์‚ฐ์„ ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰ํ•˜๊ธฐ ์œ„ํ•ด์„œ ๋งŒ๋“  ๊ฒƒ์ด๋‹ค. ์ฆ‰ ๊ณ„์‚ฐ์ด ๋ณต์žกํ•˜๋ฉด ๋ณต์žกํ• ์ˆ˜๋ก Fork/Join์˜ ํšจ๊ณผ๋Š” ํฌ๋ฉฐ, ๊ณ„์‚ฐ์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•ด์„œ ์Šค๋ ˆ๋“œ๋ฅผ ๊ด€๋ฆฌํ•  ํ•„์š”๊ฐ€ ์—†๋‹ค๋Š” ๊ฒƒ์ด ํ•ต์‹ฌ์ด๋‹ค.

๐ŸŽฏ  ์˜ˆ์ œ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.study;
 
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask; 
 
class GetSum2 extends RecursiveTask<Long> {
 
    long from, to;
 
    public GetSum2(long from, long to) {
        this.from = from;
        this.to   = to;
    }
 
    @Override
    protected Long compute() {
 
        long gap = to - from;
 
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        log("From = " + from + " To = " + to);
 
        if(gap<=3) {
            long tempSum = 0;
            for(long loop = from; loop <= to; loop++) {
                tempSum += loop;
            }
            log("Return !! " + from + " ~ " + to + " = " + tempSum);
            return tempSum;
        }
 
        long middle = (from + to) / 2;
        GetSum2 sumPre = new GetSum2(from, middle);
        log("Pre      From = " + from + " To = " + middle);
        sumPre.fork();
        GetSum2 sumPost = new GetSum2((middle+1), to);
        log("Post     From = " + (middle + 1+ " To = " + to);
 
        return sumPost.compute() + (long)sumPre.join();
    }
 
    public void log(String message) {
        String threaName = Thread.currentThread().getName();
        System.out.println("[" + threaName + "] " + message);
    }
 
}
 
public class ForkJoin {
 
    static final ForkJoinPool mainPool = new ForkJoinPool();
 
    public static void main(String[] args) {
        ForkJoin sample = new ForkJoin();
        sample.calc2();
    } 
    
    public void calc2() {
        long from = 0;
        long to = 10;
 
        GetSum2 sum = new GetSum2(from, to);
        Long result = mainPool.invoke(sum);
        System.out.println("Fork Join : Total sum of " + from + " ~ " + to + " = " + result);
    }
}
 
cs
ForkJoinํด๋ž˜์Šค๋ถ€ํ„ฐ ์‚ดํŽด๋ณด๋ฉด,
- ForkJoinPool ์ด๋ผ๋Š” ํด๋ž˜์Šค์˜ ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
- ๊ณ„์‚ฐ์„ ์ˆ˜ํ–‰ํ•  GetSum2 ํด๋ž˜์Šค์˜ ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ ๋‹ค.
- ForkJoinPool์— ์„ ์–ธ๋˜์–ด ์žˆ๋Š” invoke()๋ผ๋Š” ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๊ณ„์‚ฐ์„ ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฐ์ฒด๋ฅผ ๋„˜๊ฒจ์ฃผ๋ฉด, ์ž‘์—…์ด ์‹œ์ž‘๋œ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ›๋Š”๋‹ค.

๊ฒฐ๊ณผ

์œ„์˜ ์„ค๋ช…์„ ์ฝ๊ณ  ๊ฒฐ๊ณผ๋ฅผ ๋ณธ๋‹ค๋ฉด ์‰ฝ๊ฒŒ ์ดํ•ด๊ฐ€ ๋  ๊ฒƒ์ด๋‹ค. ์ดํ•ด๊ฐ€ ์•ˆ ๋œ๋‹ค๋ฉด ์•„๋งˆ ์žฌ๊ท€ ํ˜ธ์ถœ์˜ ์›๋ฆฌ๋ฅผ ๋ชจ๋ฅผ ์ˆ˜๋„ ์žˆ์„ ๊ฒƒ์ด๋‹ค. ๊ทธ๋ž˜๋„ ๊ตฌ๋™์›๋ฆฌ๋ฅผ ์ด๋ฏธ์ง€ํ™”์‹œ์ผœ๋ณด๋ฉด์„œ ๋งˆ๋ฌด๋ฆฌ๋ฅผ ์ง“๋„๋ก ํ•˜๊ฒ ๋‹ค.

๐Ÿš€  ๋งˆ๋ฌด๋ฆฌ

์ž๋ฐ”๋ฅผ ํ•˜๋ฉด์„œ Fork/Join์ด๋ผ๋Š” ๊ฐœ๋…์€ ์ฒ˜์Œ ์•Œ๊ฒŒ ๋˜์—ˆ๋‹ค. ์ด๋ ‡๊ฒŒ ํ•˜๋ฉด์„œ ๋“œ๋Š” ์ƒ๊ฐ์ด ์ฝ”๋”ฉ ํ…Œ์ŠคํŠธ ๊ณต๋ถ€ํ•˜๋ฉด์„œ ํ•œ๋ฒˆ ๋ฐฑ์ค€์—์„œ ์ ์šฉํ•ด๋ณด๊ณ  ์‹ถ์€ ๊ถ๊ธˆ์ฆ์ด ์ƒ๊ฒผ๋‹ค.

 

์ฐธ๊ณ ์ž๋ฃŒ

- [๊ต์žฌ] ์ž๋ฐ”์˜ ์‹  (์ด์ƒ๋ฏผ ์ €)

- https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

- https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html 

- https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/RecursiveAction.html

- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/RecursiveTask.html

- https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html

๋ฐ˜์‘ํ˜•

'SKILL > JAVA' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

Comparator, ComparTo  (0) 2021.11.03
์ž๋ฐ” : ์Šค๋ ˆ๋“œ(2)  (0) 2021.09.08
์ž๋ฐ” : ์Šค๋ ˆ๋“œ(1)  (0) 2021.09.07
์ž๋ฐ” : java 7 - ๋ณด์™„๋œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ  (0) 2021.09.03
์ž๋ฐ” : Heap pollution ์ด๋ž€?  (0) 2021.09.02