본문 바로가기
Spring

스프링 @Async를 이용한 비동기 실행 적용하기

by hseong 2024. 6. 11.

Spring Async

@Async

비동기 실행을 위한 @Async를 사용하기 위해서는 먼저 적당한 @Configuration 클래스에 @EnableAsync를 추가해주어야 합니다.

@EnableAsync  
@Configuration  
public class AsyncConfig  {
}

그런 다음 비동기적으로 실행되길 원하는 메서드에 @Async 어노테이션을 추가하면 됩니다. 다음은 Spring Event의 이벤트 리스너의 비동기 실행을 적용하기 위해 @Async를 사용하는 코드입니다.

@Async  
@TransactionalEventListener  
public void createDefaultLinkBundle(CreateMemberEvent event) {  
    CreateLinkBundleCommand command  
        = new CreateLinkBundleCommand(event.memberId(), DEFAULT_LINK_BUNDLE, true);  
    linkBundleUseCase.createLinkBundle(command);  
}

스프링 부트는 Executor bean이 컨텍스트에 존재하지 않는 경우 ThreadPoolTaskExecutor를 기본으로 구성합니다. 단, Java 21 버전부터 추가된 가상 스레드를 이용하는 경우 구성이 달라질 수 있습니다.

Spring은 비동기 실행을 위한 추상화 TaskExecutor와 몇 가지 구현체를 제공합니다. 그 중 대표적인 것이 SimpleAsyncTaskExecutorThreadPoolTaskExecutor입니다.

SimpleAsyncTaskExecutor

스레드를 재사용하지 않고, 항상 호출마다 새로운 스레드를 시작합니다. 그러나 슬롯이 비워질 때까지 모든 호출을 차단하는 동시성 제한을 지원합니다.

ThreadPoolTaskExecutor

가장 일반적으로 사용되는 구현입니다. java.util.concurrent.ThreadPoolExecutor를 빈 스타일로 구성하고 TaskExecutor로 래핑합니다.

ThreadPoolExecutor의 동작 방식은 다음과 같습니다.

  1. 첫 작업이 들어오면, CorePoolSize만큼의 스레드를 생성합니다.
  2. 유저 요청이 들어올 때마다 작업 큐에 담아둡니다.
  3. CorePoolSize에 해당하는 스레드 중, 유휴상태(idle)인 스레드가 있다면 작업 큐에서 작업을 꺼내 스레드에 할당합니다.
    • 유휴상태인 스레드가 없다면, 작업은 작업 큐에서 대기합니다.
    • 작업 큐가 가득 차면, 스레드를 새로 생성합니다.
    • 생성할 수 있는 MaxPoolSize에 도달하면 설정된 RejectedExecutionHandler에 따라 작업이 처리됩니다.
  4. 작업이 완료되면 스레드는 다시 유휴상태로 돌아갑니다.
    • 작업 큐가 비어있고 CorePoolSize 이상의 스레드가 생성되어있다면 KeepAliveSeconds만큼 대기 후 스레드를 destroy 합니다.

Bean 구성

스프링 부트는 properties를 통해 Bean으로 자동 등록하는 ThreadPoolTaskExecutor의 설정을 구성할 수 있습니다.

spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size: 8
        queue-capacity: 100
        keep-alive: 60s            

자동으로 구성된 Bean을 사용하는 대신 ThreadPoolTaskExecutor를 직접 @Bean으로 등록하면서 구성할 수도 있습니다.

@Bean 
public Executor taskExecutor() {  
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
    executor.setCorePoolSize(3);  // 최소 스레드 풀 사이즈
    executor.setMaxPoolSize(3);  // 최대 스레드 풀 사이즈
    executor.setQueueCapacity(50);  // 대기열 길이
    executor.setKeepAliveSeconds(60);  // 여분의 스레드 유휴 시간
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  // 최대 스레드 풀, 대기열 가득 찬 경우의 정책
    executor.setThreadNamePrefix("Task-");  // 스레드 프리픽스
    executor.initialize();  // ThreadPoolExecutor 구성
    return executor;  
}

또한, @Asyncvalue 속성을 이용하면 필요에 따라 Bean으로 등록한 다양한 executor를 지정하여 비동기 실행 시 사용할 수 있습니다. value에는 사용하고자 하는 executor의 Bean 이름을 지정하면 됩니다.

@EnableAsync
@Configuration
public class AsyncConfig {
    @Bean(name = "generativeTaskExecutor")  
    public Executor generativeTaskExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        ... 
        executor.initialize();  
        return executor;  
    }
}

@Component
public class EventListener {

    @Async(value = "generativeTaskExecutor")  
    @TransactionalEventListener  
    public void createHubTags(CreateHubLinkEvent event) {  
        AutoCreateHubTagCommand command = new AutoCreateHubTagCommand(event.hubId());  
        try {  
            tagUseCase.autoCreateHubTags(command);  
        } catch (NotMetCondition e) {  
        }  
    }
}

RejectedExecutionHandler

스레드 풀이 설정한 최대 사이즈에 도달하고 대기열도 가득 찬경우 입력된 작업을 어떻게 처리할지 결정합니다. RejectedExecutionHandler 인터페이스는 더 이상 ThreadPoolExecutor에 의해 작업이 실행될 수 없을 때 호출하는 메서드인 rejectedExecution()을 정의하고 있습니다.

해당 인터페이스의 구현은 ThreadPoolExecutor에 정적 클래스로 구현되어 있으며 다음과 같은 것들이 있습니다.

CallerRunsPolicy

거부된 작업은 execute 메서드의 호출 스레드에서 직접 실행합니다.

AbortPolicy

ThreadPoolExecutor의 기본 핸들러로 RejectedExecutionException를 던집니다.

DiscardPolicy

거부된 작업은 무시됩니다.

DiscardOldestPolicy

아직 처리되지 않은 요청 중 가장 오래된 것을 삭제하고 작업을 실행합니다.

비동기 예외 처리

비동기 처리 중 발생한 예외는 AsyncUncaughtExceptionHandler를 구현하여 예외 처리가 가능합니다. 해당 핸들러를 등록하기 위해서는 AsyncConfigurergetAsyncUncaughtExceptionHandler()를 구현체를 반환하도록 오버라이드하면 됩니다.

@Slf4j  
public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {  

    @Override  
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {  
        log.warn("비동기 처리 예외 발생. method={}, message={}, params={}", ex.getMessage(), method.getName(), params);  
    }  
}

@EnableAsync  
@Configuration  
public class AsyncConfig implements AsyncConfigurer {
    @Override  
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {  
        return new AsyncExceptionHandler();  
    }
}

Async AOP

@Async를 사용하게 되면 스프링 AOP를 이용해서 비동기 실행이 적용되게 됩니다. 때문에 @Transactional이 그러하듯 @Async 역시 내부 호출에 유의해야 합니다. 실제 객체 대신 실제 객체에 대한 참조를 가지고 있는 프록시 객체가 Bean으로 등록되기 때문에 내부 호출 시 비동기 실행이 적용되지 않을 것입니다.

조금 더 들어가서 Bean으로 등록한 executorasyncUncaughtExceptionHandler가 어떻게 등록되고 사용되는지 알아보겠습니다.

AsyncAnnotationBeanPostProcessor라는 빈 후처리기는 Bean으로 등록된 executor와 앞서 AsyncConfigurer에 등록한 asyncUncaughtExceptionHandler를 가져와 advisor를 세팅합니다.

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

    ...

    @Override  
    public void setBeanFactory(BeanFactory beanFactory) {  
        super.setBeanFactory(beanFactory);  

        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);  
        if (this.asyncAnnotationType != null) {  
           advisor.setAsyncAnnotationType(this.asyncAnnotationType);  
        }  
        advisor.setBeanFactory(beanFactory);  
        this.advisor = advisor;  
    }
}

AsyncAnnotationAdvisor@Async 어노테이션을 포인트컷으로 추가하고, AnnotationAsyncExecutionInterceptor를 어드바이스로 할당합니다.

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {  

    private final Advice advice;  

    private Pointcut pointcut;

    ...

    public AsyncAnnotationAdvisor(  
           @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {  
        ...
        asyncAnnotationTypes.add(Async.class);  
        ...
        this.advice = buildAdvice(executor, exceptionHandler);  
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

        protected Advice buildAdvice(  
           @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {  

        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);  
        interceptor.configure(executor, exceptionHandler);  
        return interceptor;  
    }
}

AnnotationAsyncExecutionInterceptor의 부모 AsyncExecutionInterceptorinvoke() 메서드를 확인하면 task를 생성하고 doSubmit() 메서드를 호출하여 excutor에 등록하는 것을 확인할 수 있습니다.

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

...

    @Override  
    @Nullable  
    public Object invoke(final MethodInvocation invocation) throws Throwable {  
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);  
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);  
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);  

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);  
        if (executor == null) {  
           throw new IllegalStateException(  
                 "No executor specified and no default executor set on AsyncExecutionInterceptor either");  
        }  

        Callable<Object> task = () -> {  
           try {  
              Object result = invocation.proceed();  
              if (result instanceof Future<?> future) {  
                 return future.get();  
              }  
           }  
           catch (ExecutionException ex) {  
              handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());  
           }  
           catch (Throwable ex) {  
              handleError(ex, userDeclaredMethod, invocation.getArguments());  
           }  
           return null;  
        };  

        return doSubmit(task, executor, invocation.getMethod().getReturnType());  
    }
}

재시도는 어떻게 하지...?

AsyncUncaughtExceptionHandler를 구현하여 예외 발생 시 에러 로그를 남기도록 예외 처리를 하기는 했지만 충분하지는 않아보입니다. 실제로 토이 프로젝트에 비동기 실행을 적용하면서 단순히 로그만 남기고 종료할 것이 아니라 로직이 실패하였으면 재시도 처리도 필요하다고 생각하였습니다.

처음 생각했던 방법은 별도의 Event 엔티티를 만드는 것이었습니다. 비동기 실행의 시도, 성공, 실패를 기록하고 실패한 경우 스케줄러를 돌려서 일정 시간마다 재시도한다...가 주요 골자였습니다. 그리고 참고할만한 문서가 있는지 찾던 중 두번째 방법인 Spring Retry라는 프로젝트에 대해 알게 되었고 이를 적용하게 되었습니다.

다음은 Spring Event의 이벤트 리스너에 비동기 실행을 위한 Spring Async, 재시도 처리를 위한 Spring Retry를 적용한 프로젝트 코드 중 일부입니다.

@Retryable(  
    retryFor = ApiException.class,  
    maxAttemptsExpression = "${retry.max-attempt}",  
    backoff = @Backoff(delayExpression = "${retry.delay}",  
        multiplierExpression = "${retry.multiply}"))  
@Async(value = "generativeTaskExecutor")  
@TransactionalEventListener  
public void createHubTags(CreateHubLinkEvent event) {  
    AutoCreateHubTagCommand command = new AutoCreateHubTagCommand(event.hubId());  
    try {  
        CreateTagResponse response = tagUseCase.autoCreateHubTags(command);  
        log.debug("[Event] 허브 태그 자동 생성. tagIds={}", response.tagIds());  
    } catch (NotMetCondition e) {  
        log.debug("[Event] 허브 태그 자동 생성 취소. 조건을 만족하지 않음.");  
    }  
}

이어지는 게시글에서는 Spring Retry를 이용해 비동기 실행 중 발생한 예외 상황에 대한 재시도 처리에 대해 다루어보겠습니다.

참고

https://www.baeldung.com/spring-async
https://docs.spring.io/spring-framework/reference/integration/scheduling.html
https://docs.spring.io/spring-boot/reference/features/task-execution-and-scheduling.html#features.task-execution-and-scheduling