본문 바로가기
Java

[Java] ExecutorService, ScheduledExecutorService, and Future

by 곰민 2023. 3. 7.
728x90

Java에서 ExecutorService는 스레드를 사용하여 작업을 비동기적으로 실행하는 방법을 제공합니다.
ExecutorService를 상속하는 ScheduledExecutorService는 지정된 시간, 고정된 속도 또는 고정된 지연으로 실행되도록 작업을 예약하는 메서드를 제공합니다.
Future는 비동기 작업의 결과를 나타내며 작업의 상태를 확인하거나 필요한 경우 취소할 수 있습니다.
요 세 녀석을 자세하게 살펴보도록 하겠습니다.

 

릴라드 타임마! 가 아닌 미룰 수 없다 java 쓰레드와 동시성 파먹기

 

ExecutorService


Java에서 스레드는 운영 체제의 리소스인 시스템 수준 스레드에 매핑됩니다.

스레드를 제어할 수 없을 정도로 많이 생성하면 이러한 리소스가 빠르게 부족해질 수 있습니다.

 

Java에서는 Executor framework가 thread pool을 생성하고 사용하는 간단한 방법을 제공합니다.

thread pool을 사용하면 스레드 생성 및 소멸과 관련된 오버헤드를 줄이고 애플리케이션에서 실행되는 스레드 수를 제어하는 등 여러 가지 이점을 얻을 수 있습니다.

 

Executor framework는 thread pool에서 작업을 생성하고 실행하기 위한 상위 수준 API를 정의하는 ExecutorService interface를 기반으로 구축됩니다.

 

출처 : https://www.geeksforgeeks.org/scheduledexecutorservice-interface-in-java/

 

ExecutorService interface는 execute(), submit(), invokeAll() 등 스레드 풀에 작업을 제출하기 위한 여러 메서드를 제공합니다.

ExecutorService는 각각 고유한 특징과 기능을 가진 ThreadPoolExecutor, ScheduledThreadPoolExecutor, ForkJoinPool을 포함한 여러 구현 클래스를 제공합니다.

 

출처 : https://www.baeldung.com/thread-pool-java-and-guava

 

ExecutorService와 ThreadPool 유형


대표적인 세 가지 ExecutorService 유형은 다음과 같습니다.

하위 예시에서 log.info는 @Sl4fj가 설정되어 있다고 가정합니다.

 

Fixed Thread Pool

고정된 크기의 스레드 풀을 생성하는 유형입니다.

 

newFixedThreadPool(int nThreads) 메서드를 사용하여 스레드 풀의 크기를 지정할 수 있습니다.

지정한 크기만큼 스레드가 생성되어 작업을 수행합니다.

만약 스레드가 모두 사용 중이라면, 작업은 대기 상태로 들어갑니다.

 

public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(getRunnable("Hello"));
        executorService.submit(getRunnable("lion"));
        executorService.submit(getRunnable("tiger"));
        executorService.submit(getRunnable("shark"));


			//쓰레드 두가지를 번갈하가면서 실행을 진행합니다.
        executorService.shutdown();
    }

    private static Runnable getRunnable(String message){
        return () -> log.info(message + Thread.currentThread().getName());
    }

 

Single Thread Pool

하나의 스레드만 사용하는 스레드 풀을 생성하는 유형입니다.

Executors.newSingleThreadExecutor() 메소드를 사용하여 생성할 수 있습니다.

하나의 스레드가 순차적으로 작업을 수행합니다.

작업이 완료되면, 다음 작업을 수행합니다.

 

public static void main(String[] args) {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.submit(() -> {
			log.info("Thread" + Thread.currentThread().getContextClassLoader());
    });

//작업 실행후 다음 작업이 들어오기 전까지 대기하기 때문에 명시적으로 sutdown을 시켜야됨

    executorService.shutdown();
		//gracefull shutdown
}

 

Cached Thread Pool

필요에 따라 스레드를 동적으로 생성하는 스레드 풀을 생성하는 유형입니다.

Executors.newCachedThreadPool() 메소드를 사용하여 생성할 수 있습니다.

작업이 생성될 때마다 새로운 스레드가 생성되어 작업을 수행합니다.

만약 이전에 생성된 스레드가 사용되지 않고 있다면, 해당 스레드는 제거됩니다.

 

public class CachedThreadPoolDemo {

    public static void main(String[] args) {
        // cached thread pool 생성
        ExecutorService executor = Executors.newCachedThreadPool();

        // int stream 으로 task 할당 할당 된 만큼 thread 생성
        IntStream.range(0, 10)
            .forEach(i -> executor.execute(new DemoTask(i)));

        // Shutdown the thread pool when all tasks are completed
        executor.shutdown();
    }
}
@Slf4j
class DemoTask implements Runnable {
    private int taskId;

    public DemoTask(int taskId) {
        this.taskId = taskId;
    }

    @Override
    public void run() {
       log.info("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
    }
}

 

 

task가 실행되는 순서는 예측할 수 없습니다.

각 ExecutorService 유형은 다양한 상황에서 사용될 수 있습니다.

예를 들어, Fixed Thread Pool 은 동시에 실행되는 작업의 수가 일정하거나 작업의 수가 미리 예측 가능한 경우에 유용합니다.

반면에, Cached Thread Pool은 작업의 수가 동적으로 변하거나 작업의 지속 시간이 짧은 경우에 유용합니다.

 

 

Future


Future 인터페이스는 별도의 스레드에서 실행 중인 비동기 계산의 결과를 나타냅니다.

계산이 완료되었는지 확인하고, 계산 결과를 검색하거나, 필요한 경우 계산을 취소할 수 있는 방법을 제공합니다.

작업이 ExecutorService에 제출되면 작업의 결과를 나타내는 Future 객체를 반환합니다.

 

future get

 

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Future<String> futuresubmit = executorService.submit(hello);
    log.info((futuresubmit.get().toUpperCase()));
    executorService.shutdown();
}

 

Future 인터페이스는 blocking method get()을 제공하며, 이 메서드는 Callable 태스크의 실제 실행 결과를 반환하거나 Runnable 태스크의 경우 null을 반환합니다.

get() 메서드는 블로킹이므로 계산이 완료될 때까지 기다렸다가 결과를 반환합니다.

또한 Future를 사용하여 작업을 실행하는 동안 발생하는 예외를 처리할 수 있으므로 오류와 예외를 원활하게 처리할 수 있습니다.

 

 

future isDone

 

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Future<String> futuresubmit = executorService.submit(hello);
	log.info(String.valueOf(futuresubmit.isDone()));
	log.info("started");

    futuresubmit.get(); //blocking call 결과가 나올때까지 기다림.

	log.info(String.valueOf(futuresubmit.isDone()));
	log.info("End!");
    executorService.shutdown();
}

//false
//Started
//true
//End!
//출력

future의 get method를 호출하기 전 완료상태를 isdone 메서드를 통해서 확인할 수 있습니다.

 

future cancel

 

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Future<String> futuresubmit = executorService.submit(hello);
	log.info(String.valueOf(futuresubmit.isDone()));
	log.info("started");

    futuresubmit.cancel(true); //interupt하고 종료
    //futuresubmit.cancel(false); //false를 한들 일단 리턴값을 가져 올 수 없음

	log.info(String.valueOf(futuresubmit.isDone())); //cancle을 호출한 이상 true가 됨

	log.info("End!");
    executorService.shutdown();
}

 

cancel 메서드를 사용하면 바로 인터럽트를 하고 종료를 합니다.

false 값을 주어도 리턴값을 가져올 수 없습니다.

 

future invokeAll 예시

 

public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hi = () -> {
            Thread.sleep(2000L);
            return "hi";
        };

        Callable<String> lion = () -> {
            Thread.sleep(3000L);
            return "lion";
        };

        Callable<String> tiger = () -> {
            Thread.sleep(1000L);
            return "tiger";
        };

        List<Callable<String>> tasks = Arrays.asList(hi, lion, tiger);
        List<Future<String>> futures = executorService.invokeAll(tasks);

        futures.stream().map(future -> {
            try {
                return future.get();
            } catch (InterruptedException | ExecutionException e) {
                log.error("An error occurred while waiting for a task to complete", e);
                return null;
            }
        }).forEach(log::info);

        executorService.shutdown();
    }

 

invokeAll의 경우 task가 전부 끝날 때까지 기다립니다.

 

future invokeAny 예제

 

public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        Callable<String> hi = () -> {
            Thread.sleep(2000L);
            return "hi";
        };

        Callable<String> lion = () -> {
            Thread.sleep(3000L);
            return "lion";
        };

        Callable<String> tiger = () -> {
            Thread.sleep(1000L);
            return "tiger";
        };

        String result = executorService.invokeAny(Arrays.asList(hi, lion, tiger));
        log.info(result);

        executorService.shutdown();
    }

 

각각의 task가 끝나기를 기다렸다가 로직 처리를 해야 되는 경우도 존재하지만.

server 3군데에서 sever에서 replication된 copy3가지를 전부 가져와야 한다면?

server 별 레이턴시가 다르다면 가장 빨리 처리된 task만 return 하면 수월합니다.

 

invokeAny의 경우 첫번째 task가 끝날 때까지 기다리고 첫 번째 task가 완료된다면 다른 task의 완료 여부와 상관없이 즉시 반환됩니다. 작업 중 하나라도 예외가 발생할 시 ExecutionException을 던집니다.

 

ScheduledExecutorService


ExecutorService를 상속받고 스케줄링 기능을 추가적으로 제공합니다.

이 인터페이스를 사용하면 특정 시간 또는 일정한 간격으로 실행할 작업을 예약할 수 있습니다.

 

ScheduledExecutorService를 인스턴스화하는 가장 좋은 방법은 Executors 클래스의 static 팩토리 메서드를 사용하는 것입니다.

 

혹시 static factory method를 모르신다면 아래 예전 포스팅을 참조해주시면 감사합니다.

https://colevelup.tistory.com/5

 

[EffectiveJava] 정적 팩터리 메서드(Static Factory Method) 장단점

정적 팩터리 매서드(Static Factory Method)는 Java 서적에 바이블이라고 할 수 있는 effective java를 읽은 개발자라면 생성자 대신 정적 팩토리 메서드를 고려하라 라는 첫 장에서 많이 보았을 겁니다. 정

colevelup.tistory.com

 

schedule 메서드

schedule를 사용하면 미래의 특정 시간에 작업을 한 번 실행하도록 예약할 수 있습니다.

schedule는 실행 가능 또는 호출 가능 객체와 지연 시간을 매개변수로 받습니다.

   public static void main(String[] args) {

        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

         executorService.schedule(() -> log.info("hi" + Thread.currentThread().getName()), 3, TimeUnit.SECONDS);
        // 3초 뒤 실행

        executorService.shutdown();
    }

 

 

scheduleAtFixedrate and scheduleWithFixedDelay

 

scheduleAtFixedRate 또는 scheduleWithFixedDelay 이 두 가지 메서드를 활용하면 고정된 간격으로 반복적으로 실행되도록 작업을 예약할 수 있습니다.

아래 두가지 예시 모두 다 출력을 확인하기 위해 Thread.sleep을 추가하였습니다.

 

scheduleAtFixedRate 메서드는 이전 작업이 완료되는 데 걸린 시간에 관계없이 고정된 속도로 작업이 실행되도록 예약합니다.

 

4가지 인자를 받습니다.

 

1. Runnable 

2. initialDelay - 첫 번째 실행 전 초기 지연 시간입니다.

3. period - 각 후속 작업 실행 사이의 기간입니다.

4. TimeUnit - 초기 지연 및 기간 매개 변수의 시간 단위입니다.

 

public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> log.info("hi" + Thread.currentThread().getName()), 1, 5, TimeUnit.SECONDS);
        try {
            Thread.sleep(10000); // sleep for 10 seconds to allow the task to run
        } catch (InterruptedException e) {
            log.error("Interrupted while sleeping", e);
        }
        executorService.shutdown();
    }

 

scheduleWithFixedDelay 메서드는 한 task 실행이 끝날 때와 다음 실행이 시작될 때 사이에 고정된 지연을 두고 작업을 실행하도록 예약합니다.

 

4가지 인자를 받습니다.

 

1. Runnable 

2. initialDelay - 첫 번째 실행 전 초기 지연 시간입니다.

3. delay - 한 task 실행이 완료되고 다음 실행이 시작될 때까지의 지연 시간입니다.

4. TimeUnit - 초기 지연 및 기간 매개 변수의 시간 단위입니다.

 

public class ScheduledExecutorFixedDemo {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleWithFixedDelay(() -> log.info("hi" + Thread.currentThread().getName()), 1, 5, TimeUnit.SECONDS);
        try {
            Thread.sleep(10000); // sleep for 10 seconds to allow the task to run
        } catch (InterruptedException e) {
            log.error("Interrupted while sleeping", e);
        }

        executorService.shutdown();
    }
}

 

 

다음번 포스팅에서는 Java8에 추가된 CompletableFuture에 대해서 자세하게 다루어 보도록 하겠습니다.

 

참조


백기선 the java 8

https://www.baeldung.com/thread-pool-java-and-guava

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html

https://www.geeksforgeeks.org/scheduledexecutorservice-interface-in-java/

728x90

댓글