# 12. 멀티 스레드 3

> [이것이 자바다](https://www.hanbit.co.kr/store/books/look.php?p_code=B1460673937) 책을 참고하였습니다.

## 스레드풀 스레드의 블로킹 방식 작업 완료 통보

`ExecutorService`의 `.submit()` 메소드는 스레드 작업으로 준 `Runnable` 혹은 `Callable` 작업을 스레드 풀의 작업 큐에 저장하고 즉시 `Future` 객체를 반환한다.

* `Future<?> submit(Runnable task)`
* `Future<V> submit(Runnable task, V result)`
* `Future<V> submit(Callable<V> task)`

### Future 객체

`Future` 객체는 단순히 작업 결과를 받는 것이 아니라 작업이 완료될 때까지 기다렸다가 최종 결과를 얻는데 사용된다. `Future`를 지연 완료(pending completion) 객체라고도 한다.

### Future.get()

`Future`의 `.get()` 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다. 이것이 블로킹을 사용하는 작업 완료 통보 방식이다.

* `V get()`: 작업이 완료될 때까지 블로킹되었다가 처리 결과 `V`를 반환한다.
* `V get(long timeout, TimeUnit unit)`: timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 작업이 완료되지 않으면 `TimeoutException`을 발생시킨다.

#### Future.get()의 리턴값

* `submit(Runnable task)`
  * 리턴 타입: `future.get() -> null`
  * 예외 발생: `future.get() -> throw 예외객체`
* `submit(Runnable task, Integer result)`
  * 리턴 타입: `future.get() -> int`
  * 예외 발생: `future.get() -> throw 예외객체`
* `submit(Callable<String> task)`
  * 리턴 타입: `future.get() -> String`
  * 예외 발생: `future.get() -> throw 예외객체`

> 주의사항: UI 처리 등을 하는 스레드가 `Future`의 `.get()` 메소드를 호출하면, 반환 값이 있기 전까지 UI 작업 등 모든 작업을 멈추기 때문에 새로운 스레드를 생성해서 `Future.get()`을 호출하거나, 스레드 풀의 다른 스레드에게 `Future.get()`을 위임해야 한다.

#### 다른 스레드에게 Future.get() 위임

```java
new Thread(new Runnable() {
  @Override
  public void run() {
    try {
      future.get();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}).start();
```

#### 스레드 풀 내의 다른 스레드에게 Future.get() 위임

```java
executorService.submit(new Runnable() {
  @Override
  public void run() {
    try {
      future.get();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
});
```

### Future의 기타 메소드

* `boolean cancel(boolean mayInterruptIfRunning)`: 작업 처리가 진행 중일 경우 취소
  * 인자로 `true`를 주면 인터럽트를 건다.
* `boolean isCancelled()`: 작업이 취소되었는지 여부
* `boolean isDone()`: 작업 처리가 완료되었는지 여부

## 리턴 값이 없는 작업 완료 통보

* 리턴값이 없는 작업은 `Runnable` 객체로 생성한다.
* `Future<?> submit(Runnable task)` 메소드를 이용한다.
  * 결과값이 없음에도 `Future`객체를 반환하는데, 정상적으로 작업처리가 발생했는지, 예외가 발생했는지 확인하기 위해서다.
    * 정상적으로 완료됐으면 `null`을 반환한다.
    * interrupt가 발생했으면 `InterruptedException`을 발생시킨다.
    * 작업처리 도중 예외가 발생했으면 `ExecutionException`을 발생시킨다.

### 코드 예제

```java
public class NoResultExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("[작업 처리 요청]");
        Runnable runnable = () -> {
            int sum = 0;
            for (int i = 1; i <= 10; i++) {
                sum+=i;
            }
            System.out.println("[처리 결과] " + sum);
        };

        Future future = executorService.submit(runnable);

        try {
            future.get();
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            System.out.println("[실행 예외 발생] " + e.getMessage());
            e.printStackTrace();
        }

        executorService.shutdown(); // 스레드 풀 종료
    }
}
```

## 리턴 값이 있는 작업 완료 통보

* 리턴 값이 있는 작업의 경우 `Callable`로 작업 객체를 생성하면 된다.&#x20;
* `T`는 `call()` 메소드가 리턴하는 타입이 되도록하면 된다.

```java
Callable<T> task = new Callable<T>() {
  @Override
  public T call() throws Exception {
    // 스레드가 처리할 작업 내용
    return T;
  }
}

Future<T> future = executorService.submit(task);
```

### 리턴 값이 있는 작업 완료 통보 예제

```java
public class YesResultExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("[작업 처리 요청]");
        Callable<Integer> callable = () -> {
            int sum = 0;

            for (int i = 1; i <= 10; i++) {
                sum += i;
            }
            System.out.println("[처리 결과] " + sum);

            return sum;
        };

        Future<Integer> future = executorService.submit(callable);

        try {
            Integer resultInteger = future.get();
            System.out.println("[future.get()] " + resultInteger);
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            System.out.println("[실행 예외 발생] " + e.getMessage());
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}
```

이전에 리턴 값이 없던 경우와 거의 같은데,

* `Callable` 인터페이스를 구현하고,
* `.run()` 메소드에서 값 반환을 하고,
* 반환 값을 `Future.get()`으로 받아주는 것만 다르다.

## 작업 처리 결과를 외부 객체에 저장

* 공유객체를 이용하여 스레드의 결과를 취합하는 경우가 있다.

```java
public class SendRunnableResultToAnotherObject {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        System.out.println("[작업 처리 요청]");

        class Result {
            int accumValue;

            synchronized void addValue(int value) {
                accumValue += value;
            }
        }

        class Task implements Runnable {
            Result result;

            public Task(Result result) {
                this.result = result;
            }

            @Override
            public void run() {
                int sum = 0;
                for (int i = 1; i <= 10; i++) {
                    sum += i;
                }

                result.addValue(sum);
            }
        }

        Result result = new Result();

        Runnable task1 = new Task(result);
        Runnable task2 = new Task(result);

        Future<Result> future1 = executorService.submit(task1, result);
        Future<Result> future2 = executorService.submit(task2, result);

        try {
            result = future1.get();
            System.out.println("result.accumValue = " + result.accumValue);
            result = future2.get();
            System.out.println("[처리 결과] " + result.accumValue);
            System.out.println("[작업 처리 완료]");
        } catch (Exception e) {
            System.out.println("[실행 예외 발생] " + e.getMessage());
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}
```

* `.submit()` 메소드에 작업과 공유 객체를 인자로 넘기면 된다.

## 작업 완료 순으로 통보

* 작업의 완료 순서는 요청 순이 아니라서 먼저 요청한 작업이 나중에 끝나기도 하고, 가장 나중에 요청한 작업이 가장 먼저 끝나기도 한다.
  * 이 경우 먼저 끝난 작업부터 결과를 얻어 이용할 수 있다.
  * `CompletionService` 객체는 처리 완료된 작업을 가져오는 `.poll()` 메소드와 `.take()` 메소드를 제공한다.
* `Future<V> poll()`: 완료된 작업의 `Future`를 가져온다. 완료된 작업이 없다면 즉시 `null`을 반환한다.
* `Future<V> poll(long timeout, TimeUnit unit)`: 완료된 작업이 없다면, `timeout`까지 블로킹된다.
* `Future<V> take()`: 완료된 작업이 없다면 있을 때까지 블로킹된다.
* `Future<V> submit(Callable<V> task)`: 스레드 풀에 `Callable` 작업 요청
* `Future<V> submit(Runnable task, V result)`: 스레드 풀이 `Runnable` 작업 요청

### 예제 코드

```java
public class CompletionServiceExample extends Thread{
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );

        // CompletionService 생성
        CompletionService<Integer> completionService =
                new ExecutorCompletionService<Integer>(executorService);

        System.out.println("[3개의 Callable 객체 작업 처리 요청]");

        for (int i = 0; i < 3; i++) {
            int finalI = i;

            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum = 0;

                    for (int j = 1; j <= 10; j++) {
                        sum += j;
                    }

                    return sum + finalI;
                };
            });
        }

        System.out.println("[처리 완료된 작업 확인]");

        executorService.submit(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Future<Integer> future = completionService.take();
                        int value = future.get();
                        System.out.println("[처리 결과] " + value);
                    } catch (InterruptedException e) {
                        System.out.println("[INTERRUPT OCCURRED]");
                        break;
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                        break;
                    }
                }
            }
        });

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("마지막 도달");
        executorService.shutdownNow();
    }
}
```

위의 예제는 `CompletionService`를 이용하여 `Callable` 작업을 처리하는 예제이다.

* 몇번째 루프에서 넘어온 값(몇번째 테스크)인지 알기 위해 `finalI`를 더해주었다.
* 마지막에 `shutdownNow()` 코드를 넣어서, 3초 뒤에는 스레드 풀 내의 모든 스레드에 Interrupt를 건다.
* 결국 `.take()` 메소드의 결과 값으로 Interrupt를 받게 되고 즉시 끝난다.

### 결과

![](https://images.velog.io/images/jakeseo_me/post/8f41cabe-9ae8-491e-aafc-6db08b367edd/image.png)

2번째 1번째 3번째 순으로 연산이 끝났다.

## 콜백 방식의 작업 완료 통보

* 콜백이란 애플리케이션이 스레드에게 작업처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법을 말한다.
  * 작업이 완료되면 자동으로 콜백 함수가 실행되기 때문에, 이전처럼 블록하고 기다릴 필요가 없다.
* 콜백 메소드를 가진 클래스가 필요하다.
  * 직접 정의해도 하거나
  * `java.nio.channels.CompletionHandler`를 이용해도 된다.
    * 위는 비동기 통신에서 콜백 객체를 만들 때 사용된다.
    * 정상종료를 위한 `.completed()` 메소드가 존재한다.
    * 예외처리를 위한 `.failed()` 메소드가 존재한다.

```java
CompletionHandler<V, A> callback = new CompletionHandler<V, A>() {
  @Override
  public void completed(V result, A attachment) {

  }

  @Override
  public void failed(Throwable exc, A attachment) {

  }
}
```

* 위에서 `A attachment`는 콜백 메소드 결과값 외에 추가적으로 전달할 객체가 있으면 설정해주면 된다.
  * 없으면 그냥 `null`을 보내도 된다.

### 예제 코드

```java
public class CallbackExample {
    private ExecutorService executorService;

    public CallbackExample() {
        executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors()
        );
    }

    private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            System.out.println("completed() 실행: " + result);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            System.out.println("failed() 실행: " + exc.toString());
        }
    };

    public void doWork(final String x, final String y) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    int intX = Integer.parseInt(x);
                    int intY = Integer.parseInt(y);
                    int result = intX + intY;
                    callback.completed(result, null);
                } catch (NumberFormatException e) {
                    callback.failed(e, null);
                }
            }
        };

        executorService.submit(task);
    }

    public void finish() {
        executorService.shutdown();
    }

    public static void main(String[] args) {
        CallbackExample callbackExample = new CallbackExample();
        callbackExample.doWork("3", "3");
        callbackExample.doWork("3", "삼");
        callbackExample.finish();
    }
}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://n00nietzsche.gitbook.io/jake-seo/books/this-is-java/12.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
