백기선 님의 <더 자바, Java 8>를 보고 공부한 내용을 기록합니다.
1-2. 자바에서 제공하는 concurrent programming
/**
 * Demonstrates the low-level thread API: defining a thread by subclassing
 * {@link Thread}, defining it with a {@link Runnable} lambda, interrupting a
 * sleeping thread, and waiting for a thread with {@link Thread#join()}.
 *
 * <p>Every test joins the thread it starts so that the worker's output is
 * produced before the test method returns.
 */
public class MultiThreadJava {

    /**
     * Starts a thread implemented as a {@link Thread} subclass.
     *
     * @throws InterruptedException if the test thread is interrupted while joining
     */
    @Test
    void Thread를_상속받아_구현하는_방법() throws InterruptedException {
        HelloThread helloThread = new HelloThread();
        helloThread.start();
        System.out.println("Current Thread : " + Thread.currentThread().getName());
        // Without the join the test could finish before the worker prints anything.
        helloThread.join();
        // The relative order of the two lines is not guaranteed. Sample output:
        //New Thread : Thread-0
        //Current Thread : main
        // or:
        //Current Thread : main
        //New Thread : Thread-0
    }

    /** Thread subclass that prints the name of the thread executing {@link #run()}. */
    static class HelloThread extends Thread {
        @Override
        public void run() {
            System.out.println("New Thread : " + Thread.currentThread().getName());
        }
    }

    /**
     * Starts a thread whose body is supplied as a {@link Runnable} lambda.
     *
     * @throws InterruptedException if the test thread is interrupted while joining
     */
    @Test
    void Runnable로_구현하거나_람다를_쓰는_방법() throws InterruptedException {
        Thread thread = new Thread(() -> System.out.println("New Thread" + Thread.currentThread().getName()));
        thread.start();
        System.out.println("Current Thread : " + Thread.currentThread().getName());
        // Wait for the worker so its line is printed before the test ends.
        thread.join();
        // The relative order of the two lines is not guaranteed. Sample output:
        //New ThreadThread-0
        //Current Thread : main
        // or:
        //Current Thread : main
        //New ThreadThread-0
    }

    /**
     * Wakes a sleeping thread with {@link Thread#interrupt()}: the worker prints
     * once per second until it is interrupted, then prints "wake!" and exits.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or joining
     */
    @Test
    void 쓰레드_깨우기() throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (true) {
                System.out.println("New Thread" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // The interrupt is the signal to stop, so leave the loop.
                    System.out.println("wake!");
                    return;
                }
            }
        });
        thread.start();
        System.out.println("Current Thread : " + Thread.currentThread().getName());
        Thread.sleep(3000L);
        thread.interrupt();
        // Give the worker the chance to react to the interrupt before the test returns.
        thread.join();
    }

    /**
     * Waits for another thread to finish with {@link Thread#join()}.
     *
     * @throws InterruptedException if the test thread is interrupted while joining
     */
    @Test
    void 쓰레드_기다리기() throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("New Thread" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                System.out.println("wake!");
            }
        });
        thread.start();
        System.out.println("Current Thread : " + Thread.currentThread().getName());
        // Blocks until the worker has terminated.
        thread.join();
        System.out.println("thread finished! = " + thread.getName());
        // Sample output (the first two lines may appear in either order):
        //New ThreadThread-0
        //Current Thread : main
        //thread finished! = Thread-0
    }
}
2-1. 고수준(high-level) concurrent programming
/**
 * Demonstrates the high-level {@link ExecutorService} API: a single-thread
 * executor, a fixed-size thread pool, and how to shut an executor down.
 *
 * <p>Every test shuts its executor down and waits for the submitted tasks, so
 * no worker thread outlives the test and no output is lost.
 */
public class ExecutorsTest {

    /** Upper bound, in seconds, on how long a test waits for its submitted tasks. */
    private static final long AWAIT_SECONDS = 5L;

    /**
     * Submits five tasks to a single-thread executor; they run one after another
     * on the same worker thread.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for termination
     */
    @Test
    void 싱글_쓰레드_만들기() throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            executorService.submit(runHello("Java"));
            executorService.submit(runHello("Spring"));
            executorService.submit(runHello("JPA"));
            executorService.submit(runHello("Http"));
            executorService.submit(runHello("MySQL"));
        } finally {
            shutdownAndAwait(executorService);
        }
        // Sample output:
        //Java : pool-1-thread-1
        //Spring : pool-1-thread-1
        //JPA : pool-1-thread-1
        //Http : pool-1-thread-1
        //MySQL : pool-1-thread-1
    }

    /**
     * Shows the two ways of stopping an executor: the graceful {@code shutdown()}
     * and, only if that does not finish in time, the forced {@code shutdownNow()}.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for termination
     */
    @Test
    void 쓰레드_종료() throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> System.out.println("Hello : " + Thread.currentThread().getName()));
        shutdownAndAwait(executorService);
    }

    /**
     * Submits five tasks to a pool of two threads; the tasks are distributed over
     * both worker threads.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for termination
     */
    @Test
    void 쓰레드풀_만들기() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            executorService.submit(runHello("Java"));
            executorService.submit(runHello("Spring"));
            executorService.submit(runHello("JPA"));
            executorService.submit(runHello("Http"));
            executorService.submit(runHello("MySQL"));
        } finally {
            shutdownAndAwait(executorService);
        }
        // Sample output (which task lands on which thread varies between runs):
        //Java : pool-1-thread-1
        //Spring : pool-1-thread-2 // executed on the second thread!
        //JPA : pool-1-thread-1
        //Http : pool-1-thread-1
        //MySQL : pool-1-thread-1
    }

    /**
     * Builds a task that prints {@code message} followed by the name of the
     * thread that executes it.
     *
     * @param message text printed in front of the thread name
     * @return the printing task
     */
    private Runnable runHello(String message) {
        return () -> System.out.println(message + " : " + Thread.currentThread().getName());
    }

    /**
     * Stops {@code executorService} gracefully and waits for its tasks to finish.
     *
     * @param executorService the executor to stop
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void shutdownAndAwait(ExecutorService executorService) throws InterruptedException {
        // Graceful shutdown: no new tasks are accepted, already submitted tasks still run.
        executorService.shutdown();
        if (!executorService.awaitTermination(AWAIT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
            // Forced shutdown: running tasks are interrupted, queued tasks are dropped.
            executorService.shutdownNow();
        }
    }
}
3-1. Callable : Runnable과 유사하지만 반환값이 존재
/**
 * A task with a single no-argument method that returns nothing.
 * Shown here for comparison with {@code Callable}.
 */
@FunctionalInterface
public interface Runnable {
/** Executes the task; produces no result and declares no checked exception. */
void run(); // returns nothing (void)
}
/**
 * A task with a single no-argument method that returns a result.
 *
 * @param <V> the type of the result returned by {@link #call()}
 */
@FunctionalInterface
public interface Callable<V> {
/**
 * Executes the task.
 *
 * @return the computed result
 * @throws Exception if the result cannot be computed
 */
V call() throws Exception; // returns a value of type V
}
/**
 * Demonstrates {@link Callable} and {@link Future}: blocking on a result,
 * timeouts, state queries, cancellation, and the bulk operations
 * {@code invokeAll} / {@code invokeAny}.
 *
 * <p>Every test stops its executor in a {@code finally} block so that no worker
 * thread outlives the test, even when an assertion fails.
 */
public class CallableAndFutureTest {

    /** Result produced by the slow demo task. */
    private static final String HELLO_FUTURE = "Hello, Future!";

    /**
     * Creates the slow task shared by the single-future tests: it sleeps for
     * three seconds and then returns {@link #HELLO_FUTURE}.
     *
     * <p>{@code Callable.call()} may throw checked exceptions, so an
     * {@link InterruptedException} simply propagates instead of being swallowed.
     *
     * @return the slow task
     */
    private static Callable<String> slowHello() {
        return () -> {
            Thread.sleep(3000L);
            return HELLO_FUTURE;
        };
    }

    /**
     * {@code Future.get()} blocks until the task has produced its result.
     *
     * @throws ExecutionException if the task itself failed
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    void get으로_결과값_가져오기() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> helloFuture = executorService.submit(slowHello());
            System.out.println("Started!");
            String message = helloFuture.get(); // blocking call
            assertThat(message).isEqualTo(HELLO_FUTURE);
            System.out.println("Ended!");
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * {@code Future.get(timeout, unit)} throws {@link TimeoutException} when the
     * result is not available within the given time.
     */
    @Test
    void get은_타임아웃을_설정할_수_있다() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> helloFuture = executorService.submit(slowHello());
            // The task needs 3 seconds, so waiting for only 2 seconds must time out.
            assertThatThrownBy(() -> helloFuture.get(2, TimeUnit.SECONDS))
                    .isInstanceOf(TimeoutException.class);
        } finally {
            // The task is still sleeping here; interrupt it instead of leaking the thread.
            executorService.shutdownNow();
        }
    }

    /**
     * {@code Future.isDone()} reports whether the task has completed.
     *
     * @throws ExecutionException if the task itself failed
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    void isDone으로_작업상태를_확인할_수_있다() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> helloFuture = executorService.submit(slowHello());
            assertThat(helloFuture.isDone()).isFalse();
            System.out.println("Started!");
            assertThat(helloFuture.get()).isEqualTo(HELLO_FUTURE);
            assertThat(helloFuture.isDone()).isTrue();
            System.out.println("Ended!");
        } finally {
            executorService.shutdownNow();
        }
    }

    /** {@code Future.cancel(...)} marks the future as cancelled and done. */
    @Test
    void cancel로_작업을_취소할_수_있다() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> helloFuture = executorService.submit(slowHello());
            assertThat(helloFuture.isCancelled()).isFalse();
            assertThat(helloFuture.isDone()).isFalse();
            System.out.println("Started!");
            // cancel(true) interrupts the thread that is running the task.
            // cancel(false) does not interrupt a task that is already running, but it does
            // NOT wait for it either: the future is reported as cancelled immediately.
            boolean cancel = helloFuture.cancel(false);
            System.out.println("cancel = " + cancel);
            assertThat(helloFuture.isCancelled()).isTrue();
            assertThat(helloFuture.isDone()).isTrue();
            System.out.println("Ended!");
        } finally {
            executorService.shutdownNow();
        }
    }

    /** Calling {@code get()} on a cancelled future throws {@link CancellationException}. */
    @Test
    void isCanceled로_한번_취소된_서비스는_다시_get_할수없다() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> helloFuture = executorService.submit(slowHello());
            helloFuture.cancel(false);
            assertThatThrownBy(() -> helloFuture.get())
                    .isInstanceOf(CancellationException.class);
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * {@code invokeAll} runs all tasks and blocks until every one has finished.
     *
     * <p>The pool has one thread per task so the tasks really run at the same
     * time; with a single-thread executor they would run one after another.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if one of the tasks failed
     */
    @Test
    void invokeAll로_여러작업을_동시에_실행한다() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        try {
            Callable<String> hello = () -> {
                Thread.sleep(3000L);
                return "Hello";
            };
            Callable<String> java = () -> {
                Thread.sleep(1000L);
                return "Java";
            };
            Callable<String> spring = () -> {
                Thread.sleep(2000L);
                return "Spring";
            };
            List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, spring));
            for (Future<String> f : futures) {
                System.out.println(f.get());
            }
        } finally {
            executorService.shutdownNow();
        }
        // Printed together: invokeAll waits for the slowest task (Hello) before returning,
        // and the futures are returned in the order of the submitted tasks.
        //Hello
        //Java
        //Spring
    }

    /**
     * {@code invokeAny} blocks until the first task completes successfully and
     * returns that task's result.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if no task completed successfully
     */
    @Test
    void invokeAny로_여러작업중_하나라도_먼저_응답이_오면_끝낸다() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            Callable<String> hello = () -> {
                Thread.sleep(3000L);
                return "Hello";
            };
            Callable<String> java = () -> {
                Thread.sleep(1000L);
                return "Java";
            };
            Callable<String> spring = () -> {
                Thread.sleep(2000L);
                return "Spring";
            };
            String result = executorService.invokeAny(Arrays.asList(hello, java, spring));
            // Only the fastest task (Java) delivers the result. invokeAny is a blocking call.
            assertThat(result).isEqualTo("Java");
        } finally {
            executorService.shutdownNow();
        }
    }
}
4-2. 기존의 Future를 사용하면서 아쉬운 점
/**
 * Demonstrates {@link CompletableFuture}: explicit completion, asynchronous
 * execution, callbacks, composition/combination of futures, and error handling.
 *
 * <p>The thread names in the sample-output comments come from one particular
 * run. Worker numbers vary, and a callback registered with a non-async method
 * ({@code thenApply}, {@code thenAccept}, {@code thenRun}, ...) may also run on
 * the calling thread when the previous stage is already complete.
 */
public class CompletableFutureTest {

    /** A future can be completed explicitly with {@code complete(value)}. */
    @Test
    void completableFuture1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("flash");
        assertThat(future.get()).isEqualTo("flash");
    }

    /** {@code completedFuture(value)} creates a future that is already complete. */
    @Test
    void completableFuture2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.completedFuture("flash");
        assertThat(future.get()).isEqualTo("flash");
    }

    /** {@code runAsync} runs a task without a result on another thread. */
    @Test
    void runAsync() {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
        });
        // Wait for the task; otherwise the test may end before anything is printed.
        voidCompletableFuture.join();
        // Sample output:
        //Hello : ForkJoinPool.commonPool-worker-19
    }

    /** {@code supplyAsync} runs a task that produces a result on another thread. */
    @Test
    void supplyAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });
        System.out.println(stringCompletableFuture.get());
        // Sample output:
        //Hello : ForkJoinPool.commonPool-worker-19
        //Hello
    }

    /** An explicit executor can be passed instead of using the common pool. */
    @Test
    void anotherExecutor() {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
            }, executor).join(); // wait so the output is produced before the test ends
        } finally {
            executor.shutdown();
        }
        // Sample output:
        //Hello : pool-1-thread-1 -> executed by the given executor.
    }

    /** {@code thenApply} receives the result and returns a new value. */
    @Test
    void thenApply_리턴값을받아_새로운값을_리턴() throws ExecutionException, InterruptedException {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }, executor).thenApply((s) -> {
                System.out.println("thenApply : " + Thread.currentThread().getName());
                return s.toUpperCase(Locale.ROOT);
            });
            System.out.println(stringCompletableFuture.get());
        } finally {
            executor.shutdown();
        }
        // Sample output:
        //Hello : pool-1-thread-1
        //thenApply : pool-1-thread-1
        //HELLO
    }

    /** {@code thenAccept} receives the result and consumes it without returning a value. */
    @Test
    void thenAccept_리턴값을받아_리턴없이_처리() throws ExecutionException, InterruptedException {
        java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println("Hello : " + Thread.currentThread().getName());
                return "Hello";
            }, executor).thenAccept((s) -> {
                // Label fixed: this callback is thenAccept, not thenApply.
                System.out.println("thenAccept : " + Thread.currentThread().getName());
            });
            System.out.println(voidCompletableFuture.get());
        } finally {
            executor.shutdown();
        }
        // Sample output:
        //Hello : pool-1-thread-1
        //thenAccept : pool-1-thread-1
        //null
    }

    /**
     * {@code thenRun} runs a follow-up action after the previous stage completes.
     * The action is a plain {@code Runnable}, so it does not receive the result.
     */
    @Test
    void thenRun_리턴값을_받아_다른작업을_처리하는_콜백() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).thenRun(() -> System.out.println(Thread.currentThread().getName()));
        System.out.println("voidCompletableFuture.get() = " + voidCompletableFuture.get());
        // Sample output:
        //Hello : ForkJoinPool.commonPool-worker-19
        //ForkJoinPool.commonPool-worker-19
        //voidCompletableFuture.get() = null
    }

    /** {@code thenCompose} chains a second future that depends on the first one's result. */
    @Test
    void thenCompose() throws ExecutionException, InterruptedException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> stringCompletableFuture = helloFuture.thenCompose(CompletableFutureTest::getWorldFuture);
        System.out.println(stringCompletableFuture.get());
        // Sample output:
        //Hello : ForkJoinPool.commonPool-worker-19
        //World : ForkJoinPool.commonPool-worker-5
        //Hello World
    }

    /**
     * Creates a future that asynchronously appends {@code " World"} to {@code word}.
     *
     * @param word the text to extend
     * @return a future producing {@code word + " World"}
     */
    private static CompletableFuture<String> getWorldFuture(String word) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return word + " World";
        });
    }

    /** {@code thenCombine} merges the results of two independent futures. */
    @Test
    void thenCombine() throws ExecutionException, InterruptedException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });
        CompletableFuture<String> result = helloFuture.thenCombine(worldFuture, (s1, s2) -> s1 + " " + s2);
        System.out.println(result.get());
        // Sample output (the first two lines may appear in either order):
        //Hello : ForkJoinPool.commonPool-worker-19
        //World : ForkJoinPool.commonPool-worker-5
        //Hello World
    }

    /**
     * {@code allOf} completes when all given futures are complete. It yields no
     * value itself, so the individual results are collected afterwards.
     */
    @Test
    void allOf() throws ExecutionException, InterruptedException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });
        List<CompletableFuture<String>> futures = Arrays.asList(helloFuture, worldFuture);
        // Wildcard-typed array instead of a raw CompletableFuture[].
        CompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture<?>[0]);
        CompletableFuture<List<String>> listCompletableFuture = CompletableFuture.allOf(futuresArray)
                // All futures are complete at this point, so join() returns without blocking.
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
        List<String> strings = listCompletableFuture.get();
        strings.forEach(System.out::println);
        // Sample output (the first two lines may appear in either order):
        //World : ForkJoinPool.commonPool-worker-5
        //Hello : ForkJoinPool.commonPool-worker-19
        //Hello
        //World
    }

    /** {@code anyOf} completes as soon as any one of the given futures completes. */
    @Test
    void anyOf() {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });
        CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });
        CompletableFuture.anyOf(helloFuture, worldFuture)
                .thenAccept(System.out::println)
                .join(); // wait so the winner is printed before the test ends
        // Sample output (which future wins is not deterministic):
        //World : ForkJoinPool.commonPool-worker-5
        //Hello : ForkJoinPool.commonPool-worker-19
        //World
    }

    /** {@code exceptionally} supplies a fallback value when the previous stage failed. */
    @Test
    void exceptionally() throws ExecutionException, InterruptedException {
        boolean isError = true;
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            if (isError) {
                throw new IllegalArgumentException();
            }
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).exceptionally((ex) -> {
            System.out.println(ex);
            return "Default Value";
        });
        System.out.println(helloFuture.get());
        // Sample output:
        //java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
        //Default Value
    }

    /**
     * {@code handle} is invoked for both outcomes: it receives the normal result
     * and the exception, of which the one that does not apply is {@code null}.
     */
    @Test
    void handle() throws ExecutionException, InterruptedException {
        boolean isError = true;
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            if (isError) {
                throw new IllegalArgumentException();
            }
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).handle((normalResult, exception) -> {
            if (exception != null) {
                System.out.println(exception);
                return "Error!";
            }
            return normalResult;
        });
        System.out.println(helloFuture.get());
        // Sample output:
        //java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
        //Error!
    }
}