CompletableFuture

백기선 님의 <더 자바, Java 8>를 보고 공부한 내용을 기록합니다.

1. 자바에서의 concurrent

1-1. concurrent 소프트웨어

  • 동시에 여러 작업을 할 수 있는 소프트웨어

  • 예) 웹 브라우저로 유튜브를 보면서 키보드로 문서에 타이핑

  • 예) 녹화를 하면서 인텔리 J로 코딩을 하고, 워드에 적어둔 문서를 보거나 수정

1-2. 자바에서 제공하는 concurrent programming

  • 멀티 프로세싱 (ProcessBuilder)

  • 멀티 쓰레드

1-3. java 의 멀티 쓰레드 프로그래밍

  • 쓰레드 간에는 순서를 보장할 수 없다.

  • Runnable

  • Thread

    • 쓰레드의 주요기능

      • sleep : 현재 쓰레드 멈춰두기. 다른 쓰레드가 처리할 수 있도록 기회를 주지만, 그렇다고 락을 놔주지는 않는다. 잘못하면 deadlock!

      • interrupt : 다른 쓰레드 깨우기. 다른 쓰레드를 깨워서 InterruptedException 을 발생시킨다. 그 에러가 발생했을 때 할 일은 구현하기 나름이다. 종료시키거나 하던 일을 계속할 수 있음

      • join : 다른 쓰레드 기다리기. 다른 쓰레드가 끝날때까지 기다린다.

public class MultiThreadJava {

    @Test
    void Thread๋ฅผ_์ƒ์†๋ฐ›์•„_๊ตฌํ˜„ํ•˜๋Š”_๋ฐฉ๋ฒ•() {
        HelloThread helloThread = new HelloThread();
        helloThread.start();

        System.out.println("Current Thread : " + Thread.currentThread().getName());
        //New Thread : Thread-0
        //Current Thread : main

        //Current Thread : main
        //New Thread : Thread-0
    }

    static class HelloThread extends Thread {
        @Override
        public void run() {
            System.out.println("New Thread : " + Thread.currentThread().getName());
        }
    }

    @Test
    void Runnable๋กœ_๊ตฌํ˜„ํ•˜๊ฑฐ๋‚˜_๋žŒ๋‹ค๋ฅผ_์“ฐ๋Š”_๋ฐฉ๋ฒ•() {
        Thread thread = new Thread(() -> System.out.println("New Thread" + Thread.currentThread().getName()));
        thread.start();

        System.out.println("Current Thread : " + Thread.currentThread().getName());
        //New ThreadThread-0
        //Current Thread : main

        //Current Thread : main
        //New ThreadThread-0
    }

    @Test
    void ์“ฐ๋ ˆ๋“œ_๊นจ์šฐ๊ธฐ() throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (true) {
                System.out.println("New Thread" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    System.out.println("wake!");
                    return;
                }
            }
        });
        thread.start();

        System.out.println("Current Thread : " + Thread.currentThread().getName());
        Thread.sleep(3000L);
        thread.interrupt();
    }

    @Test
    void ์“ฐ๋ ˆ๋“œ_๊ธฐ๋‹ค๋ฆฌ๊ธฐ() throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("New Thread" + Thread.currentThread().getName());
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                System.out.println("wake!");
                return;
            }
        });
        thread.start();

        System.out.println("Current Thread : " + Thread.currentThread().getName());
        thread.join();
        System.out.println("thread finished! = " + thread.getName());

        //New ThreadThread-0
        //Current Thread : main
        //thread finished! = Thread-0
    }
}

1-4. 참고

1-5. 문제점

  • thread 가 2개만 되어도 코드 상으로 프로그래머가 관리하기가 매우 까다로워진다.

  • 그래서 등장한 것이 Executors

2. Executors

2-1. 고수준(high-level) concurrency programming

  • 쓰레드를 만들고, 관리하는 작업을 어플리케이션에서 분리한다.

  • 그런 기능을 Executors 에게 위임한다.

2-2. Executors 가 하는 일

  1. 쓰레드 만들기 : 어플리케이션이 사용할 쓰레드 풀을 만들어서 관리한다.

  2. 쓰레드 관리 : 쓰레드 생명 주기를 관리한다.

  3. 작업처리 및 실행 : 쓰레드로 실행할 작업을 제공할 수 있는 API 를 제공한다.

2-3. ์ฃผ์š” ์ธํ„ฐํŽ˜์ด์Šค

  • Executor : execute(Runnable)

  • ExecutorService : Executor ๋ฅผ ์ƒ์†๋ฐ›์€ ์ธํ„ฐํŽ˜์ด์Šค

    • Callable ๋„ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

    • Executor ๋ฅผ ์ข…๋ฃŒ์‹œํ‚ค๊ฑฐ๋‚˜

    • ์—ฌ๋Ÿฌ Callable ์„ ๋™์‹œ์— ์‹คํ–‰ํ•˜๋Š” ๋“ฑ์˜ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•œ๋‹ค.

  • ScheduledExecutorService : ExecutorService ๋ฅผ ์ƒ์†๋ฐ›์€ ์ธํ„ฐํŽ˜์ด์Šค๋กœ ํŠน์ • ์‹œ๊ฐ„ ์ดํ›„์— ํ˜น์€ ์ฃผ๊ธฐ์ ์œผ๋กœ ์ž‘์—…์„ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • ExecutorService ๋‚ด๋ถ€๋ฅผ ๋ณด๋ฉด, ์“ฐ๋ ˆ๋“œ ํ’€์ด ์žˆ๊ณ , Blocking Queue๊ฐ€ ์กด์žฌํ•œ๋‹ค.

    • ๋”ฐ๋ผ์„œ ์—ฌ๋Ÿฌ๊ฐœ์˜ ์“ฐ๋ ˆ๋“œ ํ’€๋กœ ๋น ๋ฅด๊ฒŒ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์—†์„ ๋•Œ์—๋Š” blocking queue์— ํ…Œ์Šคํฌ๋ฅผ ์Œ“์•„๋‘๊ณ  ์ˆœ์ฐจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ฒŒ ๋œ๋‹ค.

    • ๊ทธ๋ž˜์„œ 5๊ฐœ์˜ ์ž‘์—…์„ ๋ณด๋‚ด๋„, ๋‘ ๊ฐœ์˜ ์“ฐ๋ ˆ๋“œ๋กœ ์ˆœ์ฐจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Œ

    • submit -> blocking queue -> thread pool


public class ExecutorsTest {

    @Test
    void ์‹ฑ๊ธ€_์“ฐ๋ ˆ๋“œ_๋งŒ๋“ค๊ธฐ() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(runHello("Java"));
        executorService.submit(runHello("Spring"));
        executorService.submit(runHello("JPA"));
        executorService.submit(runHello("Http"));
        executorService.submit(runHello("MySQL"));
        //Java : pool-1-thread-1
        //Spring : pool-1-thread-1
        //JPA : pool-1-thread-1
        //Http : pool-1-thread-1
        //MySQL : pool-1-thread-1
    }

    @Test
    void ์“ฐ๋ ˆ๋“œ_์ข…๋ฃŒ() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(() -> System.out.println("Hello : " + Thread.currentThread().getName()));

        executorService.shutdown(); //graceful shutdown : thread ๋‚ด๋ถ€์˜ ์ผ์„ ๋ชจ๋‘ ์ฒ˜๋ฆฌํ•˜๊ณ  ๊บผ์ง
        executorService.shutdownNow();  //thread ์‹คํ–‰ ์ค‘ ์–ธ์ œ๋ผ๋„ ๊บผ์งˆ ์ˆ˜ ์žˆ์Œ
    }

    @Test
    void ์“ฐ๋ ˆ๋“œํ’€_๋งŒ๋“ค๊ธฐ() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(runHello("Java"));
        executorService.submit(runHello("Spring"));
        executorService.submit(runHello("JPA"));
        executorService.submit(runHello("Http"));
        executorService.submit(runHello("MySQL"));
        //Java : pool-1-thread-1
        //Spring : pool-1-thread-2      //2๋ฒˆ ์“ฐ๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋จ!
        //JPA : pool-1-thread-1
        //Http : pool-1-thread-1
        //MySQL : pool-1-thread-1
    }

    private Runnable runHello(String message) {
        return () -> System.out.println(message + " : " + Thread.currentThread().getName());
    }
}

2-4. Fork/Join framework

  • ExecutorService 의 구현체로 손쉽게 멀티 프로세서를 활용할 수 있게끔 도와준다.

  • Java 7 부터 추가됨

2-5. 참고

3. Callable and Future

3-1. Callable : Runnable 과 유사하지만 반환값이 존재

/**
 * A task that takes no arguments and produces no result.
 */
@FunctionalInterface
public interface Runnable {

    /**
     * Executes the task. Nothing is returned.
     */
    public abstract void run();
}


/**
 * A task that produces a result and is allowed to fail with a checked exception.
 *
 * @param <V> type of the value produced by {@link #call()}
 */
@FunctionalInterface
public interface Callable<V> {

    /**
     * Executes the task.
     *
     * @return the computed value
     * @throws Exception if the value cannot be computed
     */
    public abstract V call() throws Exception;
}

3-2. Future

  • 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

  • get() : 블로킹 콜이다.

    • 타임아웃(최대한 기다릴 시간)을 설정할 수 있다. : get(time, timeunit)


public class CallableAndFutureTest {

    @Test
    void get์œผ๋กœ_๊ฒฐ๊ณผ๊ฐ’_๊ฐ€์ ธ์˜ค๊ธฐ() throws ExecutionException, InterruptedException, TimeoutException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> helloFuture = executorService.submit(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, Future!";
        });

        System.out.println("Started!");

        String message = helloFuture.get();
        assertThat(message).isEqualTo("Hello, Future!");

        System.out.println("Ended!");
        executorService.shutdown();
    }

    @Test
    void get์€_ํƒ€์ž„์•„์›ƒ์„_์„ค์ •ํ• _์ˆ˜_์žˆ๋‹ค() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> helloFuture = executorService.submit(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, Future!";
        });

        //future.get(timeout, timeunit) : timeout ์‹œ๊ฐ„์ด ์ง€๋‚˜๋ฉด, TimeoutException
        assertThatThrownBy(() -> helloFuture.get(2, TimeUnit.SECONDS))
                .isInstanceOf(TimeoutException.class);
    }

    @Test
    void isDone์œผ๋กœ_์ž‘์—…์ƒํƒœ๋ฅผ_ํ™•์ธํ• _์ˆ˜_์žˆ๋‹ค() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> helloFuture = executorService.submit(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, Future!";
        });

        assertThat(helloFuture.isDone()).isFalse();
        System.out.println("Started!");

        assertThat(helloFuture.get()).isEqualTo("Hello, Future!");

        assertThat(helloFuture.isDone()).isTrue();
        System.out.println("Ended!");

        executorService.shutdown();
    }

    @Test
    void cancel๋กœ_์ž‘์—…์„_์ทจ์†Œํ• _์ˆ˜_์žˆ๋‹ค() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> helloFuture = executorService.submit(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, Future!";
        });

        assertThat(helloFuture.isCancelled()).isFalse();
        assertThat(helloFuture.isDone()).isFalse();
        System.out.println("Started!");

        boolean cancel = helloFuture.cancel(false);     //true : ์ง„ํ–‰์ค‘์ธ ์Šค๋ ˆ๋“œ interrupt, false : ํ˜„์žฌ ์ง„ํ–‰์ค‘์ธ ์ž‘์—… ๋๋‚ ๋•Œ๊นŒ์ง€ wait
        System.out.println("cancel = " + cancel);

        assertThat(helloFuture.isCancelled()).isTrue();
        assertThat(helloFuture.isDone()).isTrue();
        System.out.println("Ended!");

        executorService.shutdown();
    }

    @Test
    void isCanceled๋กœ_ํ•œ๋ฒˆ_์ทจ์†Œ๋œ_์„œ๋น„์Šค๋Š”_๋‹ค์‹œ_get_ํ• ์ˆ˜์—†๋‹ค() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> helloFuture = executorService.submit(() -> {
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, Future!";
        });

        helloFuture.cancel(false);

        assertThatThrownBy(() -> helloFuture.get())
                .isInstanceOf(CancellationException.class);
    }

    @Test
    void invokeAll๋กœ_์—ฌ๋Ÿฌ์ž‘์—…์„_๋™์‹œ์—_์‹คํ–‰ํ•œ๋‹ค() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Callable<String> hello = () -> {
            Thread.sleep(3000L);
            return "Hello";
        };

        Callable<String> java = () -> {
            Thread.sleep(1000L);
            return "Java";
        };

        Callable<String> spring = () -> {
            Thread.sleep(2000L);
            return "Spring";
        };

        List<Future<String>> futures = executorService.invokeAll(Arrays.asList(hello, java, spring));
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }
        //๋™์‹œ์— ์ถœ๋ ฅ : ๊ฐ€์žฅ ์‹œ๊ฐ„์ด ๊ธด Hello ์˜ ์ข…๋ฃŒ์‹œ๊ฐ„๊นŒ์ง€ ๊ธฐ๋‹ค๋ ธ๋‹ค๊ฐ€ ๊ฐ™์ด ์ถœ๋ ฅ๋œ๋‹ค.

        //Hello
        //Java
        //Spring
    }

    @Test
    void invokeAny๋กœ_์—ฌ๋Ÿฌ์ž‘์—…์ค‘_ํ•˜๋‚˜๋ผ๋„_๋จผ์ €_์‘๋‹ต์ด_์˜ค๋ฉด_๋๋‚ธ๋‹ค() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        Callable<String> hello = () -> {
            Thread.sleep(3000L);
            return "Hello";
        };

        Callable<String> java = () -> {
            Thread.sleep(1000L);
            return "Java";
        };

        Callable<String> spring = () -> {
            Thread.sleep(2000L);
            return "Spring";
        };

        String result = executorService.invokeAny(Arrays.asList(hello, java, spring));
        assertThat(result).isEqualTo("Java");   //๊ฐ€์žฅ ์ˆ˜ํ–‰์‹œ๊ฐ„์ด ์งง์€ Java๋งŒ ์ถœ๋ ฅ๋œ๋‹ค. ๋ธ”๋กœํ‚น์ฝœ์ด๋‹ค.
    }
}
  • invokeAll : 가장 수행시간이 긴 작업이 끝날때까지 기다렸다가 한꺼번에 출력한다.

  • invokeAny : 가장 수행시간이 짧은 작업만 출력한다.

4. CompletableFuture

4-1. 개념

  • 자바에서 비동기(asynchronous) 프로그래밍을 가능하게 하는 인터페이스

    • Future 를 사용해서도 어느정도는 가능했지만, 하기 힘든 일들이 많았다.

4-2. 기존의 Future 를 사용하면서 아쉬운 점

  • Future 를 외부에서 완료시킬 수 없다.

    • 취소하거나

    • get() 에 타임아웃을 설정하는 방법 밖에는.

  • 블로킹코드인 get() 을 사용하지 않고서는 작업이 끝났을 때, 콜백을 실행할 수 없다.

  • 여러 Future 를 조합시킬 수 없다.

    • event 의 정보를 가져온 다음

    • event 에 참석하는 회원 목록을 가져오기 등

  • 예외 처리용 API 를 제공하지 않는다.

4-3. Completable Future

4-4. 비동기로 작업 실행하기

  • 리턴값이 없는 경우: runAsync()

  • 리턴값이 있는 경우: supplyAsync()

  • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool())

4-5. 콜백 제공하기

  • thenApply(Function): 리턴값을 받아서 다른 값으로 바꾸는 콜백

  • thenAccept(Consumer): 리턴값을 받아서 또 다른 작업을 처리하는 콜백 (리턴없이)

  • thenRun(Runnable): 리턴값을 받지 않고 다른 작업을 처리하는 콜백

  • 콜백 자체를 또 다른 쓰레드에서 실행할 수 있다.

4-6. 조합하기

  • thenCompose(): 두 작업이 서로 이어서 실행하도록 조합

  • thenCombine(): 두 작업을 독립적으로 실행하고 둘 다 종료 했을 때 콜백 실행

  • allOf(): 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행

  • anyOf(): 여러 작업 중에 가장 빨리 끝난 하나의 결과에 콜백 실행

4-7. 예외처리

  • exceptionally(Function)

  • handle(BiFunction):

4-8. 코드보기

public class CompletableFutureTest {

    @Test
    void completableFuture1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("flash");

        assertThat(future.get()).isEqualTo("flash");
    }

    @Test
    void completableFuture2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.completedFuture("flash");
        assertThat(future.get()).isEqualTo("flash");
    }

    @Test
    void runAsync() {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
        });
        //Hello : ForkJoinPool.commonPool-worker-19
    }

    @Test
    void supplyAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });

        System.out.println(stringCompletableFuture.get());
        //Hello : ForkJoinPool.commonPool-worker-19
        //Hello
    }

    @Test
    void anotherExecutor() {
        Executor executor = Executors.newSingleThreadExecutor();
        CompletableFuture.runAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
        }, executor);
        //Hello : pool-1-thread-1   -> ์ง€์ •๋œ executor ๋กœ ์‹คํ–‰๋œ๋‹ค.
    }

    @Test
    void thenApply_๋ฆฌํ„ด๊ฐ’์„๋ฐ›์•„_์ƒˆ๋กœ์šด๊ฐ’์„_๋ฆฌํ„ด() throws ExecutionException, InterruptedException {
        Executor executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }, executor).thenApply((s) -> {
            System.out.println("thenApply : " + Thread.currentThread().getName());
            return s.toUpperCase(Locale.ROOT);
        });
        System.out.println(stringCompletableFuture.get());
        //Hello : pool-1-thread-1
        //thenApply : pool-1-thread-1
        //HELLO
    }

    @Test
    void thenAccept_๋ฆฌํ„ด๊ฐ’์„๋ฐ›์•„_๋ฆฌํ„ด์—†์ด_์ฒ˜๋ฆฌ() throws ExecutionException, InterruptedException {
        Executor executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }, executor).thenAccept((s) -> {
            System.out.println("thenApply : " + Thread.currentThread().getName());
        });
        System.out.println(voidCompletableFuture.get());
        //Hello : pool-1-thread-1
        //thenApply : pool-1-thread-1
        //null
    }

    @Test
    void thenRun_๋ฆฌํ„ด๊ฐ’์„_๋ฐ›์•„_๋‹ค๋ฅธ์ž‘์—…์„_์ฒ˜๋ฆฌํ•˜๋Š”_์ฝœ๋ฐฑ() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).thenRun(() -> System.out.println(Thread.currentThread().getName()));
        System.out.println("voidCompletableFuture.get() = " + voidCompletableFuture.get());
        //Hello : ForkJoinPool.commonPool-worker-19
        //ForkJoinPool.commonPool-worker-19
        //voidCompletableFuture.get() = null
    }

    @Test
    void thenCompose() throws ExecutionException, InterruptedException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> stringCompletableFuture = helloFuture.thenCompose(CompletableFutureTest::getWorldFuture);
        System.out.println(stringCompletableFuture.get());
        //Hello : ForkJoinPool.commonPool-worker-19
        //World : ForkJoinPool.commonPool-worker-5
        //Hello World
    }

    private static CompletableFuture<String> getWorldFuture(String word) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return word + " World";
        });
    }

    @Test
    void thenCombine() throws ExecutionException, InterruptedException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<String> result = helloFuture.thenCombine(worldFuture, (s1, s2) -> s1 + " " + s2);
        System.out.println(result.get());
        //Hello : ForkJoinPool.commonPool-worker-19
        //World : ForkJoinPool.commonPool-worker-5
        //Hello World
    }

    @Test
    void allOf() throws ExecutionException, InterruptedException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        List<CompletableFuture<String>> futures = Arrays.asList(helloFuture, worldFuture);
        CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);

        CompletableFuture<List<String>> listCompletableFuture = CompletableFuture.allOf(futuresArray)
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        List<String> strings = listCompletableFuture.get();
        strings.forEach(System.out::println);
        //World : ForkJoinPool.commonPool-worker-5
        //Hello : ForkJoinPool.commonPool-worker-19
        //Hello
        //World
    }

    @Test
    void anyOf() {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> worldFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("World : " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture.anyOf(helloFuture, worldFuture)
                .thenAccept(System.out::println);
        //World : ForkJoinPool.commonPool-worker-5
        //Hello : ForkJoinPool.commonPool-worker-19
        //World
    }

    @Test
    void exceptionally() throws ExecutionException, InterruptedException {
        boolean isError = true;

        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            if (isError) {
                throw new IllegalArgumentException();
            }
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).exceptionally((ex) -> {
            System.out.println(ex);
            return "Default Value";
        });

        System.out.println(helloFuture.get());
        //java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
        //Default Value
    }

    @Test
    void handle() throws ExecutionException, InterruptedException {
        boolean isError = true;

        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            if (isError) {
                throw new IllegalArgumentException();
            }
            System.out.println("Hello : " + Thread.currentThread().getName());
            return "Hello";
        }).handle((normalResult, exception) -> {
            if (exception != null) {
                System.out.println(exception);
                return "Error!";
            }
            return normalResult;
        });

        System.out.println(helloFuture.get());
        //java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
        //Error!
    }
}

4-9. 참고

Last updated