GPT Archiving

Java - CompletableFuture & addCallBack

99duuk 2025. 1. 6. 20:10

CompletableFuture는 Java의 비동기 프로그래밍을 지원하는 클래스입니다. Java 8에서 추가되었으며, 비동기 작업을 처리하고 조합할 수 있는 강력한 API를 제공합니다.


1. CompletableFuture란?

  • Java의 java.util.concurrent 패키지에 포함된 클래스.
  • 비동기적으로 작업을 수행하고, 그 결과를 처리하거나 다른 작업과 연결할 수 있도록 설계된 Future의 확장 버전.
  • 기존의 Future와 비교하여 더 유연하고 강력하며, 특히 비동기 작업을 쉽게 연결할 수 있습니다.

2. CompletableFuture의 주요 특징

  1. 비동기 작업 수행:
    • 특정 작업을 비동기적으로 실행하고 결과를 반환합니다.
    • 결과를 기다리지 않고 다른 작업을 계속 진행할 수 있습니다.
  2. 작업 조합:
    • 여러 CompletableFuture를 결합하여 복잡한 작업 흐름을 간단히 표현할 수 있습니다.
    • 예: 순차 실행, 병렬 실행, 결과 조합 등.
  3. 콜백 지원:
    • 작업이 완료되었을 때 특정 작업(콜백)을 실행할 수 있습니다.
  4. 비동기 처리 체인:
    • thenApply, thenCompose, thenAccept, handle 등의 메서드를 사용하여 작업을 체인 형태로 연결할 수 있습니다.

3. 주요 메서드와 사용법

1) 간단한 비동기 작업 실행

CompletableFuture를 생성하여 비동기 작업을 실행하는 방법:

import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("Running async task...");
        });

        // 메인 스레드에서 다른 작업 가능
        System.out.println("Main thread is free");

        // 결과를 기다리지 않음 (Optional)
        future.join();  // 완료될 때까지 기다림
    }
}

2) 결과를 반환하는 비동기 작업

비동기 작업의 결과를 반환받고 처리하는 방법:

import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "Hello, CompletableFuture!";
        });

        future.thenAccept(result -> {
            System.out.println("Result: " + result);
        });

        future.join(); // 결과 대기
    }
}

3) 작업 체인 연결

비동기 작업의 결과를 다음 작업으로 전달:

import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            return "Task 1";
        }).thenApply(result -> {
            return result + " -> Task 2";
        }).thenAccept(finalResult -> {
            System.out.println("Final Result: " + finalResult);
        }).join();
    }
}

4) 병렬 실행과 결과 조합

여러 비동기 작업을 병렬로 실행하고 결과를 조합:

import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");

        future1.thenCombine(future2, (result1, result2) -> result1 + " + " + result2)
               .thenAccept(finalResult -> System.out.println("Combined Result: " + finalResult))
               .join();
    }
}

5) 예외 처리

비동기 작업 중 발생한 예외를 처리:

import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("Error occurred!");
            }
            return "Task";
        }).exceptionally(ex -> {
            System.out.println("Exception: " + ex.getMessage());
            return "Fallback Result";
        }).thenAccept(result -> {
            System.out.println("Result: " + result);
        }).join();
    }
}

4. CompletableFuture의 주요 메서드

메서드 설명

runAsync() 반환값 없이 비동기 작업 실행.
supplyAsync() 반환값이 있는 비동기 작업 실행.
thenApply() 작업 완료 후 반환값을 변환.
thenAccept() 작업 완료 후 반환값을 소비(출력).
thenCombine() 두 개의 작업 결과를 조합.
exceptionally() 작업 중 예외 발생 시 기본값을 반환하거나 처리.
handle() 작업 완료 또는 예외 발생 시 처리 로직 정의.
allOf() 여러 작업이 모두 완료될 때 실행.
anyOf() 여러 작업 중 하나라도 완료되면 실행.

5. CompletableFuture와 Future의 차이

Feature CompletableFuture Future

작업 완료 후 콜백 지원 (thenApply, thenAccept 등) 지원하지 않음.
비동기 작업 연결 가능 (thenCompose, thenCombine 등) 불가능.
작업 취소 지원 가능 가능 (제한적).
결과 기다림 join() 또는 get() get()

6. 실전에서의 활용

  • 비동기 HTTP 호출.
  • 비동기 데이터베이스 연산.
  • 복잡한 비동기 작업의 병렬 처리 및 결과 조합.
  • 시간이 오래 걸리는 작업의 비동기 실행으로 UI 응답성 개선.


addCallback은 Spring의 ListenableFuture API에서 제공되는 메서드로, 비동기 작업의 성공 또는 실패 시 특정 콜백을 실행할 수 있도록 지원하는 기능입니다. Spring에서 Kafka 또는 비동기 작업 결과를 처리할 때 자주 사용됩니다.


1. addCallback의 정의

  • addCallback은 비동기 작업의 성공/실패를 처리하기 위한 콜백 메서드를 등록할 수 있게 합니다.
  • 주로 ListenableFuture 객체에서 사용되며, 성공(onSuccess)과 실패(onFailure) 콜백을 개별적으로 정의할 수 있습니다.

2. 주요 메서드

addCallback 메서드는 두 가지 방식으로 사용할 수 있습니다:

1) Success와 Failure 콜백을 각각 지정

void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
  • SuccessCallback: 작업이 성공적으로 완료되었을 때 실행됩니다.
  • FailureCallback: 작업이 실패했을 때 실행됩니다.

2) ListenableFutureCallback 객체로 통합

void addCallback(ListenableFutureCallback<? super T> callback);
  • ListenableFutureCallback: 성공 및 실패 콜백을 통합한 객체로 처리합니다.

3. 예제

1) Kafka addCallback 사용 예제

Kafka 프로듀서에서 비동기로 메시지를 전송하고 성공/실패를 처리:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String key, String value) {
        // 메시지 전송
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, value);

        // addCallback 사용
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Message sent successfully to topic: " + result.getRecordMetadata().topic()
                        + ", partition: " + result.getRecordMetadata().partition()
                        + ", offset: " + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.err.println("Failed to send message: " + ex.getMessage());
            }
        });
    }
}

2) Success와 Failure 콜백 분리

future.addCallback(
    result -> {
        System.out.println("Message sent successfully: " + result);
    },
    ex -> {
        System.err.println("Message sending failed: " + ex.getMessage());
    }
);

4. addCallback의 주요 장점

  1. 비동기 작업 결과 처리:
    • 성공 시와 실패 시의 처리를 명확히 분리 가능.
    • Kafka 메시지 전송 결과를 확인하거나, 실패 시 재시도 로직을 구현할 수 있음.
  2. 가독성:
    • 콜백 메서드를 분리하여 비동기 작업의 결과를 직관적으로 처리.
  3. Spring Integration:
    • ListenableFuture를 사용하는 Spring 기반 비동기 API와 자연스럽게 통합.

5. 실전 활용

  • Kafka 메시지 전송 결과 확인: Kafka 프로듀서에서 메시지 전송 후, 성공 또는 실패 여부를 확인하고 로그를 남기거나 후속 작업을 처리.
  • 비동기 데이터베이스 작업: 비동기로 데이터베이스에 데이터를 저장하거나 조회한 뒤, 결과를 처리.
  • HTTP 비동기 호출: Spring RestTemplate의 비동기 버전 또는 WebClient와 통합하여 HTTP 요청 결과 처리.

6. 주의사항

  • 스레드 풀 관리: addCallback의 콜백은 기본적으로 비동기 작업을 수행한 스레드에서 실행됩니다. 스레드 풀 크기가 충분하지 않으면 지연이 발생할 수 있습니다.
  • 예외 처리: 실패 콜백(onFailure)에서 예외를 적절히 처리하지 않으면 비동기 작업 흐름에 문제가 생길 수 있습니다.

 

'GPT Archiving' 카테고리의 다른 글

리플렉션 2  (0) 2025.01.11
리플렉션  (1) 2025.01.11
kafka - ProducerRecord  (0) 2025.01.06
kafka Sticky  (0) 2025.01.06
dockerfile - 의존성 캐싱  (1) 2025.01.06