引用

https://www.cnblogs.com/fingerboy/p/9948736.html
https://www.jianshu.com/p/6bac52527ca4

导言

public class CompletableFuture<T> implements Future<T>,CompletionStage<T>{...}

CompletableFuture 实现了Future和CompletionStage两个接口,所以我们有必要先了解一下这两个接口。

Future

public interface Future<V> {...}

这个接口在1.5版本是就已经出现了。
相比于CompletableFuture,Future只有寥寥5个方法。
image.png

  1. cancel:尝试取消任务
  • 如已经完成或无法取消,则返回false。
  • 如果任务未运行,则不运行任务。
  • 如果正在运行,则根据boolean参数决定是否中断,true中断,false继续。
  1. isCancelled: 返回任务是否已经取消
  2. isDone:返回任务是否已完成
  3. get:获取返回值,未完成则等待
  4. get(long,TimeUnit):获取返回值,未完成则等待long,timeunit(如:1,秒)的时间

Future的实现是FutureTask,它实现了RunnableFuture,而RunnableFutuer又扩展了Runnable和Future。如图
image.png

FutureTask的构造方法主要是

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

因为其中有

/** The underlying callable; nulled out after running */
private Callable<V> callable;

状态以volatile维护

private volatile int state;

简单的使用方法

    FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
        @Override
        public String call() throws Exception {
//            do some ting...
            return "hello world";
        }
    });

CompletionStage

完成阶段,用来在一个任务完成时,开启另一个任务。部分方法如下
image.png

CompletableFuture 使用

创建

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • runAsync入参Runnable 无返回值,supplyAsync入参Supplier 有返回值。
  • 如果没有传入Executor对象将会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码.

创建案例

    //无返回值
    public static void runAsync() throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
//          do some thing...
            System.out.println("run end ...");
        });

        future.get();
    }

    //有返回值
    public static void supplyAsync() throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
//          do some thing...
            System.out.println("run end ...");
            return System.currentTimeMillis();
        });

        long time = future.get();
        System.out.println("time = "+time);
    }

串接

thenRun

忽视上一个值,完成后也无返回值。

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);

thenAccept

接受上一个任务的返回值,但完成后没有返回值

public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

T:上一个任务返回结果的类型

thenApply

接受上一个任务的返回值,完成后有返回值。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

T:上一个任务返回结果的类型
U:当前任务的返回值类型

handle

处理 在一个任务完成后继续,但较thenApply不同的是,handle可以处理异常。

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

T 上个任务的值
U 返回值

thenCompose

将自身与一个方法组合,组成一个新的Completable

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

于 thenApply相似,但设计理念不同,可参考
https://stackoverflow.com/questions/43019126/completablefuture-thenapply-vs-thencompose

两个任务合并

thenCombine

两个串接,接受两个返回值,合并返回一个

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);

T: 自己的返回值
U: 加入者的返回值
V: 合并后的返回值
案例

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "world");
        CompletableFuture<String> future = future1.thenCombine(future2, (t1, t2) -> t1 +" "+ t2);
        System.out.println(future.join());

thenAcceptBoth

两个串接,接收两个,不返回

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

T: 自己的返回值
U: 加入者的返回值

runAfterBoth

两个串接,不接受,不返回

public CompletableFuture<Void>runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void>runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void>runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

两个任务选一个

applyToEither(..)

acceptEither(..)

runAfterEither(..)

都有各自的异步方法,每个3个,两者之间任意完成,先完成的执行。

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);

apply:接收参数,有返回值
accept:接收参数,无返回值
run:不收参,不返回

多个任务等待

allOf

多个CompletableFuture都执行完成后运算。多用于等待.join()

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

案例

List<CompletableFuture<Void>> completableFutureList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int t = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(t * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(t);
            });
            completableFutureList.add(future);
        }
        CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();
        System.out.println(10);

输出0-10

anyOf

任意一个CompletableFuture执行完成后运算。多用于等待.join()

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

证明是在新线程中执行

这里有一个很有趣的事情,当我在尝试证明加Async后是在一个新线程中执行时,我遇到一个问题,我尝试通过

Thread.currentThread().getId()

输出当前执行的线程ID时,thenApplyAsync与thenApply都返回了相同的值,即执行它们的(runAsync所在)线程。
这便与意思产生了分歧,他们都没有在新的线程中运行!
在扫源码的过程中,我突然想到,会不会是他们恰好抢了同一个线程呢?
于是我设计了如下代码。

CompletableFuture.runAsync(() -> {
            System.out.println("1sta:" + Thread.currentThread().getId());
            CompletableFuture.runAsync(() -> {
                System.out.println("1runS:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1runE:" + Thread.currentThread().getId());
            });
        }).thenRunAsync(() -> {
            System.out.println("2sta:" + Thread.currentThread().getId());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).thenRun(() -> {
            System.out.println("3sta:" + Thread.currentThread().getId());
        });
  1. 在第一个任务中,再调用一次异步任务(设为X),并使其等待1秒
  2. 第二个任务在第一个任务运行完成后,异步执行任务(设为Y),并在任务中等待2秒。此时X与Y将同时争夺线程,诺X夺到后,则Y必然在新的线程中运行。
  3. 第三个任务应该与Y在同一个线程中。(但不能证明什么)

输出

1sta:82
1runS:82
2sta:83
1runE:82
3sta:83

我们证明了,thenRunAsync确实会在一个新的线程中运行。
但现在我们不能保证,thenRun必然在运行thenRun线程中运行。
由于不清楚运行机制,我确实无法证明这一点。但代码修改后,多次测试都没有在新的线程中运行任务2,任务3

CompletableFuture.runAsync(() -> {
            System.out.println("1sta:" + Thread.currentThread().getId());
            CompletableFuture.runAsync(() -> {
                System.out.println("1runS:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1runE:" + Thread.currentThread().getId());
            });
        }).thenRun(() -> {
            System.out.println("2sta:" + Thread.currentThread().getId());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).thenRun(() -> {
            System.out.println("3sta:" + Thread.currentThread().getId());
        });

输出

1sta:82
2sta:82
1runS:83
1runE:83
3sta:82

姑且算是证明了吧qwq