2carrot84
by 2carrot84
4 min read

Categories

  • development

Tags

  • executor
  • executorService
  • java
  • threadPool

요즘 비동기관련 포스팅을 자주하게 되는 것 같습니다. 그동안 실무적으로 비동기 로직을 적용해 본적이 없었는데, 보다 보니 볼게 참 많은 것 같네요.

스프링에서 제공하는 @Async 를 이용한 비동기 방식과 ThreadPoolTaskExecutor 에 대한 짧은 포스팅에 이어, 오늘은 Java 에서 제공하는 ExecutorService 와 Executors 에 대해 알아보겠습니다.

[Spring] @Async 사용 방법과 주의사항
[Spring] ThreadPoolTaskExecutor 에 대한 짧은 지식

ExecutorService

병렬 작업 시 여러 개의 작업을 효율적으로 처리하기 위해 제공하는 Java 라이브러리

Runnable 인스턴스를 실행하는 Executor(작업 실행 책임만 가짐) 를 상속한 인터페이스로 작업(Runnable, Callable) 등록 및 Executor 의 실행을 위한 인터페이스

Runnable 은 응답값 없이 실행만 가능

package java.lang;

@FunctionalInterface
public interface Runnable {
	public abstract void run();
}

Callable 은 응답값이 있음

package java.util.concurrent;

@FunctionalInterface
public interface Callable<V> {
	V call() throws Exception;
}
  • ExecutorService 에서는 비동기 작업을 지원하는 submit 메소드를 제공
    • Future 를 반환
      • Future 의 결과를 받기 위해 get 함수를 호출할 수 있으나, blocking 으로 처리되어 비동기의 이점을 얻기 어렵다.
public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);

  <T> Future<T> submit(Runnable task, T result);

  Future<?> submit(Runnable task);
  ...
}
public interface Future<V> {
    ...
	/**
	 * Waits if necessary for the computation to complete, and then
	 * retrieves its result.
	 ...
	 */
	V get() throws InterruptedException, ExecutionException;
    ...
}
  • 대표적인 구현체로 java.util.concurrent.ThreadPoolExecutor 가 존재한다.
  • 사용 후 반드시 shutdown() 메소드를 호출해야 한다.

간단한 예재 코드는 아래와 같이 만들어 보았습니다.

@Slf4j
public class ExecutorStudy {
	private final ExecutorService executorService;

	public ExecutorStudy(ExecutorService executorService) {
		this.executorService = executorService;
	}

	public void callExecutorService() {
		executorService.submit(() -> {
			log.info(">>>>> callExecutorService.submit 시작");
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			log.info(">>>>> callExecutorService.submit 종료");
		});
		// executorService.shutdown();
	}

	public void callMethod() {
		log.info(">>>>> callMethod 시작");
		try {
			Thread.sleep(500);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
		log.info(">>>>> callMethod 종료");
	}
}

위 코드를 Executors 를 통해 다양한 테스트를 진행해 보려고 합니다.

Executors

스레드풀을 쉽게 생성할 수 있는 팩토리 클래스

newFixedThreadPool

고정된 스레드 개수를 갖는 스레드풀 생성

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
	@Test
	void newFixedThreadPool() {
		ExecutorService executorService = Executors.newFixedThreadPool(2);
		ExecutorStudy executorStudy = new ExecutorStudy(executorService);
		ThreadPoolExecutor executor = (ThreadPoolExecutor)executorService;

		executorStudy.callExecutorService();
		executorStudy.callExecutorService();
		executorStudy.callExecutorService();
		executorStudy.callMethod();

		assertThat(executor.getPoolSize()).isEqualTo(2);
		assertThat(executor.getQueue().size()).isEqualTo(1);
	}
12:10:30.898 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작
12:10:30.898 [pool-1-thread-2] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작
12:10:30.898 [Test worker] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callMethod 시작
12:10:31.407 [Test worker] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callMethod 종료

주어진 스레드풀을 이용하여, 위에서 보았던 ExecutorService 의 submit 메소드를 실행하는 예제 코드입니다.

2개의 스레드를 갖는 스레드풀을 생성 후 3개의 요청을 보낸 결과입니다.

pool-1-thread-1, pool-1-thread-2 과 같이 스레드풀에서 2개의 스레드를 가져와 각 스레드에서 실행을 하는걸 확인할 수 있습니다.

ThreadPoolExecutor 생성 시 설정된 corePoolSize(2) 를 넘어서는 요청은 LinkedBlockingQueue 에 담기게 되는 것도 확인할 수 있습니다.

ExecutorService 의 submit 결과인 Future 를 리턴 받아 get 을 호출하지 않았기 때문에, non-blocking 방식으로 실행되어, “»»> callExecutorService.submit 종료” 로그가 남지 않고, 메인 스레드(Test worker)가 종료되는 것을 볼 수 있습니다.

newCachedThreadPool

필요할 때 필요한 만큼의 스레드풀을 생성
이미 생성된 스레드가 있다면 재활용

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
	@Test
	void newCachedThreadPool() throws InterruptedException {
		ExecutorService executorService = Executors.newCachedThreadPool();
		ExecutorStudy executorStudy = new ExecutorStudy(executorService);
		ThreadPoolExecutor executor = (ThreadPoolExecutor)executorService;

		executorStudy.callExecutorService();
		executorStudy.callExecutorService();
		executorStudy.callExecutorService();
		executorStudy.callMethod();

		assertThat(executor.getPoolSize()).isEqualTo(3);
		assertThat(executor.getQueue().size()).isEqualTo(0);

		Thread.sleep(5000);

		executorStudy.callExecutorService();

		assertThat(executor.getPoolSize()).isEqualTo(3);
		assertThat(executor.getQueue().size()).isEqualTo(0);
	}
13:10:22.687 [pool-1-thread-3] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작
13:10:22.687 [Test worker] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callMethod 시작
13:10:22.687 [pool-1-thread-2] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작
13:10:22.687 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작
13:10:23.196 [Test worker] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callMethod 종료
13:10:23.696 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 종료
13:10:23.696 [pool-1-thread-2] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 종료
13:10:23.696 [pool-1-thread-3] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 종료
13:10:28.239 [pool-1-thread-2] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작

newCachedThreadPool 로 생성 후 3개의 요청을 보내고 5초 후 다시 요청을 보낸 결과입니다.

pool-1-thread-1pool-1-thread-2pool-1-thread-3 과 같이 3개의 스레드를 갖는 스레드 풀이 생성되고 3개를 사용 후 재요청 시 기존 스레드(pool-1-thread-2)가 재활용 되는 것을 확인할 수 있었습니다.

만약 3개 이상의 요청을 순간적으로 한다면 Integer.MAX_VALUE 까지 추가로 스레드를 늘리게 될 것입니다.

newSingleThreadExecutor

1개의 스레드만을 가지는 스레드 풀을 생성

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
	@Test
	void newSingleThreadExecutor() throws InterruptedException {
		ExecutorService executorService = Executors.newSingleThreadExecutor();
		ExecutorStudy executorStudy = new ExecutorStudy(executorService);

		executorStudy.callExecutorService();
		executorStudy.callExecutorService();
		executorStudy.callExecutorService();
		executorStudy.callMethod();

		Thread.sleep(5000);
	}
13:19:51.581 [Test worker] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callMethod 시작
13:19:51.581 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작
13:19:52.090 [Test worker] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callMethod 종료
13:19:52.590 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 종료
13:19:52.591 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작
13:19:53.597 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 종료
13:19:53.598 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 시작
13:19:54.603 [pool-1-thread-1] INFO com.example.demo.executor.ExecutorStudy -- >>>>> callExecutorService.submit 종료

싱글 스레드풀을 생성 후 3개의 요청을 보내고 5초간 기다려본 결과입니다.

3개의 요청 모두 하나의 스레드(pool-1-thread-1) 를 순차적으로 사용하는 걸 확인할 수 있습니다.

마무리

이번에는 ExecutorService 와 Executors 를 통해 간단히 몇가지 스레드풀을 만들어서 동작을 확인해 볼 수 있었습니다.

결국 비동기는 별도의 스레드로 요청을 보내고 결과 수신과 상관없이, 메인 스레드(호출한 스레드)를 종료하는게 핵심인거 같습니다.

다음에는 CompleteFuture 에 관해 학습한 내용을 포스팅해 보도록 하겠습니다.

그럼 이만. 🥕👋🏼🖐🏼

참고자료🤣

[Java] ExecutorService란?
[Java] Callable, Executors, ExecutorService 의 이해 및 사용법