free from

ForkAndJoin Framework 본문

개발/Java

ForkAndJoin Framework

고양이레옹이 2024. 5. 26. 19:29
728x90
반응형

Intro

Fork/Join 프레임워크는 병렬 프로그래밍을 위한 도구로 JDK 7java.util.concurrent 패키지에 등장했습니다. 이름과 같이 분할할 수 있는 작업을 더 작은 작업으로 나누고(Fork) 처리하여 각각의 결과를 합쳐서(Join) 반환하는 프레임워크입니다. 분할된 작업들을 처리하기 위해서 내부적으로 스레드 풀을 구성하여 작업을 병렬로 처리합니다. 이를 통해 멀티코어 프로세서를 최대한 활용하여 성능을 향상시킬 수 있습니다.

 

Executors, java.util.concurrent 패키지를 통해서 스레드를 어떻게 생성하고 관리할지에 대한 부분이 정리가 되었다면 Fork/Join 프레임워크를 통해서 스레드를 활용하여 어떻게 작업을 병렬로 처리할지에 대해서 다룹니다.

Fork/Join프레임워크는 ForkJoinPool, ForkJoinTask, RecursiveTaskRecursiveAction 등의 주요 클래스로 구성됩니다.

클래스 설명
ForkJoinPool Fork/Join 작업을 실행하는 스레드 풀을 나타냅니다.
병렬 작업을 실행하고 관리하며 스레드를 효율적으로 할당합니다.
ForkJoinTask Fork/Join 프레임워크의 작업을 나타내는 추상 클래스입니다.
RecursiveTaskRecursiveAction이 상속합니다.
RecursiveTask ForkJoinTask를 상속하는 클래스로, 결과를 반환하는 작업을 정의합니다.
병렬 작업의 결과를 합치는 역할을 수행합니다.
RecursiveAction ForkJoinTask를 상속하는 클래스로, 결과를 반환하지 않는 작업을 정의합니다.
주로 부수 효과를 갖는 작업에 사용됩니다.

 

ForkJoinPool

ForkJoinPool 클래스는 ExecutorService 인터페이스의 구현체로 분할된 작업을 병렬로 실행할 수 있도록 스레드 풀을 구성하고 작업을 관리합니다. 스레드 풀의 스레드 개수는 보통 프로세서 코어 수에 맞게 자동으로 생성되며 각 스레드는 자신의 작업 Queue를 가지고 있습니다. 컴퓨터가 4코어를 갖고 있다면 ForkJoinPool은 기본적으로 4개의스레드로 구성됩니다.

 

ForkJoinPool은 작업을 효율적으로 분배하기 위해 Work-Stealing 알고리즘을 사용합니다. 이 알고리즘은 각 스레드가 자신의 작업을 처리한 후 자신의 Queue가 비어있으면 다른 스레드의 Queue에서 작업을 가져와 처리합니다. 이를 통해 작업을 공평하게 분배하고 스레드 간의 부하를 균형있게 유지할 수 있습니다.

 

예를 들어 분할된 작업이 6개라면 4개의 작업이 우선적으로 처리되고 나머지 2개의 작업은 Queue에 대기하게 됩니다. 이때 스레드 A가 작업을 완료하고 자신의 큐가 비어있고 다른 스레드의 Queue에 작업이 있다면 해당 작업을 가져와 처리하게 됩니다.

 

ForkJoinPool은 아래와 같이 생성하여 ForkJoinTask를 전달하여 작업을 분할하고 결과를 반환합니다.

public static void main(String[] args) {
    int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    // ForkJoinPool 생성
    ForkJoinPool forkJoinPool = new ForkJoinPool();

    // 작업 생성 및 실행
    int sum = forkJoinPool.invoke(new MyRecursiveTask(array, 0, array.length));

    // 결과 출력
    System.out.println("Sum: " + sum);
}

작업을 비동기/동기적으로 호출하는 것과 호출하는 시점에 따라 API가 조금 다릅니다.

호출 유형 Fork/Join 외부에서 호출 Fork/Join 내부에서 호출
비동기 실행 execute() ForkJoinTask.fork()
동기 실행 invoke() ForkJoinTask.invoke()
Future 반환 submit() ForkJoinTask.fork()

 

ForkJoinTask / RecursiveTask / RecursiveAction

ForkJoinTaskForJoinPool에서 실행되는 작업나타내는 추상 클래스입니다. fork()join() 메서드를 통해 작업을 분할하고 실행합니다. 또한 Future인터페이스를 구현하고 있어 완료, 취소와 같은 작업상태를 확인할 수 있으며 get() 메서드를 통해 결과를 반환받기 위한 API를 제공합니다.

 

실제 작업은 ForkJoinTask를 상속하는 RecursiveTaskRecursiveAction의 compute()을 구현하여 나타냅니다. 일반적으로 compute()는 divide-and-conquer 알고리즘을 작성하는 형태로 구현합니다.
만약 작업이 분리될 수 있으면 하위 작업으로 분리하고 fork()를 실행하여 스레드의 Queue에서 대기하게 됩니다.

if (하위 작업으로 분리할 수 있는지 판단)
    하위 작업으로 분리,
    fork()
else {
    태스크 실행
} 

RecursiveTask는 제네릭 타입을 반환하며 RecursiveAction은 반환 타입이 없을 경우 사용하게 됩니다.

 

Example

디렉토리 내의 파일들의 사이즈룰 조회해야하는 상황을 생각해봅시다.
디렉토리 내부에는 파일도 있지만 디렉토리도 존재할 수 있습니다.
그러므로 디렉토리 내부에 다른 디렉토리가 존재하면 하위 작업으로 분리할 수 있을 것 같습니다.

public class DirectorySizeCalculator extends RecursiveTask<Long> {
    private final File directory;

    public DirectorySizeCalculator(File directory) {
        this.directory = directory;
    }

    @Override
    protected Long compute() {
        long size = 0;
        File[] files = directory.listFiles();
        // 반환 조건을 명시합니다.
        if (Objects.isNull(files)) {
            return size;
        }

        // 파일 목록을 순회하면서 파일의 크기를 합산합니다.
        List<DirectorySizeCalculator> subTasks = new ArrayList<>();
        for (File file : files) {
            if (file.isDirectory()) {
                // 하위 디렉토리인 경우, 하위 작업을 생성하여 ForkJoinPool에 제출합니다.
                DirectorySizeCalculator subTask = new DirectorySizeCalculator(file);
                subTasks.add(subTask);
                subTask.fork();
            } else {
                size += file.length();
            }
        }

        // 하위 디렉토리들의 작업 결과를 합산합니다.
        for (DirectorySizeCalculator subTask : subTasks) {
            size += subTask.join();
        }

        return size;
    }

    public static void main(String[] args) {
        File directory = new File("/path/to/your/directory");

        // ForkJoinPool을 생성합니다.
        ForkJoinPool pool = new ForkJoinPool();

        // 결과값을 받기 위해서 invoke 메서드를 호출합니다.
        long totalSize = pool.invoke(new DirectorySizeCalculator(directory));  

        // 디렉토리의 전체 크기를 출력합니다.
        System.out.println("Total size of directory: " + totalSize + " bytes");
    }
}

 

Outro

병렬로 모든것을 처리하면 좋겠지만 장점 뒤에 있는 단점을 잘 유의하면서 사용해야합니다.
먼저 작업의 분할과 병합에 대한 비용을 고려해야 합니다.
작은 작업을 분할할 때는 오버헤드가 발생할 수 있으므로 너무 작은 작업을 분할하지 않도록 주의해야 합니다.

그리고 Fork/Join 프레임워크를 사용할 때는 작업을 가능한 한 순수 함수로 구현하는 것이 좋습니다.
순수 함수는 입력을 받아 결과를 반환하는데 외부 상태를 변경하지 않고 순수한 입력에만 의존하는 함수입니다.
이렇게 하면 병렬 처리가 더 쉬워지고 데드락, 가시성 등의 스레딩 문제가 발생하지 않습니다.

728x90

'개발 > Java' 카테고리의 다른 글

ClassLoader  (0) 2024.06.02
ExecutorService  (0) 2024.05.19
Java ClassFile - 2  (0) 2024.05.18
Java ClassFile - 1  (0) 2024.05.16
Comments