12. 멀티 스레드 3

이것이 자바다 책을 참고하였습니다.

스레드풀 스레드의 블로킹 방식 작업 완료 통보

ExecutorService.submit() 메소드는 스레드 작업으로 준 Runnable 혹은 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future 객체를 반환한다.

  • Future<?> submit(Runnable task)

  • Future<V> submit(Runnable task, V result)

  • Future<V> submit(Callable<V> task)

Future 객체

Future 객체는 단순히 작업 결과를 받는 것이 아니라 작업이 완료될 때까지 기다렸다가 최종 결과를 얻는데 사용된다. Future를 지연 완료(pending completion) 객체라고도 한다.

Future.get()

Future.get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다. 이것이 블로킹을 사용하는 작업 완료 통보 방식이다.

  • V get(): 작업이 완료될 때까지 블로킹되었다가 처리 결과 V를 반환한다.

  • V get(long timeout, TimeUnit unit): timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 작업이 완료되지 않으면 TimeoutException을 발생시킨다.

Future.get()의 리턴값

  • submit(Runnable task)

    • 리턴 타입: future.get() -> null

    • 예외 발생: future.get() -> throw 예외객체

  • submit(Runnable task, Integer result)

    • 리턴 타입: future.get() -> int

    • 예외 발생: future.get() -> throw 예외객체

  • submit(Callable<String> task)

    • 리턴 타입: future.get() -> String

    • 예외 발생: future.get() -> throw 예외객체

주의사항: UI 처리 등을 하는 스레드가 Future.get() 메소드를 호출하면, 반환 값이 있기 전까지 UI 작업 등 모든 작업을 멈추기 때문에 새로운 스레드를 생성해서 Future.get()을 호출하거나, 스레드 풀의 다른 스레드에게 Future.get()을 위임해야 한다.

다른 스레드에게 Future.get() 위임

new Thread(new Runnable() {
  @Override
  public void run() {
    try {
      future.get();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}).start();

스레드 풀 내의 다른 스레드에게 Future.get() 위임

executorService.submit(new Runnable() {
  @Override
  public void run() {
    try {
      future.get();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
});

Future의 기타 메소드

  • boolean cancel(boolean mayInterruptIfRunning): 작업 처리가 진행 중일 경우 취소

    • 인자로 true를 주면 인터럽트를 건다.

  • boolean isCancelled(): 작업이 취소되었는지 여부

  • boolean isDone(): 작업 처리가 완료되었는지 여부

리턴 값이 없는 작업 완료 통보

  • 리턴값이 없는 작업은 Runnable 객체로 생성한다.

  • Future<?> submit(Runnable task) 메소드를 이용한다.

    • 결과값이 없음에도 Future객체를 반환하는데, 정상적으로 작업처리가 발생했는지, 예외가 발생했는지 확인하기 위해서다.

      • 정상적으로 완료됐으면 null을 반환한다.

      • interrupt가 발생했으면 InterruptedException을 발생시킨다.

      • 작업처리 도중 예외가 발생했으면 ExecutionException을 발생시킨다.

코드 예제

public class NoResultExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("[작업 처리 요청]");
        Runnable runnable = () -> {
            int sum = 0;
            for (int i = 1; i <= 10; i++) {
                sum+=i;
            }
            System.out.println("[처리 결과] " + sum);
        };

        Future future = executorService.submit(runnable);

        try {
            future.get();
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            System.out.println("[실행 예외 발생] " + e.getMessage());
            e.printStackTrace();
        }

        executorService.shutdown(); // 스레드 풀 종료
    }
}

리턴 값이 있는 작업 완료 통보

  • 리턴 값이 있는 작업의 경우 Callable로 작업 객체를 생성하면 된다.

  • Tcall() 메소드가 리턴하는 타입이 되도록하면 된다.

Callable<T> task = new Callable<T>() {
  @Override
  public T call() throws Exception {
    // 스레드가 처리할 작업 내용
    return T;
  }
}

Future<T> future = executorService.submit(task);

리턴 값이 있는 작업 완료 통보 예제

public class YesResultExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("[작업 처리 요청]");
        Callable<Integer> callable = () -> {
            int sum = 0;

            for (int i = 1; i <= 10; i++) {
                sum += i;
            }
            System.out.println("[처리 결과] " + sum);

            return sum;
        };

        Future<Integer> future = executorService.submit(callable);

        try {
            Integer resultInteger = future.get();
            System.out.println("[future.get()] " + resultInteger);
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            System.out.println("[실행 예외 발생] " + e.getMessage());
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}

이전에 리턴 값이 없던 경우와 거의 같은데,

  • Callable 인터페이스를 구현하고,

  • .run() 메소드에서 값 반환을 하고,

  • 반환 값을 Future.get()으로 받아주는 것만 다르다.

작업 처리 결과를 외부 객체에 저장

  • 공유객체를 이용하여 스레드의 결과를 취합하는 경우가 있다.

public class SendRunnableResultToAnotherObject {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("[작업 처리 요청]");

        class Result {
            int accumValue;

            synchronized void addValue(int value) {
                accumValue += value;
            }
        }

        class Task implements Runnable {
            Result result;

            public Task(Result result) {
                this.result = result;
            }

            @Override
            public void run() {
                int sum = 0;
                for (int i = 1; i <= 10; i++) {
                    sum += i;
                }

                result.addValue(sum);
            }
        }

        Result result = new Result();

        Runnable task1 = new Task(result);
        Runnable task2 = new Task(result);

        Future<Result> future1 = executorService.submit(task1, result);
        Future<Result> future2 = executorService.submit(task2, result);

        try {
            result = future1.get();
            System.out.println("result.accumValue = " + result.accumValue);
            result = future2.get();
            System.out.println("[처리 결과] " + result.accumValue);
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            System.out.println("[실행 예외 발생] " + e.getMessage());
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}
  • .submit() 메소드에 작업과 공유 객체를 인자로 넘기면 된다.

작업 완료 순으로 통보

  • 작업의 완료 순서는 요청 순이 아니라서 먼저 요청한 작업이 나중에 끝나기도 하고, 가장 나중에 요청한 작업이 가장 먼저 끝나기도 한다.

    • 이 경우 먼저 끝난 작업부터 결과를 얻어 이용할 수 있다.

    • CompletionService 객체는 처리 완료된 작업을 가져오는 .poll() 메소드와 .take() 메소드를 제공한다.

  • Future<V> poll(): 완료된 작업의 Future를 가져온다. 완료된 작업이 없다면 즉시 null을 반환한다.

  • Future<V> poll(long timeout, TimeUnit unit): 완료된 작업이 없다면, timeout까지 블로킹된다.

  • Future<V> take(): 완료된 작업이 없다면 있을 때까지 블로킹된다.

  • Future<V> submit(Callable<V> task): 스레드 풀에 Callable 작업 요청

  • Future<V> submit(Runnable task, V result): 스레드 풀이 Runnable 작업 요청

예제 코드

public class CompletionServiceExample extends Thread{
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        // CompletionService 생성
        CompletionService<Integer> completionService =
                new ExecutorCompletionService<Integer>(executorService);

        System.out.println("[3개의 Callable 객체 작업 처리 요청]");

        for (int i = 0; i < 3; i++) {
            int finalI = i;

            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;

                    for (int j = 1; j <= 10; j++) {
                        sum += j;
                    }

                    return sum + finalI;
                };
            });
        }

        System.out.println("[처리 완료된 작업 확인]");

        executorService.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Future<Integer> future = completionService.take();
                        int value = future.get();
                        System.out.println("[처리 결과] " + value);
                    } catch (InterruptedException e) {
                        System.out.println("[INTERRUPT OCCURRED]");
                        break;
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                        break;
                    }
                }
            }
        });

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("마지막 도달");
        executorService.shutdownNow();
    }
}

위의 예제는 CompletionService를 이용하여 Callable 작업을 처리하는 예제이다.

  • 몇번째 루프에서 넘어온 값(몇번째 테스크)인지 알기 위해 finalI를 더해주었다.

  • 마지막에 shutdownNow() 코드를 넣어서, 3초 뒤에는 스레드 풀 내의 모든 스레드에 Interrupt를 건다.

  • 결국 .take() 메소드의 결과 값으로 Interrupt를 받게 되고 즉시 끝난다.

결과

2번째 1번째 3번째 순으로 연산이 끝났다.

콜백 방식의 작업 완료 통보

  • 콜백이란 애플리케이션이 스레드에게 작업처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법을 말한다.

    • 작업이 완료되면 자동으로 콜백 함수가 실행되기 때문에, 이전처럼 블록하고 기다릴 필요가 없다.

  • 콜백 메소드를 가진 클래스가 필요하다.

    • 직접 정의해도 하거나

    • java.nio.channels.CompletionHandler를 이용해도 된다.

      • 위는 비동기 통신에서 콜백 객체를 만들 때 사용된다.

      • 정상종료를 위한 .completed() 메소드가 존재한다.

      • 예외처리를 위한 .failed() 메소드가 존재한다.

CompletionHandler<V, A> callback = new CompletionHandler<V, A>() {
  @Override
  public void completed(V result, A attachment) {

  }

  @Override
  public void failed(Throwable exc, A attachment) {

  }
}
  • 위에서 A attachment는 콜백 메소드 결과값 외에 추가적으로 전달할 객체가 있으면 설정해주면 된다.

    • 없으면 그냥 null을 보내도 된다.

예제 코드

public class CallbackExample {
    private ExecutorService executorService;

    public CallbackExample() {
        executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );
    }

    private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            System.out.println("completed() 실행: " + result);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.out.println("failed() 실행: " + exc.toString());
        }
    };

    public void doWork(final String x, final String y) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    int intX = Integer.parseInt(x);
                    int intY = Integer.parseInt(y);
                    int result = intX + intY;
                    callback.completed(result, null);
                } catch (NumberFormatException e) {
                    callback.failed(e, null);
                }
            }
        };

        executorService.submit(task);
    }

    public void finish() {
        executorService.shutdown();
    }

    public static void main(String[] args) {
        CallbackExample callbackExample = new CallbackExample();
        callbackExample.doWork("3", "3");
        callbackExample.doWork("3", "삼");
        callbackExample.finish();
    }
}

Last updated