hangscer

java并发-Future、Callable和CompletableFuture

2017/06/21

java8新的线程异步工具完全可以媲美scala。

Future和Callable

Callable是一个函数式接口,如下:

1
2
3
4
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

Callable类似于Runnablecall()run()代表着需要多线程运行的方法体。

1
2
3
4
public interface ExecutorService extends Executor{
Future<?> submit(Runnable task);
......
}

一旦提交后,Callable中的代码则立刻运行。
Future.get()方法是阻塞方法。

1
2
3
4
5
6
7
8
9
10
11
Callable callable=()->{
Thread.sleep(3000);
System.out.println("haha");
return 100000;
};
ExecutorService executorService=Executors.newSingleThreadExecutor();
Future future=executorService.submit(callable);
Thread.sleep(3000);
System.out.println("is Done :"+future.isDone());
System.out.println(future.get());//阻塞方法
System.out.println("is Done :"+future.isDone());

CompletableFuture

CompletableFuture是java8提供更加完备的函数式异步工具。其继承了FutureCompletionStage接口。

1
2
3
CompletableFuture.completedFuture(10000).get();
CompletableFuture.supplyAsync(()->(10000));
......

主动计算

Future类似,get()方法仍然是阻塞方法。
Future不同,CompletableFuture还可以主动完成计算。这一点类似scala中的Promise

1
2
3
4
5
6
7
8
9
10
ExecutorService executorService=Executors.newSingleThreadExecutor();
CompletableFuture future=CompletableFuture.supplyAsync(()->{
Thread.sleep(100000000);
return 10000;
});
executorService.submit(()->{
Thread.sleep(1000);
future.complete(10);
});
System.out.println(future.get());

以上代码中future需要执行很长时间才能返回数值。显然我们不会让它等待着么长时间,于是我们可以主动完成future的计算,调用complete(Object obj)方法,触发某个客户端的等待(get()方法),这很类似于RXjava的思想。
当然,不仅仅可以主动计算一个值,还可以主动抛出异常:

1
public boolean completeExceptionally(Throwable ex)

创建CompletableFuture对象

completedFuture返回一个已经运算完成的CompletedFuture的对象。

1
2
3
4
5
public static void main(String[] args) throws InterruptedException, IOException,ExecutionException{
CompletableFuture<Integer> future=CompletableFuture.completedFuture(10000);
System.out.println(future.isDone());//true
System.out.println(future.get());//10000
}

还有其他方法异步创建CompletableFuture对象。

1
2
3
4
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

如果方法参数列表中没有executor的话,则内部使用ForkJoinPool.commonPool()做为线程池。

1
2
3
4
CompletableFuture future=CompletableFuture.supplyAsync(()->{
Thread.sleep(1000);
return "jianghang";
});

计算完成时的处理

当CompletableFuture计算结果完成时,或者抛出异常,我需要指定action:

1
2
3
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
......

特别说明:whenComplete意味着Action使用相同的线程执行。whenCompleteAsync可能会使用其他线程去执行。

1
2
3
4
future.whenComplete((value,exception)->{
System.out.println(value);//jianghang
System.out.println(exception);//null
});

以上程序中并没有抛出异常,所以exception为空。

转换

你可以把CompletableFuture看成是Monad(函数式编程概念)。不必阻塞等待某个计算完成。而是当某个计算一旦完成再去计算其他操作。

1
2
3
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
......

函数名有无Async与之前的whenComplete区别差不多。

1
2
3
4
5
6
7
public static void main(String[] args) throws InterruptedException, IOException,ExecutionException{
CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->(10));
future.thenApply(i->i*100).whenCompleteAsync((v,e)->{
System.out.println(v+" "+e);
});
System.in.read();
}

组合

thenCompose用于组合多个CompletableFuture,前一个结果用于下一个计算的参数,存在先后顺序:

1
2
3
future.thenCompose(i->{
return CompletableFuture.supplyAsync(()->(i*1000));
});

thenCombine用于组合两个CompletableFuture,它们之间并行执行,结果最后汇总:

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException, IOException,ExecutionException{
CompletableFuture<Integer> future1=CompletableFuture.supplyAsync(()->100);
CompletableFuture<String> future2=CompletableFuture.supplyAsync(()->"2000");
CompletableFuture<String> resultFuture=future1.thenCombine(future2,(Integer a,String b)->a+b);
System.out.println(resultFuture.get());
}

请注意thenCombine方法的参数是什么,future1与future2是并行计算的。