백기선님의 <더 자바, Java 8>를 보고 공부한 내용을 기록합니다.
1. 자바에서의 concurrent
1-1. concurrent 소프트웨어
1-2. 자바에서 제공하는 concurrent programming
1-3. java 의 멀티 쓰레드 프로그래밍
public class MultiThreadJava {
@Test
void Thread๋ฅผ_์์๋ฐ์_๊ตฌํํ๋_๋ฐฉ๋ฒ() {
HelloThread helloThread = new HelloThread();
helloThread.start();
System.out.println("Current Thread : " + Thread.currentThread().getName());
//New Thread : Thread-0
//Current Thread : main
//Current Thread : main
//New Thread : Thread-0
}
static class HelloThread extends Thread {
@Override
public void run() {
System.out.println("New Thread : " + Thread.currentThread().getName());
}
}
@Test
void Runnable๋ก_๊ตฌํํ๊ฑฐ๋_๋๋ค๋ฅผ_์ฐ๋_๋ฐฉ๋ฒ() {
Thread thread = new Thread(() -> System.out.println("New Thread" + Thread.currentThread().getName()));
thread.start();
System.out.println("Current Thread : " + Thread.currentThread().getName());
//New ThreadThread-0
//Current Thread : main
//Current Thread : main
//New ThreadThread-0
}
@Test
void ์ฐ๋ ๋_๊นจ์ฐ๊ธฐ() throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
System.out.println("New Thread" + Thread.currentThread().getName());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
System.out.println("wake!");
return;
}
}
});
thread.start();
System.out.println("Current Thread : " + Thread.currentThread().getName());
Thread.sleep(3000L);
thread.interrupt();
}
@Test
void ์ฐ๋ ๋_๊ธฐ๋ค๋ฆฌ๊ธฐ() throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("New Thread" + Thread.currentThread().getName());
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
System.out.println("wake!");
return;
}
});
thread.start();
System.out.println("Current Thread : " + Thread.currentThread().getName());
thread.join();
System.out.println("thread finished! = " + thread.getName());
//New ThreadThread-0
//Current Thread : main
//thread finished! = Thread-0
}
}
2-1. 고수준(high-level) concurrency programming
2-2. Executors 가 하는 일
2-3. 주요 인터페이스
public class ExecutorsTest {
@Test
void ์ฑ๊ธ_์ฐ๋ ๋_๋ง๋ค๊ธฐ() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(runHello("Java"));
executorService.submit(runHello("Spring"));
executorService.submit(runHello("JPA"));
executorService.submit(runHello("Http"));
executorService.submit(runHello("MySQL"));
//Java : pool-1-thread-1
//Spring : pool-1-thread-1
//JPA : pool-1-thread-1
//Http : pool-1-thread-1
//MySQL : pool-1-thread-1
}
@Test
void ์ฐ๋ ๋_์ข
๋ฃ() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> System.out.println("Hello : " + Thread.currentThread().getName()));
executorService.shutdown(); //graceful shutdown : thread ๋ด๋ถ์ ์ผ์ ๋ชจ๋ ์ฒ๋ฆฌํ๊ณ ๊บผ์ง
executorService.shutdownNow(); //thread ์คํ ์ค ์ธ์ ๋ผ๋ ๊บผ์ง ์ ์์
}
@Test
void ์ฐ๋ ๋ํ_๋ง๋ค๊ธฐ() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(runHello("Java"));
executorService.submit(runHello("Spring"));
executorService.submit(runHello("JPA"));
executorService.submit(runHello("Http"));
executorService.submit(runHello("MySQL"));
//Java : pool-1-thread-1
//Spring : pool-1-thread-2 //2๋ฒ ์ฐ๋ ๋์์ ์คํ๋จ!
//JPA : pool-1-thread-1
//Http : pool-1-thread-1
//MySQL : pool-1-thread-1
}
private Runnable runHello(String message) {
return () -> System.out.println(message + " : " + Thread.currentThread().getName());
}
}
3-1. Callable : Runnable 과 유사하지만 반환값이 존재
/**
 * Single-method task interface whose {@code run()} returns nothing.
 * Shown here for comparison with {@code Callable}.
 */
@FunctionalInterface
public interface Runnable {
void run(); // returns void: no result is produced
}
/**
 * Single-method task interface whose {@code call()} returns a value and may throw a checked
 * exception.
 *
 * @param <V> type of the value returned by {@code call()}
 */
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception; // returns a V and is allowed to throw
}
public class CallableAndFutureTest {
@Test
void get์ผ๋ก_๊ฒฐ๊ณผ๊ฐ_๊ฐ์ ธ์ค๊ธฐ() throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> helloFuture = executorService.submit(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, Future!";
});
System.out.println("Started!");
String message = helloFuture.get();
assertThat(message).isEqualTo("Hello, Future!");
System.out.println("Ended!");
executorService.shutdown();
}
@Test
void get์_ํ์์์์_์ค์ ํ _์_์๋ค() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> helloFuture = executorService.submit(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, Future!";
});
//future.get(timeout, timeunit) : timeout ์๊ฐ์ด ์ง๋๋ฉด, TimeoutException
assertThatThrownBy(() -> helloFuture.get(2, TimeUnit.SECONDS))
.isInstanceOf(TimeoutException.class);
}
@Test
void isDone์ผ๋ก_์์
์ํ๋ฅผ_ํ์ธํ _์_์๋ค() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> helloFuture = executorService.submit(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, Future!";
});
assertThat(helloFuture.isDone()).isFalse();
System.out.println("Started!");
assertThat(helloFuture.get()).isEqualTo("Hello, Future!");
assertThat(helloFuture.isDone()).isTrue();
System.out.println("Ended!");
executorService.shutdown();
}
@Test
void cancel๋ก_์์
์_์ทจ์ํ _์_์๋ค() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> helloFuture = executorService.submit(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, Future!";
});
assertThat(helloFuture.isCancelled()).isFalse();
assertThat(helloFuture.isDone()).isFalse();
System.out.println("Started!");
boolean cancel = helloFuture.cancel(false); //true : ์งํ์ค์ธ ์ค๋ ๋ interrupt, false : ํ์ฌ ์งํ์ค์ธ ์์
๋๋ ๋๊น์ง wait
System.out.println("cancel = " + cancel);
assertThat(helloFuture.isCancelled()).isTrue();
assertThat(helloFuture.isDone()).isTrue();
System.out.println("Ended!");
executorService.shutdown();
}
@Test
void isCanceled๋ก_ํ๋ฒ_์ทจ์๋_์๋น์ค๋_๋ค์_get_ํ ์์๋ค() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> helloFuture = executorService.submit(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, Future!";
});
helloFuture.cancel(false);
assertThatThrownBy(() -> helloFuture.get())
.isInstanceOf(CancellationException.class);
}
@Test
void invokeAll๋ก_์ฌ๋ฌ์์
์_๋์์_์คํํ๋ค() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(3000L);
return "Hello";
};
Callable<String> java = () -> {
Thread.sleep(1000L);
return "Java";
};
Callable<String> spring = () -> {
Thread.sleep(2000L);
return "Spring";
};
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, spring));
for (Future<String> f : futures) {
System.out.println(f.get());
}
//๋์์ ์ถ๋ ฅ : ๊ฐ์ฅ ์๊ฐ์ด ๊ธด Hello ์ ์ข
๋ฃ์๊ฐ๊น์ง ๊ธฐ๋ค๋ ธ๋ค๊ฐ ๊ฐ์ด ์ถ๋ ฅ๋๋ค.
//Hello
//Java
//Spring
}
@Test
void invokeAny๋ก_์ฌ๋ฌ์์
์ค_ํ๋๋ผ๋_๋จผ์ _์๋ต์ด_์ค๋ฉด_๋๋ธ๋ค() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Callable<String> hello = () -> {
Thread.sleep(3000L);
return "Hello";
};
Callable<String> java = () -> {
Thread.sleep(1000L);
return "Java";
};
Callable<String> spring = () -> {
Thread.sleep(2000L);
return "Spring";
};
String result = executorService.invokeAny(Arrays.asList(hello, java, spring));
assertThat(result).isEqualTo("Java"); //๊ฐ์ฅ ์ํ์๊ฐ์ด ์งง์ Java๋ง ์ถ๋ ฅ๋๋ค. ๋ธ๋กํน์ฝ์ด๋ค.
}
}
4-2. 기존의 Future 를 사용하면서 아쉬운 점
4-4. 비동기로 작업 실행하기
public class CompletableFutureTest {
@Test
void completableFuture1() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("flash");
assertThat(future.get()).isEqualTo("flash");
}
@Test
void completableFuture2() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.completedFuture("flash");
assertThat(future.get()).isEqualTo("flash");
}
@Test
void runAsync() {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
});
//Hello : ForkJoinPool.commonPool-worker-19
}
@Test
void supplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
System.out.println(stringCompletableFuture.get());
//Hello : ForkJoinPool.commonPool-worker-19
//Hello
}
@Test
void anotherExecutor() {
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture.runAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
}, executor);
//Hello : pool-1-thread-1 -> ์ง์ ๋ executor ๋ก ์คํ๋๋ค.
}
@Test
void thenApply_๋ฆฌํด๊ฐ์๋ฐ์_์๋ก์ด๊ฐ์_๋ฆฌํด() throws ExecutionException, InterruptedException {
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}, executor).thenApply((s) -> {
System.out.println("thenApply : " + Thread.currentThread().getName());
return s.toUpperCase(Locale.ROOT);
});
System.out.println(stringCompletableFuture.get());
//Hello : pool-1-thread-1
//thenApply : pool-1-thread-1
//HELLO
}
@Test
void thenAccept_๋ฆฌํด๊ฐ์๋ฐ์_๋ฆฌํด์์ด_์ฒ๋ฆฌ() throws ExecutionException, InterruptedException {
Executor executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}, executor).thenAccept((s) -> {
System.out.println("thenApply : " + Thread.currentThread().getName());
});
System.out.println(voidCompletableFuture.get());
//Hello : pool-1-thread-1
//thenApply : pool-1-thread-1
//null
}
@Test
void thenRun_๋ฆฌํด๊ฐ์_๋ฐ์_๋ค๋ฅธ์์
์_์ฒ๋ฆฌํ๋_์ฝ๋ฐฑ() throws ExecutionException, InterruptedException {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).thenRun(() -> System.out.println(Thread.currentThread().getName()));
System.out.println("voidCompletableFuture.get() = " + voidCompletableFuture.get());
//Hello : ForkJoinPool.commonPool-worker-19
//ForkJoinPool.commonPool-worker-19
//voidCompletableFuture.get() = null
}
@Test
void thenCompose() throws ExecutionException, InterruptedException {
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> stringCompletableFuture = helloFuture.thenCompose(CompletableFutureTest::getWorldFuture);
System.out.println(stringCompletableFuture.get());
//Hello : ForkJoinPool.commonPool-worker-19
//World : ForkJoinPool.commonPool-worker-5
//Hello World
}
private static CompletableFuture<String> getWorldFuture(String word) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return word + " World";
});
}
@Test
void thenCombine() throws ExecutionException, InterruptedException {
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
CompletableFuture<String> result = helloFuture.thenCombine(worldFuture, (s1, s2) -> s1 + " " + s2);
System.out.println(result.get());
//Hello : ForkJoinPool.commonPool-worker-19
//World : ForkJoinPool.commonPool-worker-5
//Hello World
}
@Test
void allOf() throws ExecutionException, InterruptedException {
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
List<CompletableFuture<String>> futures = Arrays.asList(helloFuture, worldFuture);
CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);
CompletableFuture<List<String>> listCompletableFuture = CompletableFuture.allOf(futuresArray)
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
List<String> strings = listCompletableFuture.get();
strings.forEach(System.out::println);
//World : ForkJoinPool.commonPool-worker-5
//Hello : ForkJoinPool.commonPool-worker-19
//Hello
//World
}
@Test
void anyOf() {
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("World : " + Thread.currentThread().getName());
return "World";
});
CompletableFuture.anyOf(helloFuture, worldFuture)
.thenAccept(System.out::println);
//World : ForkJoinPool.commonPool-worker-5
//Hello : ForkJoinPool.commonPool-worker-19
//World
}
@Test
void exceptionally() throws ExecutionException, InterruptedException {
boolean isError = true;
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
if (isError) {
throw new IllegalArgumentException();
}
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).exceptionally((ex) -> {
System.out.println(ex);
return "Default Value";
});
System.out.println(helloFuture.get());
//java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
//Default Value
}
@Test
void handle() throws ExecutionException, InterruptedException {
boolean isError = true;
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
if (isError) {
throw new IllegalArgumentException();
}
System.out.println("Hello : " + Thread.currentThread().getName());
return "Hello";
}).handle((normalResult, exception) -> {
if (exception != null) {
System.out.println(exception);
return "Error!";
}
return normalResult;
});
System.out.println(helloFuture.get());
//java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
//Error!
}
}