본문 바로가기

IT

[Java] CompletableFuture의 이해와 활용

728x90

Java5에서 추가된 Future는 비동기 작업의 결과를 나타내는 인터페이스다. Future는 작업이 아직 완료되지 않았더라도 결과에 접근할 수 있는 방법을 제공한다.

 

예제 코드

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 비동기 작업 생성
        Future<String> future = executor.submit(() -> {
            Thread.sleep(2000); // 비동기 작업 시뮬레이션
            return "Hello, Future!";
        });

        System.out.println("Do other work while the future is running...");

        // 비동기 작업이 완료될 때까지 기다림
        String result = future.get();
        System.out.println("Future result: " + result);

        executor.shutdown();
    }
}

 

출력 결과 

Do other work while the future is running...
Future result: Hello, Future!

하지만 Future는 몇 가지 한계가 있다.

  • 블로킹: Future의 get() 메서드를 호출하면 작업이 완료될 때까지 현재 스레드가 블록된다. 이는 비동기성을 잃게 되는데, 다른 작업을 수행하지 못하고 대기해야 한다는 단점이 존재한다.
  • 예외 처리 어려움: 비동기 작업에서 예외가 발생하면 get() 메서드를 호출할 때 예외가 발생하게 된다. 이 예외를 적절하게 처리하고 전파하는 것이 어렵습니다.
  • 조합성의 어려움: 여러 개의 Future를 조합하거나 여러 작업을 병렬로 실행하고자 할 때 Future만으로는 코드를 복잡하게 만들 수 있다.

이러한 한계를 극복하기 위해 Java 8에서는 CompletableFuture가 도입되었는데, CompletableFuture는 Future의 한계를 극복하고 비동기 코드를 더 쉽게 다룰 수 있도록 디자인되었다고 한다. 몇 가지 주요 특징은 다음과 같다:

  • 비동기 메서드 체이닝: CompletableFuture는 비동기 메서드 체이닝을 통해 여러 작업을 연결하여 실행할 수 있다.
  • 콜백 지원: CompletableFuture는 콜백 메서드를 등록하여 작업이 완료되었을 때 특정 동작을 수행할 수 있다.
  • 조합성 개선: 여러 CompletableFuture를 조합하고 결합하여 더 복잡한 비동기 흐름을 만들 수 있다.
  • 에러 처리 개선: 예외 처리가 더 쉽고 강력하게 구현되어 있다.

CompletableFuture 보다 유연하고 강력한 비동기 프로그래밍을 지원하기 위해 나왔으며, Future 비해 편리하게 사용할 있다.

 

예제 코드

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);

        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000); // 비동기 작업 시뮬레이션
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello from CompletableFuture!";
        }, executor);

        // 비동기 작업이 완료되면 실행할 콜백 등록
        completableFuture.thenAccept(result -> System.out.println(result));

        // 다른 작업을 수행할 수 있음
        System.out.println("Doing other work...");

        // CompletableFuture가 완료될 때까지 블록하지 않고 대기
        completableFuture.join();

        executor.shutdown();
    }
}

 

CompletableFuture 작업이 완료될 실행할 콜백을 등록할 있으며, join() 메서드를 사용하여 블록하지 않고 대기할  있다. join() 메서드는 밑에서 더 자세히 알아보자. 이렇게 하면 비동기성을 유지하면서도 콜백 등록 등으로 작업을 처리할 있다. 또한 CompletableFuture 다양한 조합 메서드를 제공하여 여러 작업을 효과적으로 조합할 있습니다.

 

출력 결과

Do other work while the future is running...
Hello from CompletableFuture!

 

반환값이 없는 경우 runAsync, 반환값이 있는 경우 supplyAsync을 사용하는데, runAsync supplyAsync 기본적으로 자바7 추가된 ForkJoinPool commonPool() 사용해 작업을 실행할 쓰레드를 쓰레드 풀로부터 얻어 실행시킨다.

CompletableFuture ForJoinPool

ForkJoinPool Work-Stealing 알고리즘을 기반으로 동작한다. 알고리즘은 스레드가 자신의 작업을 처리한 다른 스레드의 대기 중인 작업을 스틸(도둑질)해서 가져와서 처리하는 방식으로 이를 통해 작업의 균형을 맞추고 병렬성을 극대화하는 프레임워크인데 나중에 따라 알아보도록 하자. 만약 다른 원하는 쓰레드 풀을 사용하려면, ExecutorService 파라미터로 넘겨주면 된다.

 

CompletableFuture에 대한 특징을 하나씩 살펴보도록 하자.

 

1. 비동기 메서드 체이닝

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureChainingExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> "Hello")
                .thenApplyAsync(s -> s + " World")
                .thenAcceptAsync(System.out::println);

        future.join(); // 대기하지 않고 프로그램이 종료되지 않도록 join()을 호출

        // 비동기 메서드 체이닝을 통해 "Hello World"가 출력됨
    }
}

 

위의 코드에서 thenApplyAsync 앞선 비동기 작업의 결과에 함수를 적용하고, thenAcceptAsync 결과를 소비하여 출력한다.

  • thenApply: 반환 값을 받아서 다른 값을 반환함함수형 인터페이스 Function을 파라미터로 받음
  • thenAccpet: 반환 값을 받아 처리하고 값을 반환하지 않음함수형 인터페이스 Consumer를 파라미터로 받음
  • thenRun: 반환 값을 받지 않고 다른 작업을 실행함함수형 인터페이스 Runnable을 파라미터로 받음

출력 결과

"Hello World"

 

2. 콜백 지원

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCallbackExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

        future.thenAccept(result -> System.out.println("Result: " + result));

        // 다른 작업 수행

        // 프로그램이 즉시 종료되지 않도록 대기
        future.join();
    }
}

thenAccept 사용하여 작업이 완료되면 콜백이 실행되어 결과를 출력한다.

 

출력 결과

"Result: Hello"

 

3. 조합성 개선

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombiningExample {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " World");

        CompletableFuture<String> combinedFuture = future1.thenCombineAsync(future2, (s1, s2) -> s1 + s2);

        combinedFuture.thenAccept(System.out::println);

        // 다른 작업 수행

        // 프로그램이 즉시 종료되지 않도록 대기
        combinedFuture.join();
    }
}
  • thenCompose :두 작업이 이어서 실행하도록 조합하며, 앞선 작업의 결과를 받아서 사용할 수 있음, 함수형 인터페이스 Function을 파라미터로 받음
  • thenCombine : 두 작업을 독립적으로 실행하고, 둘 다 완료되었을 때 콜백을 실행함, 함수형 인터페이스 Function을 파라미터로 받음
  • allOf : 여러 작업들을 동시에 실행하고, 모든 작업 결과에 콜백을 실행함
  • anyOf: 여러 작업들 중에서 가장 빨리 끝난 하나의 결과에 콜백을 실행함

출력 결과

"Hello World"

 

4. 에러 처리 개선

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExceptionHandlingExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 에러를 유발하는 작업 시뮬레이션
            throw new RuntimeException("Error in the async task");
        });

        CompletableFuture<String> exceptionally = future.exceptionally(ex -> "Handled Exception: " + ex.getMessage());

        exceptionally.thenAccept(System.out::println);

        
        /**
        
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            if (doThrow) {
                throw new IllegalArgumentException("Invalid Argument");
            }

            return "Thread: " + Thread.currentThread().getName();
        }).handle((result, e) -> {
            return e == null
                    ? result
                    : e.getMessage();
        });
        **/

        // 다른 작업 수행

        // 프로그램이 즉시 종료되지 않도록 대기
        exceptionally.join();
    }
}

exceptionally 사용하여 예외가 발생한 경우 대체값을 반환하도록 핸들링한다.

  • exeptionally: 발생한 에러를 받아서 예외를 처리함, 함수형 인터페이스 Function을 파라미터로 받음
  • handle, handleAsync:(결과값, 에러)를 반환받아 에러가 발생한 경우와 아닌 경우 모두를 처리할 수 있음, 함수형 인터페이스 BiFunction을 파라미터로 받음

 

CompletableFuture get() 메서드와 join() 메서드의 차이 

Future에서는 get() 메서드로 블럭 처리후 받아왔지만 CompletableFuture에서는 join() 메서드를 사용한 것을 확인할 수 있다.

Java의 CompletableFuture에서 get()과 join() 메소드는 모두 완료된 CompletableFuture의 결과를 반환하는 메소드이지만 두 메소드에는 몇 가지 차이점이 있다.

 

get() 메소드는 CompletableFuture의 결과가 사용 가능할 때까지 기다리며 결과가 사용 가능해지면 그 결과를 반환한다. 이 메소드는 CompletableFuture가 완료되기 전에 현재 스레드를 차단합니다. 만약 CompletableFuture가 예외를 던지면 원래 예외를 포장한 ExecutionException를 던진다.

 

join() 메소드도 CompletableFuture의 결과가 사용 가능할 때까지 기다리며 결과가 사용 가능해지면 그 결과를 반환하지만 join() 메소드는 체크된 예외를 던지지 않는다. 대신 CompletableFuture가 예외를 던지면 join() 메소드는 원래 예외를 포장한 UncheckedExecutionException을 던진다.

 

get()과 join() 메소드의 중요한 차이점 중 하나는 get() 메소드가 인터럽트 가능하다는 것이고, join() 메소드는 인터럽트가 불가능하다는 것인데 이것은 호출하는 스레드가 인터럽트될 때 get() 메소드가 InterruptedException을 던지는 반면, join() 메소드는 CompletableFuture가 완료될 때까지 차단된다.

 

또 다른 차이점은 get() 메소드는 java.util.concurrent.Future 인터페이스에 정의되어 있으므로 이 인터페이스를 구현하는 다른 클래스와 호환 가능하지만, join() 메소드는 CompletableFuture 클래스에만 특화되어 있기 때문에 CompletableFuture와만 사용할 수 있다는 것이다.

 

일반적으로 CompletableFuture을 다룰 때는 join() 메소드를 사용하는 것이 좋다고 한다.

get()과 join()의 차이