티스토리 뷰

Java

@Async와 ThreadPoolTaskExecutor

kunypony 2024. 2. 19. 20:04
728x90

@Async ThreadPoolTaskExecutor 스프링 프레임워크에서 비동기 처리를 위해 사용되는 중요한 요소이다. 성능 개선의 예제로 비동기 작업의 예시를 많이 봤을 것이다. 알아도 손해볼 거 없는 내용이니 간단한 예제 코드를 학습해보자.

 

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(30);
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setQueueCapacity(100);
        taskExecutor.setThreadNamePrefix("sh-");
        taskExecutor.initialize();

        return taskExecutor;
    }
}

또는

SpringBoot를 사용할 경우 autoConfiguration으로 ThreadPoolTaskExecutor가 자동으로 등록
되기 때문에 application.yml으로 executor의 옵션을 지정해주는 방법도 있다.

spring:
  task:
    execution:
      pool:
        core-size: 30
        max-size: 30
        queue-capacity: 100
        keep-alive: 30s

 

위의 예제에서는 ThreadPoolTaskExecutor 빈을 설정하고, @EnableAsync 어노테이션을 통해 비동기 처리를 활성화하였다

ThreadPoolTaskExecutor은 TaskExecutor 인터페이스의 구현체로서 스레드 풀을 관리하며 비동기작업을 수행하는데 쓰인다. 동작 원리를 간단히 설명하면 작업을 큐에 쌓고 풀에 있는 스레드가 큐에서 작업을 꺼내어 비동기로 처리한다고 생각하면 될 듯 하다.  

ThreadPoolTaskExecutor의 속성은 다음과 같다.

   
corePoolSize 기본 thread 수
maxPoolSize 최대 thread의 수
queueCapacity thread-pool에서 사용할 최대 queue의 크기
threadNamePrefix thread의 이름의 prefix

큐 용량이 꽉차게 되면 corePoolSize에서 maxPoolSize까지 스레드를 생성하여 작업을 처리한다. 

오해하지 말 것은 스레드의 수가 많아지면 컨텍스트 스위칭 비용이 더 발생하기 때문에 다다익선이 아니다. 보통 호스트의  CPU 코어 수와 동일하게 맞춘 케이스를 많이 봤다. JVM이 호스트의 코어 리소스와 1대1 매핑이라 그런것인지 스레드가 항상 활성 상태로 시스템이 병렬로 작업을 처리할 있는 최대한의 기회를 제공하며 모든 코어가 활용되면서 오버헤드 최소화할 수 있다고 한다. 이 부분은 궁금하면 더 찾아보도록 하자.

 

이제 적용할 메소드에 @Async를 붙여주기만 하면 된다.

@Slf4j
@Component
public class AsyncJob {
    @Async
    public void setJob(int minutes, int count)
    {
        try
        {
            Thread.sleep(minutes * 30000);
            log.info("async : {}", count);
        }
        catch (InterruptedException e)
        {
            log.info("async error : {}", e);
        }
    }
}

 

@Async를 사용할 때, Executor를 Bean에서 등록하지 않으면 Spring에서는 AsyncTaskExecutor을 사용해서 스레드를 알아서 관리한다. 이 때 AsyncTaskExecutor의 기본 설정인 SimpleAsyncTaskExecutor는 스레드 풀을 사용하지 않고, 매 요청마다 새로운 스레드를 생성해 작업을 수행한다.

만약 1000명의 사용자가 동시에 인증 메일 전송 요청을 한다면, 1000개의 스레드를 생성하려하고 이는 리소스 부족 문제로 이어지기 쉽다.

따라서 ThreadPoolTaskExecutor 같은 스레드 기반의 TaskExecutor 사용하도록 설정해야한다.

 

예제 코드

@RequiredArgsConstructor
@Slf4j
@Component
public class AsyncTest implements ApplicationRunner {

    private final AsyncJob asyncTimer;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        for (int i = 0; i < 100; i++)
        {
            asyncTimer.setJob(1, i);
            log.info("job 실행 현재 스레드 개수 : {}", Thread.activeCount());
        }
    }
}

 

실행 결과

실행 결과

위에서 corePoolSize로 설정한 30으로 설정하니

asyncTimer.setJob(1, i);

이 부분에서 30개의 스레드가 할당되고 나머지는 default 스레드20개가 할당된 것으로 확인된다.

실제 10초뒤에 타이머가 작동하고 난 이후 찍힌 로그를 보자.

10초 타이머 이후 결과

총 30개의 할당된 스레드가 작업하고 다음 1분의 작업을 기다린 후 나머지 30개의 작업이 진행된다. 비동기로 처리되었기에 순서가 섞인 것도 확인할 수 있다.

 

이때 유의해야하는 점은 다음과 같다.

  • 스레드 풀의 스레드 수가 maxPoolSize에 도달한 상태에서 새로운 요청이 들어오면, 더 이상 스레드를 생성할 수 없고 큐에도 대기시킬 수 없다. → TaskRejectedException이 발생한다.

예외 발생을 만들기 위해 큐 사이즈를 1로 수정하고 확인해보자.

taskExecutor.setCorePoolSize(1);
taskExecutor.setMaxPoolSize(1);
taskExecutor.setQueueCapacity(1);

 

다음과 같이 예상한 TaskRejectedException을 확인할 수 있었다.

스프링에서는 다음과 같은 예외 처리를 사용하고 핸들링할 수 있도록 제공한다.

RejectedExecutionHandler를 통해 ThreadPoolTaskExecutor에서 스레드풀 내에서 더 이상 작업을 처리할 수 없을때의 예외 처리 전략을 설정할 수 있다.

RejectedExecutionHandler의 기본 전략은 AbortPolicy이다.

Spring에서 제공하는 전략과 그 특징들은 아래와 같다.

  • AbortPolicy: TaskRejectedException을 발생시키며 종료한다.
  • CallerRunsPolicy: 스레드풀을 호출한 스레드에서 처리한다. (톰캣에서 스레드풀을 호출했다면, 톰캣 스레드가 요청을 처리한다.)
  • DiscardPolicy: 해당 요청들을 무시한다.
  • DiscardOldestPolicy: 큐에 있는 가장 오래된 요청을 삭제하고 새로운 요청을 받아들인다. (queueCapacity 0 경우 StackOverFlowError 발생한다.)
@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1);
        taskExecutor.setMaxPoolSize(1);
        taskExecutor.setQueueCapacity(1);
        taskExecutor.setThreadNamePrefix("sh-");
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();

        return taskExecutor;
    }
}
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

CallerRunsPolicy를 활용하여 호출자의 스레드를 활용하여 작업하도록 설정하였고 정상 수행됨을 확인하였다.

 

Spring에서 제공하는 전략을 활용하는 방식 이외에도 직접 핸들링하는 방법도 있다.

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1);
        taskExecutor.setMaxPoolSize(1);
        taskExecutor.setQueueCapacity(1);
        taskExecutor.setThreadNamePrefix("sh-");
        taskExecutor.setRejectedExecutionHandler((r, exec) -> {
            throw new IllegalArgumentException("더 이상 요청을 처리할 수 없습니다.");
        });
        taskExecutor.initialize();
        return taskExecutor;
    }
}

 

이로서 간단하게 @Async와 ThreadPoolTaskExecutor를 활용하는 방법에 대해 살펴보았다.

ThreadPoolSize와 QueueSize를 적당히 호스트의 스펙을 맞춰 신중하게 결정해야한다 느낄 것이다. 너무 많은 스레드 풀은 컨텍스트 스위칭 코스트, 시간이 들 것이고 너무 작은 스레드 풀은 처리량이 다소 아쉬울 것 같다. 큐의 사이즈와 예외처리도 실무 환경에 맞춰 적절한 수준을 맞추도록 노력해보자. 

 

컨텍스트 스위칭을 최소화하기 위한 작업에 대해서 다음에 포스팅해보려고 한다.

'Java' 카테고리의 다른 글

gRPC란? RPC, gRPC, REST API  (0) 2024.03.05
[Java] CompletableFuture의 이해와 활용  (1) 2024.02.19
Java heap dump, JVM OOM(Out of Memory)  (1) 2024.01.09
[Java] MDC를 활용한 로그 추적  (0) 2023.08.04
[Java] ThreadLocal  (0) 2023.07.31
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2024/07   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30 31
글 보관함