多个CompletableFuture样例

import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * 注意:
 * handle 和 thenApply方法的区别
 * 它们与handle方法的区别在于handle方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。
 * 而thenApply方法只是用来处理正常值,因此一旦有异常就会抛出。
 */
public class SimpleTest {

    private ExecutorService executorService = new ThreadPoolExecutor(20, 40, 100, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * CompletableFuture 串行操作
     */
    @Test
    public void test01(){

        CompletableFuture.supplyAsync(()->"hello ")
                .thenApply(str -> str + "world ")
                .thenCombine(CompletableFuture.completedFuture("java"),(s1, s2)->s1 + s2)
                .thenAccept(System.out::println);
    }

    /**
     * obtrudeException 主动抛出异常
     * 1. 如果将主动抛异常时间延长到6s,
     * 由于该节点计算完成, 则不会收到异常
     *
     * 收到异常:手动失败
     */
    @Test
    public void test02() throws InterruptedException {
        CompletableFuture<String> fu = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("成功执行了");
            } catch (InterruptedException ignored) {
            }
            return null;
        });

        CompletableFuture<String> end = fu.exceptionally(throwable -> {
            System.out.println("收到异常:" + throwable.getMessage());
            return null;
        });

        TimeUnit.SECONDS.sleep(3);
        // 如果等6s后再抛,由于结果已经计算完成,则不会受到异常
//        TimeUnit.SECONDS.sleep(6);

        fu.obtrudeException(new RuntimeException("手动失败"));      // 手动抛出一个异常,若结果未计算完成,则会抛出。

        try {
            String result = end.join();
            System.out.println(result);
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage());
        }
    }

    /**
     * join和get的区别
     */
    @Test
    public void test03(){
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
            int i =1/0;
            return 1;
        });

//        CompletableFuture.allOf(f1).join();
//        System.out.println("join()抛出包装的CompletionException,本质还是内部发生的异常,不需要手动try..catch");

        try {
            Integer result = f1.get();
            System.out.println("result: " + result);
            System.out.println("get()检查异常需要手动处理try..catch");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * handle手动处理异常
     */
    @Test
    public void test04(){
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA")
                .thenApply(resultA -> resultA + " resultB")
                // 任务 C 抛出异常
                .thenApply(resultB -> {throw new RuntimeException();})
                // 处理任务 C 的返回值或异常
                .handle(new BiFunction<Object, Throwable, Object>() {
                    @Override
                    public Object apply(Object re, Throwable throwable) {
                        if (throwable != null) {
                            return "errorResultC";
                        }
                        return re;
                    }
                })
                .thenApply(resultC -> resultC + " resultD");
        System.out.println(future.join());
        // errorResultC resultD
    }

    /**
     * allOf 和 anyOf
     * allOf 全部任务执行完成后执行,无返回值
     * anyOf 任一任务执行后执行,返回值Object (任一任务的返回结果)
     */
    @Test
    public void test05(){
        CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
        CompletableFuture<Integer> cfB = CompletableFuture.supplyAsync(() -> 123);
        CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> "resultC");

        CompletableFuture<Void> combinedFuture1 = CompletableFuture.allOf(cfA, cfB, cfC);
        // 所以这里的 join() 将阻塞,直到所有的任务执行结束
        Void join1 = combinedFuture1.join();
        System.out.println(join1);

        CompletableFuture<Object> combinedFuture2 = CompletableFuture.anyOf(cfA, cfB, cfC);
        //  join() 方法会返回最先完成的任务的结果,所以它的泛型用的是 Object,因为每个任务可能返回的类型不同。
        Object join2 = combinedFuture2.join();
        System.out.println(join2);
    }

    /**
     * either 方法
     * 各个带 either 的方法,表达的都是一个意思,指的是两个任务中的其中一个执行完成,就执行指定的操作
     * 它们几组的区别也很明显,分别用于表达是否需要任务 A 和任务 B 的执行结果,是否需要返回值
     *
     * 注意:
     * 1、cfA.acceptEither(cfB, result -> {}); 和 cfB.acceptEither(cfA, result -> {}); 是一个意思;
     * 2、第二个变种,加了 Async 后缀的方法,代表将需要执行的任务放到 ForkJoinPool.commonPool() 中执行(非完全严谨);
     *    第三个变种很好理解,将任务放到指定线程池中执行;
     * 3、难道第一个变种是同步的?不是的,而是说,它由任务 A 或任务 B 所在的执行线程来执行,取决于哪个任务先结束。
     */
    @Test
    public void test06(){
        CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
        CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

        cfA.acceptEither(cfB, result -> {});
        cfA.acceptEitherAsync(cfB, result -> {});
        cfA.acceptEitherAsync(cfB, result -> {}, executorService);

        cfA.applyToEither(cfB, result -> {return result;});
        cfA.applyToEitherAsync(cfB, result -> {return result;});
        cfA.applyToEitherAsync(cfB, result -> {return result;}, executorService);

        cfA.runAfterEither(cfA, () -> {});
        cfA.runAfterEitherAsync(cfB, () -> {});
        cfA.runAfterEitherAsync(cfB, () -> {}, executorService);
    }

    /**
     * 计算结果完成时的回调方法
     * public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
     * public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
     * public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
     * public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
     */
    @Test
    public void test07(){

        final int randomNum = new Random().nextInt(100);
        System.out.println("randomNum: " + randomNum);

        final CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
            if (randomNum % 2 > 0) {
                int i = 10 / 0;
            }
            return randomNum;
        }).whenComplete((result, throwable) -> {
            System.out.println(MessageFormat.format("result:{0}, throwable:{1}", result, throwable));
        }).exceptionally((throwable -> {
            System.out.println(throwable);
            return -1;
        }));
        System.out.println("cf.join() = " + cf.join());
    }

    /**
     *  thenCombine 合并任务
     *  把两个 CompletionStage 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。
     */
    @Test
    public void test08(){
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello ");
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 120);
        CompletableFuture<String> result = future1.thenCombine(future2, (t, u) -> t + u);
        System.out.println(result.join());
    }

    /**
     * thenCompose 将第一个future的返回结果传入第二个future中执行
     * thenApply()转换的是泛型中的类型,是同一个CompletableFuture,相当于将CompletableFuture<T> 转换成CompletableFuture<U>
     * thenCompose()用来连接两个CompletableFuture,是生成一个新的CompletableFuture。
     */
    @Test
    public void test09() {
        CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(()->{
            int t = new Random().nextInt(10);
            System.out.println("t1="+t);
            return t;
        }).thenCompose(t -> CompletableFuture.supplyAsync(()->{
            return 100 + t;
        }));

        System.out.println(f1.join());
    }

    /**
     * 异常处理
     * 1. handle..()        不会抓获异常,所以配置多个都会被执行;
     * 2. exceptionally()   会抓获异常所以只生效一次。
     * 3. exceptionally()   必须在结束前调用,否则不生效
     *
     *我是1号异常处理器
     *我是3号异常处理器
     *我是2号异常处理器
     *我是4号异常处理器
     */
    @Test
    public void test10() {
        CompletableFuture<Object> handle = CompletableFuture.runAsync(() -> {
            throw new RuntimeException("异常");
        })
                .handleAsync((aVoid, throwable) -> {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("我是1号异常处理器");
                    return null;
                })
                .handle((o, throwable) -> {
                    System.out.println("我是3号异常处理器");
                    return null;
                })
                .handleAsync((avoid, throwable) -> {
                    System.out.println("我是2号异常处理器");
                    throw new RuntimeException("处理异常中的异常");
//                  return null;
                })
                .exceptionally(throwable -> {
                    System.out.println("我是4号异常处理器");
                    return null;
                })
                .exceptionally(throwable -> {
                    System.out.println("我是5号异常处理器");
                    return null;
                });
        handle.join();
    }

    /**
     * complete() 完成一个计算,触发客户端的等待
     * completeExceptionally() 也可以抛出一个异常,触发客户端的等待
     *
     * 我们有两个后门方法可以重设这个值:obtrudeValue、obtrudeException
     * 但是使用的时候要小心,因为complete已经触发了客户端,有可能导致客户端会得到不期望的结果。
     */
    @Test
    public void test11(){
        CompletableFuture<String> f = new CompletableFuture<>();
//        boolean flag1 = f.complete("100");
//        System.out.println(flag1);       // true
//        System.out.println(f.join());    // 100

        boolean flag2 = f.completeExceptionally(new Exception("自定义异常"));
        System.out.println(flag2);          // true
        System.out.println(f.join());
    }

    /**
     * Java Future 转 CompletableFuture
     */
    @Test
    public void test12(){
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Future<String> future = executorService.submit(() -> "hello world");

        CompletableFuture<String> cf = toCompletable(future, executorService);
    }

    // future -> CompletableFuture
    public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }, executor);
    }

    /**
     * 将多个CompletableFuture组合成一个CompletableFuture,这个组合后的CompletableFuture的计算结果是个List,
     * 它包含前面所有的CompletableFuture的计算结果,guava的Futures.allAsList可以实现这样的功能,
     * 但是对于java CompletableFuture,我们需要一些辅助方法
     */
    @Test
    public void test13(){
        List<CompletableFuture<String>> futures = Lists.newArrayList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"),
                CompletableFuture.supplyAsync(() -> "c"),
                CompletableFuture.supplyAsync(() -> "d")
        );
        List<String> resultList = sequence(futures).join();
        System.out.println(resultList);     // [a, b, c, d]
    }

    // 多个CompletableFutureList的计算结果List包装成一个CompletableFuture
    public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
    }
    public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
        return sequence(futureList);
    }

}
1. 本站所有资源来源于用户上传和网络,如有侵权请及时联系删除,本站不承担任何法律责任!
2. 分享目的仅供大家学习和研究,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的教程、源码等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"www.94zyw.com",如遇到无法解压的请联系管理员!
94资源网 » 多个CompletableFuture样例