이것이 자바다 책을 참고하였습니다.
스레드풀 스레드의 블로킹 방식 작업 완료 통보
ExecutorService
의 .submit()
메소드는 스레드 작업으로 준 Runnable
혹은 Callable
작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future
객체를 반환한다.
Future<?> submit(Runnable task)
Future<V> submit(Runnable task, V result)
Future<V> submit(Callable<V> task)
Future 객체
Future
객체는 단순히 작업 결과를 받는 것이 아니라 작업이 완료될 때까지 기다렸다가 최종 결과를 얻는데 사용된다. Future
를 지연 완료(pending completion) 객체라고도 한다.
Future.get()
Future
의 .get()
메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다. 이것이 블로킹을 사용하는 작업 완료 통보 방식이다.
V get()
: 작업이 완료될 때까지 블로킹되었다가 처리 결과 V
를 반환한다.
V get(long timeout, TimeUnit unit)
: timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 작업이 완료되지 않으면 TimeoutException
을 발생시킨다.
Future.get()의 리턴값
submit(Runnable task)
리턴 타입: future.get() -> null
예외 발생: future.get() -> throw 예외객체
submit(Runnable task, Integer result)
리턴 타입: future.get() -> int
예외 발생: future.get() -> throw 예외객체
submit(Callable<String> task)
리턴 타입: future.get() -> String
예외 발생: future.get() -> throw 예외객체
주의사항: UI 처리 등을 하는 스레드가 Future
의 .get()
메소드를 호출하면, 반환 값이 있기 전까지 UI 작업 등 모든 작업을 멈추기 때문에 새로운 스레드를 생성해서 Future.get()
을 호출하거나, 스레드 풀의 다른 스레드에게 Future.get()
을 위임해야 한다.
다른 스레드에게 Future.get() 위임
Copy new Thread( new Runnable() {
@ Override
public void run() {
try {
future . get();
} catch ( Exception e) {
e . printStackTrace();
}
}
}) . start ();
스레드 풀 내의 다른 스레드에게 Future.get() 위임
Copy executorService . submit ( new Runnable() {
@ Override
public void run () {
try {
future . get ();
} catch ( Exception e) {
e . printStackTrace ();
}
}
});
Future의 기타 메소드
boolean cancel(boolean mayInterruptIfRunning)
: 작업 처리가 진행 중일 경우 취소
boolean isCancelled()
: 작업이 취소되었는지 여부
boolean isDone()
: 작업 처리가 완료되었는지 여부
리턴 값이 없는 작업 완료 통보
리턴값이 없는 작업은 Runnable
객체로 생성한다.
Future<?> submit(Runnable task)
메소드를 이용한다.
결과값이 없음에도 Future
객체를 반환하는데, 정상적으로 작업처리가 발생했는지, 예외가 발생했는지 확인하기 위해서다.
interrupt가 발생했으면 InterruptedException
을 발생시킨다.
작업처리 도중 예외가 발생했으면 ExecutionException
을 발생시킨다.
코드 예제
Copy public class NoResultExample {
public static void main ( String [] args) {
ExecutorService executorService = Executors . newFixedThreadPool (
Runtime . getRuntime () . availableProcessors ()
);
System . out . println ( "[작업 처리 요청]" );
Runnable runnable = () -> {
int sum = 0 ;
for ( int i = 1 ; i <= 10 ; i ++ ) {
sum += i;
}
System . out . println ( "[처리 결과] " + sum);
};
Future future = executorService . submit (runnable);
try {
future . get ();
System . out . println ( "[작업 처리 완료]" );
} catch ( Exception e) {
System . out . println ( "[실행 예외 발생] " + e . getMessage ());
e . printStackTrace ();
}
executorService . shutdown (); // 스레드 풀 종료
}
}
리턴 값이 있는 작업 완료 통보
리턴 값이 있는 작업의 경우 Callable
로 작업 객체를 생성하면 된다.
T
는 call()
메소드가 리턴하는 타입이 되도록하면 된다.
Copy Callable < T > task = new Callable < T >() {
@ Override
public T call () throws Exception {
// 스레드가 처리할 작업 내용
return T;
}
}
Future<T> future = executorService . submit (task);
리턴 값이 있는 작업 완료 통보 예제
Copy public class YesResultExample {
public static void main ( String [] args) {
ExecutorService executorService = Executors . newFixedThreadPool (
Runtime . getRuntime () . availableProcessors ()
);
System . out . println ( "[작업 처리 요청]" );
Callable < Integer > callable = () -> {
int sum = 0 ;
for ( int i = 1 ; i <= 10 ; i ++ ) {
sum += i;
}
System . out . println ( "[처리 결과] " + sum);
return sum;
};
Future < Integer > future = executorService . submit (callable);
try {
Integer resultInteger = future . get ();
System . out . println ( "[future.get()] " + resultInteger);
System . out . println ( "[작업 처리 완료]" );
} catch ( Exception e) {
System . out . println ( "[실행 예외 발생] " + e . getMessage ());
e . printStackTrace ();
}
executorService . shutdown ();
}
}
이전에 리턴 값이 없던 경우와 거의 같은데,
반환 값을 Future.get()
으로 받아주는 것만 다르다.
작업 처리 결과를 외부 객체에 저장
공유객체를 이용하여 스레드의 결과를 취합하는 경우가 있다.
Copy public class SendRunnableResultToAnotherObject {
public static void main ( String [] args) {
ExecutorService executorService = Executors . newFixedThreadPool (
Runtime . getRuntime () . availableProcessors ()
);
System . out . println ( "[작업 처리 요청]" );
class Result {
int accumValue;
synchronized void addValue ( int value) {
accumValue += value;
}
}
class Task implements Runnable {
Result result;
public Task ( Result result) {
this . result = result;
}
@ Override
public void run () {
int sum = 0 ;
for ( int i = 1 ; i <= 10 ; i ++ ) {
sum += i;
}
result . addValue (sum);
}
}
Result result = new Result() ;
Runnable task1 = new Task(result) ;
Runnable task2 = new Task(result) ;
Future < Result > future1 = executorService . submit (task1 , result);
Future < Result > future2 = executorService . submit (task2 , result);
try {
result = future1 . get ();
System . out . println ( "result.accumValue = " + result . accumValue );
result = future2 . get ();
System . out . println ( "[처리 결과] " + result . accumValue );
System . out . println ( "[작업 처리 완료]" );
} catch ( Exception e) {
System . out . println ( "[실행 예외 발생] " + e . getMessage ());
e . printStackTrace ();
}
executorService . shutdown ();
}
}
.submit()
메소드에 작업과 공유 객체를 인자로 넘기면 된다.
작업 완료 순으로 통보
작업의 완료 순서는 요청 순이 아니라서 먼저 요청한 작업이 나중에 끝나기도 하고, 가장 나중에 요청한 작업이 가장 먼저 끝나기도 한다.
이 경우 먼저 끝난 작업부터 결과를 얻어 이용할 수 있다.
CompletionService
객체는 처리 완료된 작업을 가져오는 .poll()
메소드와 .take()
메소드를 제공한다.
Future<V> poll()
: 완료된 작업의 Future
를 가져온다. 완료된 작업이 없다면 즉시 null
을 반환한다.
Future<V> poll(long timeout, TimeUnit unit)
: 완료된 작업이 없다면, timeout
까지 블로킹된다.
Future<V> take()
: 완료된 작업이 없다면 있을 때까지 블로킹된다.
Future<V> submit(Callable<V> task)
: 스레드 풀에 Callable
작업 요청
Future<V> submit(Runnable task, V result)
: 스레드 풀이 Runnable
작업 요청
예제 코드
Copy public class CompletionServiceExample extends Thread {
public static void main ( String [] args) {
ExecutorService executorService = Executors . newFixedThreadPool (
Runtime . getRuntime () . availableProcessors ()
);
// CompletionService 생성
CompletionService < Integer > completionService =
new ExecutorCompletionService < Integer >(executorService);
System . out . println ( "[3개의 Callable 객체 작업 처리 요청]" );
for ( int i = 0 ; i < 3 ; i ++ ) {
int finalI = i;
completionService . submit ( new Callable < Integer >() {
@ Override
public Integer call () throws Exception {
int sum = 0 ;
for ( int j = 1 ; j <= 10 ; j ++ ) {
sum += j;
}
return sum + finalI;
};
});
}
System . out . println ( "[처리 완료된 작업 확인]" );
executorService . submit ( new Runnable() {
@ Override
public void run () {
while ( true ) {
try {
Future < Integer > future = completionService . take ();
int value = future . get ();
System . out . println ( "[처리 결과] " + value);
} catch ( InterruptedException e) {
System . out . println ( "[INTERRUPT OCCURRED]" );
break ;
} catch ( ExecutionException e) {
e . printStackTrace ();
break ;
}
}
}
});
try {
Thread . sleep ( 3000 );
} catch ( InterruptedException e) {
e . printStackTrace ();
}
System . out . println ( "마지막 도달" );
executorService . shutdownNow ();
}
}
위의 예제는 CompletionService
를 이용하여 Callable
작업을 처리하는 예제이다.
몇번째 루프에서 넘어온 값(몇번째 테스크)인지 알기 위해 finalI
를 더해주었다.
마지막에 shutdownNow()
코드를 넣어서, 3초 뒤에는 스레드 풀 내의 모든 스레드에 Interrupt를 건다.
결국 .take()
메소드의 결과 값으로 Interrupt를 받게 되고 즉시 끝난다.
결과
2번째 1번째 3번째 순으로 연산이 끝났다.
콜백 방식의 작업 완료 통보
콜백이란 애플리케이션이 스레드에게 작업처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법을 말한다.
작업이 완료되면 자동으로 콜백 함수가 실행되기 때문에, 이전처럼 블록하고 기다릴 필요가 없다.
콜백 메소드를 가진 클래스가 필요하다.
java.nio.channels.CompletionHandler
를 이용해도 된다.
위는 비동기 통신에서 콜백 객체를 만들 때 사용된다.
정상종료를 위한 .completed()
메소드가 존재한다.
예외처리를 위한 .failed()
메소드가 존재한다.
Copy CompletionHandler < V , A > callback = new CompletionHandler < V , A >() {
@ Override
public void completed ( V result , A attachment) {
}
@ Override
public void failed ( Throwable exc , A attachment) {
}
}
위에서 A attachment
는 콜백 메소드 결과값 외에 추가적으로 전달할 객체가 있으면 설정해주면 된다.
예제 코드
Copy public class CallbackExample {
private ExecutorService executorService;
public CallbackExample () {
executorService = Executors . newFixedThreadPool (
Runtime . getRuntime () . availableProcessors ()
);
}
private CompletionHandler < Integer , Void > callback = new CompletionHandler < Integer , Void >() {
@ Override
public void completed ( Integer result , Void attachment) {
System . out . println ( "completed() 실행: " + result);
}
@ Override
public void failed ( Throwable exc , Void attachment) {
System . out . println ( "failed() 실행: " + exc . toString ());
}
};
public void doWork ( final String x , final String y) {
Runnable task = new Runnable() {
@ Override
public void run () {
try {
int intX = Integer . parseInt (x);
int intY = Integer . parseInt (y);
int result = intX + intY;
callback . completed (result , null );
} catch ( NumberFormatException e) {
callback . failed (e , null );
}
}
};
executorService . submit (task);
}
public void finish () {
executorService . shutdown ();
}
public static void main ( String [] args) {
CallbackExample callbackExample = new CallbackExample() ;
callbackExample . doWork ( "3" , "3" );
callbackExample . doWork ( "3" , "삼" );
callbackExample . finish ();
}
}