Spring Async
@Async
비동기 실행을 위한 @Async
를 사용하기 위해서는 먼저 적당한 @Configuration
클래스에 @EnableAsync
를 추가해주어야 합니다.
@EnableAsync
@Configuration
public class AsyncConfig {
}
그런 다음 비동기적으로 실행되길 원하는 메서드에 @Async
어노테이션을 추가하면 됩니다. 다음은 Spring Event의 이벤트 리스너의 비동기 실행을 적용하기 위해 @Async
를 사용하는 코드입니다.
@Async
@TransactionalEventListener
public void createDefaultLinkBundle(CreateMemberEvent event) {
CreateLinkBundleCommand command
= new CreateLinkBundleCommand(event.memberId(), DEFAULT_LINK_BUNDLE, true);
linkBundleUseCase.createLinkBundle(command);
}
스프링 부트는 Executor
bean이 컨텍스트에 존재하지 않는 경우 ThreadPoolTaskExecutor
를 기본으로 구성합니다. 단, Java 21 버전부터 추가된 가상 스레드를 이용하는 경우 구성이 달라질 수 있습니다.
Spring은 비동기 실행을 위한 추상화 TaskExecutor
와 몇 가지 구현체를 제공합니다. 그 중 대표적인 것이 SimpleAsyncTaskExecutor
와 ThreadPoolTaskExecutor
입니다.
SimpleAsyncTaskExecutor
스레드를 재사용하지 않고, 항상 호출마다 새로운 스레드를 시작합니다. 그러나 슬롯이 비워질 때까지 모든 호출을 차단하는 동시성 제한을 지원합니다.
ThreadPoolTaskExecutor
가장 일반적으로 사용되는 구현입니다. java.util.concurrent.ThreadPoolExecutor
를 빈 스타일로 구성하고 TaskExecutor
로 래핑합니다.
ThreadPoolExecutor
의 동작 방식은 다음과 같습니다.
- 첫 작업이 들어오면,
CorePoolSize
만큼의 스레드를 생성합니다. - 유저 요청이 들어올 때마다 작업 큐에 담아둡니다.
CorePoolSize
에 해당하는 스레드 중, 유휴상태(idle)인 스레드가 있다면 작업 큐에서 작업을 꺼내 스레드에 할당합니다.- 유휴상태인 스레드가 없다면, 작업은 작업 큐에서 대기합니다.
- 작업 큐가 가득 차면, 스레드를 새로 생성합니다.
- 생성할 수 있는
MaxPoolSize
에 도달하면 설정된RejectedExecutionHandler
에 따라 작업이 처리됩니다.
- 작업이 완료되면 스레드는 다시 유휴상태로 돌아갑니다.
- 작업 큐가 비어있고
CorePoolSize
이상의 스레드가 생성되어있다면KeepAliveSeconds
만큼 대기 후 스레드를 destroy 합니다.
- 작업 큐가 비어있고
Bean 구성
스프링 부트는 properties를 통해 Bean으로 자동 등록하는 ThreadPoolTaskExecutor
의 설정을 구성할 수 있습니다.
spring:
task:
execution:
pool:
core-size: 8
max-size: 8
queue-capacity: 100
keep-alive: 60s
자동으로 구성된 Bean을 사용하는 대신 ThreadPoolTaskExecutor
를 직접 @Bean
으로 등록하면서 구성할 수도 있습니다.
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3); // 최소 스레드 풀 사이즈
executor.setMaxPoolSize(3); // 최대 스레드 풀 사이즈
executor.setQueueCapacity(50); // 대기열 길이
executor.setKeepAliveSeconds(60); // 여분의 스레드 유휴 시간
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 최대 스레드 풀, 대기열 가득 찬 경우의 정책
executor.setThreadNamePrefix("Task-"); // 스레드 프리픽스
executor.initialize(); // ThreadPoolExecutor 구성
return executor;
}
또한, @Async
의 value
속성을 이용하면 필요에 따라 Bean으로 등록한 다양한 executor
를 지정하여 비동기 실행 시 사용할 수 있습니다. value
에는 사용하고자 하는 executor
의 Bean 이름을 지정하면 됩니다.
@EnableAsync
@Configuration
public class AsyncConfig {
@Bean(name = "generativeTaskExecutor")
public Executor generativeTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
...
executor.initialize();
return executor;
}
}
@Component
public class EventListener {
@Async(value = "generativeTaskExecutor")
@TransactionalEventListener
public void createHubTags(CreateHubLinkEvent event) {
AutoCreateHubTagCommand command = new AutoCreateHubTagCommand(event.hubId());
try {
tagUseCase.autoCreateHubTags(command);
} catch (NotMetCondition e) {
}
}
}
RejectedExecutionHandler
스레드 풀이 설정한 최대 사이즈에 도달하고 대기열도 가득 찬경우 입력된 작업을 어떻게 처리할지 결정합니다. RejectedExecutionHandler
인터페이스는 더 이상 ThreadPoolExecutor
에 의해 작업이 실행될 수 없을 때 호출하는 메서드인 rejectedExecution()
을 정의하고 있습니다.
해당 인터페이스의 구현은 ThreadPoolExecutor
에 정적 클래스로 구현되어 있으며 다음과 같은 것들이 있습니다.
CallerRunsPolicy
거부된 작업은 execute
메서드의 호출 스레드에서 직접 실행합니다.
AbortPolicy
ThreadPoolExecutor
의 기본 핸들러로 RejectedExecutionException
를 던집니다.
DiscardPolicy
거부된 작업은 무시됩니다.
DiscardOldestPolicy
아직 처리되지 않은 요청 중 가장 오래된 것을 삭제하고 작업을 실행합니다.
비동기 예외 처리
비동기 처리 중 발생한 예외는 AsyncUncaughtExceptionHandler
를 구현하여 예외 처리가 가능합니다. 해당 핸들러를 등록하기 위해서는 AsyncConfigurer
의 getAsyncUncaughtExceptionHandler()
를 구현체를 반환하도록 오버라이드하면 됩니다.
@Slf4j
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.warn("비동기 처리 예외 발생. method={}, message={}, params={}", ex.getMessage(), method.getName(), params);
}
}
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
}
Async AOP
@Async
를 사용하게 되면 스프링 AOP를 이용해서 비동기 실행이 적용되게 됩니다. 때문에 @Transactional
이 그러하듯 @Async
역시 내부 호출에 유의해야 합니다. 실제 객체 대신 실제 객체에 대한 참조를 가지고 있는 프록시 객체가 Bean으로 등록되기 때문에 내부 호출 시 비동기 실행이 적용되지 않을 것입니다.
조금 더 들어가서 Bean으로 등록한 executor
와 asyncUncaughtExceptionHandler
가 어떻게 등록되고 사용되는지 알아보겠습니다.
AsyncAnnotationBeanPostProcessor
라는 빈 후처리기는 Bean으로 등록된 executor
와 앞서 AsyncConfigurer
에 등록한 asyncUncaughtExceptionHandler
를 가져와 advisor
를 세팅합니다.
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
...
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
AsyncAnnotationAdvisor
는 @Async
어노테이션을 포인트컷으로 추가하고, AnnotationAsyncExecutionInterceptor
를 어드바이스로 할당합니다.
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private final Advice advice;
private Pointcut pointcut;
...
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
...
asyncAnnotationTypes.add(Async.class);
...
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
}
AnnotationAsyncExecutionInterceptor
의 부모 AsyncExecutionInterceptor
의 invoke()
메서드를 확인하면 task
를 생성하고 doSubmit()
메서드를 호출하여 excutor
에 등록하는 것을 확인할 수 있습니다.
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
...
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future<?> future) {
return future.get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
}
재시도는 어떻게 하지...?
AsyncUncaughtExceptionHandler
를 구현하여 예외 발생 시 에러 로그를 남기도록 예외 처리를 하기는 했지만 충분하지는 않아보입니다. 실제로 토이 프로젝트에 비동기 실행을 적용하면서 단순히 로그만 남기고 종료할 것이 아니라 로직이 실패하였으면 재시도 처리도 필요하다고 생각하였습니다.
처음 생각했던 방법은 별도의 Event
엔티티를 만드는 것이었습니다. 비동기 실행의 시도, 성공, 실패를 기록하고 실패한 경우 스케줄러를 돌려서 일정 시간마다 재시도한다...가 주요 골자였습니다. 그리고 참고할만한 문서가 있는지 찾던 중 두번째 방법인 Spring Retry라는 프로젝트에 대해 알게 되었고 이를 적용하게 되었습니다.
다음은 Spring Event의 이벤트 리스너에 비동기 실행을 위한 Spring Async, 재시도 처리를 위한 Spring Retry를 적용한 프로젝트 코드 중 일부입니다.
@Retryable(
retryFor = ApiException.class,
maxAttemptsExpression = "${retry.max-attempt}",
backoff = @Backoff(delayExpression = "${retry.delay}",
multiplierExpression = "${retry.multiply}"))
@Async(value = "generativeTaskExecutor")
@TransactionalEventListener
public void createHubTags(CreateHubLinkEvent event) {
AutoCreateHubTagCommand command = new AutoCreateHubTagCommand(event.hubId());
try {
CreateTagResponse response = tagUseCase.autoCreateHubTags(command);
log.debug("[Event] 허브 태그 자동 생성. tagIds={}", response.tagIds());
} catch (NotMetCondition e) {
log.debug("[Event] 허브 태그 자동 생성 취소. 조건을 만족하지 않음.");
}
}
이어지는 게시글에서는 Spring Retry를 이용해 비동기 실행 중 발생한 예외 상황에 대한 재시도 처리에 대해 다루어보겠습니다.
참고
https://www.baeldung.com/spring-async
https://docs.spring.io/spring-framework/reference/integration/scheduling.html
https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html#features.task-execution-and-scheduling
'Spring' 카테고리의 다른 글
Jedis, Lettuce, Redisson. 레디스를 위한 세 가지 자바 클라이언트 (1) | 2024.10.30 |
---|---|
스프링 @Retry를 이용한 재시도 처리하기 (0) | 2024.06.12 |
스프링에서 외부 API 호출을 테스트하는 방법(feat. RestClient) (0) | 2024.05.20 |
스프링 이벤트(Spring Event) 사용해보기 (0) | 2024.02.12 |
로그 세팅하기 (1) | 2023.11.19 |