引用
https://www.cnblogs.com/fingerboy/p/9948736.html
https://www.jianshu.com/p/6bac52527ca4
导言
public class CompletableFuture<T> implements Future<T>,CompletionStage<T>{...}
CompletableFuture 实现了Future和CompletionStage两个接口,所以我们有必要先了解一下这两个接口。
Future
public interface Future<V> {...}
这个接口在1.5版本是就已经出现了。
相比于CompletableFuture,Future只有寥寥5个方法。
- cancel:尝试取消任务
- 如已经完成或无法取消,则返回false。
- 如果任务未运行,则不运行任务。
- 如果正在运行,则根据boolean参数决定是否中断,true中断,false继续。
- isCancelled: 返回任务是否已经取消
- isDone:返回任务是否已完成
- get:获取返回值,未完成则等待
- get(long,TimeUnit):获取返回值,未完成则等待long,timeunit(如:1,秒)的时间
Future的实现是FutureTask,它实现了RunnableFuture,而RunnableFutuer又扩展了Runnable和Future。如图
FutureTask的构造方法主要是
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
因为其中有
/** The underlying callable; nulled out after running */
private Callable<V> callable;
状态以volatile维护
private volatile int state;
简单的使用方法
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
// do some ting...
return "hello world";
}
});
CompletionStage
完成阶段,用来在一个任务完成时,开启另一个任务。部分方法如下
CompletableFuture 使用
创建
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- runAsync入参Runnable 无返回值,supplyAsync入参Supplier 有返回值。
- 如果没有传入Executor对象将会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码.
创建案例
//无返回值
public static void runAsync() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// do some thing...
System.out.println("run end ...");
});
future.get();
}
//有返回值
public static void supplyAsync() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
// do some thing...
System.out.println("run end ...");
return System.currentTimeMillis();
});
long time = future.get();
System.out.println("time = "+time);
}
串接
thenRun
忽视上一个值,完成后也无返回值。
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);
thenAccept
接受上一个任务的返回值,但完成后没有返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
T:上一个任务返回结果的类型
thenApply
接受上一个任务的返回值,完成后有返回值。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
T:上一个任务返回结果的类型
U:当前任务的返回值类型
handle
处理 在一个任务完成后继续,但较thenApply不同的是,handle可以处理异常。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
T 上个任务的值
U 返回值
thenCompose
将自身与一个方法组合,组成一个新的Completable
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
于 thenApply相似,但设计理念不同,可参考
https://stackoverflow.com/questions/43019126/completablefuture-thenapply-vs-thencompose
两个任务合并
thenCombine
两个串接,接受两个返回值,合并返回一个
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);
T: 自己的返回值
U: 加入者的返回值
V: 合并后的返回值
案例
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<String> future = future1.thenCombine(future2, (t1, t2) -> t1 +" "+ t2);
System.out.println(future.join());
thenAcceptBoth
两个串接,接收两个,不返回
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
T: 自己的返回值
U: 加入者的返回值
runAfterBoth
两个串接,不接受,不返回
public CompletableFuture<Void>runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void>runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void>runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
两个任务选一个
applyToEither(..)
acceptEither(..)
runAfterEither(..)
都有各自的异步方法,每个3个,两者之间任意完成,先完成的执行。
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);
apply:接收参数,有返回值
accept:接收参数,无返回值
run:不收参,不返回
多个任务等待
allOf
多个CompletableFuture都执行完成后运算。多用于等待.join()
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
案例
List<CompletableFuture<Void>> completableFutureList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int t = i;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(t * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(t);
});
completableFutureList.add(future);
}
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();
System.out.println(10);
输出0-10
anyOf
任意一个CompletableFuture执行完成后运算。多用于等待.join()
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
证明是在新线程中执行
这里有一个很有趣的事情,当我在尝试证明加Async后是在一个新线程中执行时,我遇到一个问题,我尝试通过
Thread.currentThread().getId()
输出当前执行的线程ID时,thenApplyAsync与thenApply都返回了相同的值,即执行它们的(runAsync所在)线程。
这便与意思产生了分歧,他们都没有在新的线程中运行!
在扫源码的过程中,我突然想到,会不会是他们恰好抢了同一个线程呢?
于是我设计了如下代码。
CompletableFuture.runAsync(() -> {
System.out.println("1sta:" + Thread.currentThread().getId());
CompletableFuture.runAsync(() -> {
System.out.println("1runS:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1runE:" + Thread.currentThread().getId());
});
}).thenRunAsync(() -> {
System.out.println("2sta:" + Thread.currentThread().getId());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).thenRun(() -> {
System.out.println("3sta:" + Thread.currentThread().getId());
});
- 在第一个任务中,再调用一次异步任务(设为X),并使其等待1秒
- 第二个任务在第一个任务运行完成后,异步执行任务(设为Y),并在任务中等待2秒。此时X与Y将同时争夺线程,诺X夺到后,则Y必然在新的线程中运行。
- 第三个任务应该与Y在同一个线程中。(但不能证明什么)
输出
1sta:82
1runS:82
2sta:83
1runE:82
3sta:83
我们证明了,thenRunAsync确实会在一个新的线程中运行。
但现在我们不能保证,thenRun必然在运行thenRun线程中运行。
由于不清楚运行机制,我确实无法证明这一点。但代码修改后,多次测试都没有在新的线程中运行任务2,任务3
CompletableFuture.runAsync(() -> {
System.out.println("1sta:" + Thread.currentThread().getId());
CompletableFuture.runAsync(() -> {
System.out.println("1runS:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1runE:" + Thread.currentThread().getId());
});
}).thenRun(() -> {
System.out.println("2sta:" + Thread.currentThread().getId());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).thenRun(() -> {
System.out.println("3sta:" + Thread.currentThread().getId());
});
输出
1sta:82
2sta:82
1runS:83
1runE:83
3sta:82
姑且算是证明了吧qwq