CompletableFuture
에 대해서 알아보기 전에 먼저 Java 5부터 사용할 수 있었던 Future
인터페이스를 확인해 보겠습니다. Future
를 이용하면 호출한 스레드가 결과를 기다리는 동안 다른 작업을 수행할 수 있습니다.
아래 소스코드를 기준으로 예를 들면 치킨배달이 완료(10초)되는 동안 그냥 기다리는 것이 아니라 TV시청을 한 이후에 배달된 치킨을 먹을 수 있습니다. TV시청 시간이 10초 보다 짧다면 배달이 완료되는 나머지 시간만큼은 그냥 대기를 하게 되고 배달시간(10초) 보다 더 많은 시간을 보게 되면 바로 먹을 수 있습니다. 배달사고로 인해 무한정 기다리게 되는 경우가 발생할 수도 있기 때문에 최대 기다릴 수 있는 타임아웃 시간(15초)도 지정했습니다.
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
Thread.sleep(10000);
return "치킨배달 완료";
});
doWatchingTv();
try {
String result = future.get(15, TimeUnit.SECONDS);
System.out.println("result: " + result);
executor.shutdown();
} catch (ExecutionException | InterruptedException | TimeoutException e) {
e.printStackTrace();
}
이제 CompletableFuture
를 활용할 수 있도록 코드를 변경해 보겠습니다. 먼저 DeliveryStore
라는 클래스를 만들어 배달 주문 받을 수 있도록 하겠습니다. 주문 이후 배달완료까지 3초 소요되도록 변경했습니다. 이때 시간 지연이 발생하므로 CompletableFuture
의 supplyAsync()
으로 처리 했습니다. supplyAsync()
의 두 번째 인수를 지정하지 않으면 commonPool를 사용하게 됩니다. 물론 직접 생성해서 지정할 수도 있습니다.
@Data
@AllArgsConstructor
public static class DeliveryStore {
private String name;
public Future<String> orderAsync(String item) {
return CompletableFuture.supplyAsync(() -> {
int seconds = 3000;
delay(seconds);
return "[" + name + "] " + item + " 배달 완료!! (배달시간: " + seconds + "msecs)";
});
}
public String order(String item) {
int seconds = 3000;
delay(3000);
return "[" + name + "] " + item + " 배달 완료!! (배달시간: " + seconds + "msecs)";
}
private void delay(int seconds) {
try {
Thread.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
이제 새로만든 DeliveryStore
클래스를 이용해서 주문을 해보겠습니다.
public static void main(String[] args) {
DeliveryStore chicken = new DeliveryStore("치킨");
Future<String> future = chicken.orderAsync("후라이드");
doWatchingTv();
try {
String result = future.get(8, TimeUnit.SECONDS);
System.out.println("결과: " + result);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
e.printStackTrace();
}
}
배달 정보가 잘 출력 되는 것을 확인할 수 있습니다.
결과: [치킨] 후라이드 배달 완료!! (배달시간: 3000msecs)
이번에는 비동기 주문(orderAsync()
)이 아닌 동기 주문(order()
)을 호출해서 여러 DeliveryStore
에 주문을 해보겠습니다. 아래와 같이 세개의 DeliveryStore
에 "세트1호"를 주문하면 모두 완료되려면 얼만큼의 시간이 소요될까요? 3초씩 총 9초가 소요됩니다.
List deliveryStores = Arrays.asList(
new DeliveryStore("치킨"),
new DeliveryStore("피자"),
new DeliveryStore("분식"));
deliveryStores.stream()
.map(ds -> ds.order("세트1호"))
.forEach(System.out::println);
총 배달소요 시간을 줄이기 위해 스트림을 병렬 스트림으로 이용해 보겠습니다. 이번에는 총 3초가 소요되었습니다. (테스트 환경의 코어 수에 따라 결과는 다를 수 있습니다.)
deliveryStores.parallelStream()
.map(ds -> ds.order("세트1호"))
.forEach(System.out::println);
DeliveryStore
수가 훨씬 많다고 가정하면 결과는 어떻게 될까요? commonPool의 제한된 쓰레드풀이 아닌 필요한 작업량을 고려해서 쓰레드풀을 만들 수 있어야 합니다. Executor
를 통해서 설정해보겠습니다.
List deliveryStores = Arrays.asList(
new DeliveryStore("치킨"),
new DeliveryStore("피자"),
new DeliveryStore("일식"),
new DeliveryStore("중식"),
new DeliveryStore("토스트"),
new DeliveryStore("김밥"),
new DeliveryStore("케익"),
new DeliveryStore("과일"),
new DeliveryStore("분식"));
// 최대 스레드 30개
ExecutorService excutor = Executors.newFixedThreadPool(Math.min(deliveryStores.size(), 30), r -> {
Thread t = new Thread(r);
t.setDaemon(true); // 데몬스레드 사용(프로그램종료 방해 방지)
return t;
});
List<CompletableFuture<String>> futures = deliveryStores.stream()
.map(ds -> CompletableFuture.supplyAsync(() -> ds.order("세트1호"), excutor))
.collect(toList());
List<String> result = futures.stream()
.map(CompletableFuture::join)
.collect(toList());
일반 스레드가 실행 중이면 자바프로그램이 종료되지 않는데 만약 어떤 작업을 한없이 기다리게 되면 문제가 발생할 수 있기 때문에 자바 프로그램이 종료 될 때 같이 강제로 실행이 종료 될 수 있도록 데몬 스레드로 지정합니다.
CompletableFuture.supplyAsync()
호출 이후에 CompletableFuture.join
을 바로 호출하지 않고 스트림을 분리한 이유는 함께 연속해서 호출하게 되면 모든 DeliveryStore
가 동기적, 순차적으로 실행되게 되므로 분리하여 호출해야 합니다. 실행 결과를 대략 3초정도 소요됩니다. commonPool을 사용했다면 더 많은 시간이 소요됐을 것입니다. 그렇다고 무한정 스레드 수를 늘릴 수는 없습니다. 너무 많으면 오히려 서버가 크래시 될 수 있으므로 서버의 CPU 코어 수, CPU 활용 비율, 대기시간/처리시간의 비율을 고려하여 적정한 수를 지정해야 합니다.