Java/Theory

[Java/Theory]28. Executor

양승길 2016. 9. 17. 17:02

[Java/Theory]28. Executor

(출처 : http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/)

Java 8 에서 병행처리중 두 가지 사용법이Thread와 Executor가 있다.

먼저 Thread를 보도록 한다.


Thread & Runnables

현대 운영체제는 process와  thread를 통해 병행처리를 지원한다. process는 서로 간에 독립적으로 실행하는 program의 단위이다. 이 process 내부에서 thread가 병행하여 code를 실행 될 수있다. thread는 JDK 1.0이후로 자바에서 지원하는데, 시작하기 앞서 code가 task라 부르는 thread에 의해 실행된다는 사실을 알아야 된다. 이는 Runnable을 실행시켜서 작업하게 된다.

간단한 Thread를 예문을 들면, 실행결과는 때에 따라 다르다. 응용 프로그램이 커질수록 복잡 할 수 있다. 또한 Thread는 sleep이란 것을 특정 시간에 사용할 수도 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package example;
 
public class ExOfThread {
    
    public static void main(String args[]){
        Runnable task = () -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Hello " + threadName);
        };
 
        task.run();
 
        Thread thread = new Thread(task);
        thread.start();
 
        System.out.println("Done!");
 
    }
    
}
 
cs


1
2
3
Hello main
Done!
Hello Thread-0
cs


1
2
3
Hello main
Hello Thread-0
Done!
cs


Executor

병행처리 API는 thread와 작동하는 ExecutorService의 개념으로 시작된다.  Executor는 thread들을 pool로 관리하고 task를 비동기로 실행시킨다. 그래서 굳이 Thread를 생성할 필요가 없다. 내부적으로 있는 pool의 모든 thread들이 반복적 작업을 수행하기 위해 재사용될 것이다?(All threads of the internal pool will be reused under the hood for revenant tasks ) 그러니 단일적으로 ExecutorService로 작동하는 응용 프로그램에 있는 생명주기에 따라 병행처리 작업이 가능하다.

간단한 예시는 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package example;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ExOfExecutor {
    
    public static void main(String args[]){
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println("Hello " + threadName);
        });
 
    }
    
}
 
cs


Executor는 서로 다른 ExecutorService를 생성하는 편리한 factory method를 제공한다. 위 예제는 thread pool 크기가 하나인 경우다. 실행 결과는 위에 있는 thread 예제와 비슷하지만 명백히 중요한 차이점이 있다. Executor의 경우 java process가 종료되지 않는다. 그러므로 명시적으로 종료시켜야 된ㄷ다. 그렇지 않으면 새 작업을 계속 듣게 된다.

ExecutorService는 shutdown(), shutdownNow()라는 두 method를 재공하는데 shutdown()은 현재 실행중인 작업을 종료시키기 위해 대기하고 shutdownNow()는 모든 작업을 막고 executor를 즉각 꺼버린다. 예제는 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package example;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
public class ExOfShutdown {
    
    public static void main(String args[]){
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                System.out.println("attempt to shutdown executor");
                executor.shutdown();
                executor.awaitTermination(5, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                System.err.println("tasks interrupted");
            }
            finally {
                if (!executor.isTerminated()) {
                    System.err.println("cancel non-finished tasks");
                }
                executor.shutdownNow();
                System.out.println("shutdown finished");
            }
        });
        
 
 
    }
    
}
 
cs


1
2
3
attempt to shutdown executor
cancel non-finished tasks
shutdown finished
cs


Executor가 현재 작업을 실핼 시키면서, 특정 시간동안 대기하다가 그 시간이 지나면 모든 작업을 막고 종료시킨다.


Callable and Fututre

Runnable을 추가로 하여서, Executor는 Callable이란 또 다른 작업을 지원한다. 이 funcrional interface는 Runnable과 비슷하지만 Callable은 return 값이 있다.

Callable<Integer> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return 123;
    }
    catch (InterruptedException e) {
        throw new IllegalStateException("task interrupted", e);
    }
};


Callable은 Runnable처럼 ExecutorService에게 submit()를 통해 전달 될 수 있다. 

그러나 submit()는 작업이 끝나기 전까지 대기하지 않기 때문에 ExecutorService는 Callable의 결과를 return value를 직접 나타낼 수 업다. 그리하여 차후에 결과를 회수하는데 사용 될 수있는 Future라는 결과값을 나타낼 수있다.

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);

System.out.println("future done? " + future.isDone());

Integer result = future.get();

System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);

Callable을 Executor에 전달하고 가장 먼저 확인할 것은 Future가 isDone()을 통해서 완료가 됐는가 아닌가를 보는 것이다.

분명히 첫 번째에 있는 isDone은 정수값을 반환하기 전에 1초간 잠들고 있는 이후가 아니므로 false로 나타날 것이고, get()이란 method가 현재 thread를 막고 Callable의 실제 반환값 123을 전달하기까지 Callable의 완료를 대기하도록 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package example;
 
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
 
public class ExOfCallableFuture {
 
    public static void main(String args[]) {
        Callable<Integer> task = () -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                return 123;
            } catch (InterruptedException e) {
                throw new IllegalStateException("task interrupted", e);
            }
        };
 
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<Integer> future = executor.submit(task);
 
        System.out.println("future done? " + future.isDone());
 
        Integer result = null;
        try {
            result = future.get();
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("future done? " + future.isDone());
        System.out.print("result: " + result);
 
    }
 
}
 
cs
future done? false
future done? true
result: 123

Future는 근본적으로 ExecutorService와 탄탄히 결합되어있다?(Future are tightly coupled to the underlying executor service.) 종결되지 않은 모든 furture는 executor가 shutdown되면 예외를 던지게 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package example;
 
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
 
public class ExOfCallableFuture {
 
    public static void main(String args[]) {
        Callable<Integer> task = () -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                return 123;
            } catch (InterruptedException e) {
                throw new IllegalStateException("task interrupted", e);
            }
        };
 
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<Integer> future = executor.submit(task);
 
        System.out.println("future done? " + future.isDone());
 
        Integer result = null;
        try {
            executor.shutdownNow();
            result = future.get();
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("future done? " + future.isDone());
        System.out.print("result: " + result);
 
    }
 
}
 
cs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
future done? false
java.util.concurrent.ExecutionException: java.lang.IllegalStateException: task interrupted
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at example.ExOfCallableFuture.main(ExOfCallableFuture.java:30)
Caused by: java.lang.IllegalStateException: task interrupted
    at example.ExOfCallableFuture.lambda$0(ExOfCallableFuture.java:18)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at example.ExOfCallableFuture.lambda$0(ExOfCallableFuture.java:15)
    ... 4 more
future done? true
result: null
cs


Executor를 생성하는데 있어 newFixedThreadPool(int)를 사용하면 thread를 한 개만 허용하는thread pool의 executor service를 생성한다. 이는 newSingleThreadExecutor()와 동일하지만 앞으로도 1개 이상의 값들을 전달해야되므로 pool의 크기를 늘려야할 필요가 있다.


Timeouts

future.get()은 Callable이 종결되기 전까지 대기한다. 최악의 경우, Callable이 계속 실행된다는 점인데, 무반응의 응용 프로그램이 되기 마련이다. 이에 대응하여 timeout이라는 시나리오를 작성해본다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package example;
 
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
public class ExOfTimeout {
 
    public static void main(String args[]) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
 
        Future<Integer> future = executor.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                return 123;
            }
            catch (InterruptedException e) {
                throw new IllegalStateException("task interrupted", e);
            }
        });
 
        try {
            future.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
}
 
cs


1
2
3
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at example.ExOfTimeout.main(ExOfTimeout.java:26)
cs

이 예외가 발생되는 이유는 get()에서 최대 대기 시간을 1초로 했는데, sleep시간을 2초로 지정했기 때문이다.


InvokeAll

Callable과 Future를 list로 표현할때, 이에 대한 접근은 invokeAll()을 통해서 Executor에게 전달한다. Executor는 일괄적으로 callable을 전달하게끔 지원한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package example;
 
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ExOfInvokeAll {
 
    public static void main(String args[]) {
        ExecutorService executor = Executors.newWorkStealingPool();
 
        List<Callable<String>> callables = Arrays.asList(
                () -> "task1",
                () -> "task2",
                () -> "task3");
 
        try {
            executor.invokeAll(callables)
                .stream()
                .map(future -> {
                    try {
                        return future.get();
                    }
                    catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                })
                .forEach(System.out::println);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
}
 
cs

이 예제는 invokeAll이 발동되어 반환된 모든 Callable을 처리하기 위해 Java 8에서 제공하는 Stream을 이용한 것이다.


InvokeAny

invokeAll()외에 일괄적으로 Callable을 Executor에 전달하는 방법은 invokeAny()가 있다.

이는 invokeAll()과 다르게 모든 객체들을 반환하지 않고, 최초 완료한 객체가 나오기까지 객체들의 반환을 막아놓고 있다가 그 최초의 객체를  반환시킨다. (Instead of returning future objects this method blocks until the first callable terminates and returns the result of that callable.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package example;
 
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
public class ExOfInvokeAny {
 
    public static Callable<String> callable(String result, long sleepSeconds) {
        return () -> {
            TimeUnit.SECONDS.sleep(sleepSeconds);
            return result;
        };
    }
    
    public static void main(String args[]) {
        
        ExecutorService executor = Executors.newWorkStealingPool();
 
        List<Callable<String>> callables = Arrays.asList(
            callable("task1"2),
            callable("task2"1),
            callable("task3"3));
 
        String result = null;
        try {
            result = executor.invokeAny(callables);
        } catch (InterruptedException | ExecutionException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(result);
 
        // => task2
    }
    
 
}
 
cs

위 예제의 경우 Callable들을 모아놓은 List에서 sleep 시간이 가장 먼저 끝나는 Callable이 task2인데 결과를 출력하면 task2가 나오게 하여금 invokeAny를 활용하였다.


Executor를 생성하는 newWorkStealingPool()은 Executor의 한 종류인 ForkJoinPool을 반환하는데 일반적인 Executor와 조금 차이가 있다. ...(Instead of using a fixed size thread-pool ForkJoinPools are created for a given parallelism size which per default is the number of available cores of the hosts CPU.