对于一些调用第三方接口或多个任务处理,调用的时候不能阻塞主线程,同时又需要提高响应速度,需要用到一些多线程编排方式。
复习一些预备知识
1.JUC:java中提供的多线程编程包/并发框架 【java.util.concurrent】
提供了锁的定义LOCK、线程安全的原子量变量类Atomic变量 、线程执行任务存放的等待队列 、并发工具辅助类等
下面结合源码分析一下
【核心思想】
(1)【原子性变量】
(2)【锁LOCK】
(3)【辅助工具类】
CountDownLatch
CyclicBarrier
Semaphore
2.CompleteableFuture
这个东西是Future接口的改进:
从前的Future对象和对应方法虽然有异步执行任务的功能,但是获取结果需要调用Future.get() 阻塞主线程来获取;或者Future.isDone() 判断任务结束再返回结果
另外,Future无法处理主线程子线程相互依赖的场景,就是一个线程需要前面线程执行后的结果。但Future 中 主线程需要等待子线程任务执行完毕之后在进行执行
CompleteableFuture 提供了很多方法,包括非阻塞的获取返回值,线程间的依赖,获取上一个线程结果后执行然后还能返回值
(1)创建方法:
runAsync:没有返回值的
supplyAsync: 有返回值
CompletableFuture<Void> paymentFuture = CompletableFuture.runAsync(() -> {
// 验证支付(异步)
validatePayment();
});
CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> {
// 扣减库存并返回状态
return reduceStock();
});
(2)获取结果
get方法和join方法
get() 和 get(Long timeout,TimeUnit unit) 可以设置过期时间后抛出异常
join() 抛出非受检异常 get()必须进行异常处理
例:
String result = orderFuture.join(); // 获取任务结果,发生异常时抛出 CompletionException
两个方法收集结果的时候都是阻塞主线程的,阻塞知道任务完成
(3)组合多个异步任务结果
allof() 和 anyof()
【两个收集方法都是阻塞的】
allof(): 所有任务都完成后 返回一个新的CompletableFuture对象【等待所有任务】
anyof(): 任意一个任务完成,返回有一个CompletableFuture,相当于那个任务先执行完返回这个任务的CompletableFuture【等待一个任务】
(4)处理异步任务完成后的结果
thenApply
/ thenAccept
/ thenRun
【这里的方法是非阻塞的,即不阻塞主线程,任务完成后异步执行后续操作】
thenApply()
: 处理返回值然后返回新的结果,需要有一个return
thenAccept()
: 处理返回值 但不需要返回结果【比如获取到任务执行后从里面把数据拿出来填充到一个dto】
thenRun
:在任务完成后执行一个没有输入和输出的操作。【记录日志或执行update、更新状态等】输入和输出都没有
// thenapply
CompletableFuture<String> orderFuture = stockFuture.thenApply(stock -> {
// 扣减库存后,处理订单(可以修改订单信息)
processOrder(stock);
return "Order processed";
});
// thenaccept
xxx.thenAccept(audioBytes -> {
String fileName = String.format("%03d_%s.wav", idx, dto.getEmotion());
Path output = Path.of(outputDir, fileName);
try {
Files.write(output, audioBytes);
} catch (IOException e) {
throw new RuntimeException("文件保存失败" ,e);
}
// 两个都用于链式调用
异步任务的组合thenCompose
/ thenCombine
thenCompose()
: 处理链式异步任务,返回一个CompletableFuture
【相当于在一个A异步任务中调用另一个B异步任务,B任务需要A任务入参】
thenCombine
:用来组合两个并行执行的异步任务的结果,返回一个新结果。
【两个异步任务并发执行,把结果组合到一起】
// 先执行支付,支付完成后再执行扣减
CompletableFuture<String> orderFuture = paymentFuture
.thenCompose(payment -> CompletableFuture.supplyAsync(() -> reduceStock(payment)));
// 如果支付和扣减一起执行,然后返回一个总订单金额
CompletableFuture<Integer> orderTotalFuture = paymentFuture
.thenCombine(stockFuture, (payment, stock) -> payment + stock);
(5)处理异常exceptionally
/ handle
/ whenComplete
正常一个异步任务失败会抛出异常
exceptionally: 处理异常返回一个默认值,有点像throw new IOException
handle
:在任务完成后处理结果或者异常,并返回一个新的值
whenComplete
: 添加了一个回调函数,任务完成还是发生异常,都正常执行
// 出现异常记录日志,无异常则完成后记录日志
CompletableFuture<String> orderFuture = stockFuture
.thenApply(stock -> processOrder(stock))
.whenComplete((result, ex) -> {
if (ex != null) {
// 记录异常日志
logError(ex);
}
// 完成订单处理后记录日志
logOrder(result);
});
注意点:future需要有返回值才能处理或捕获异常
最好自定义线程池然后在supplyAsync(()->{},executor)使用自定义的线程池
CompletableFuture 线程池策略最好使用 AbortPolicy,异步线程加耗时处理/重试机制【否则线程池饱和丢弃任务不会抛出异常】
AND: 任务都完成后,再执行后续的任务
OR: 有一个任务完成,就执行后面的任务