YWW
YWW
发布于 2025-08-24 / 10 阅读
1
0

常用的多线程:JUC/FutureTask

对于一些调用第三方接口或多个任务处理,调用的时候不能阻塞主线程,同时又需要提高响应速度,需要用到一些多线程编排方式。

复习一些预备知识

1.JUC:java中提供的多线程编程包/并发框架 【java.util.concurrent】

提供了锁的定义LOCK、线程安全的原子量变量类Atomic变量 、线程执行任务存放的等待队列 、并发工具辅助类等

下面结合源码分析一下

【核心思想】

(1)【原子性变量】

(2)【锁LOCK】

(3)【辅助工具类】

  1. CountDownLatch

  2. CyclicBarrier

  3. Semaphore

2.CompleteableFuture

这个东西是Future接口的改进:
从前的Future对象和对应方法虽然有异步执行任务的功能,但是获取结果需要调用Future.get() 阻塞主线程来获取;或者Future.isDone() 判断任务结束再返回结果
另外,Future无法处理主线程子线程相互依赖的场景,就是一个线程需要前面线程执行后的结果。但Future 中 主线程需要等待子线程任务执行完毕之后在进行执行

CompleteableFuture 提供了很多方法,包括非阻塞的获取返回值,线程间的依赖,获取上一个线程结果后执行然后还能返回值

(1)创建方法:

runAsync:没有返回值的

supplyAsync: 有返回值

CompletableFuture<Void> paymentFuture = CompletableFuture.runAsync(() -> {
    // 验证支付(异步)
    validatePayment();
});

CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> {
    // 扣减库存并返回状态
    return reduceStock();
});

(2)获取结果

get方法和join方法

get() 和 get(Long timeout,TimeUnit unit) 可以设置过期时间后抛出异常

join() 抛出非受检异常 get()必须进行异常处理
例:

String result = orderFuture.join();  // 获取任务结果,发生异常时抛出 CompletionException

两个方法收集结果的时候都是阻塞主线程的,阻塞知道任务完成

(3)组合多个异步任务结果

allof() 和 anyof()

【两个收集方法都是阻塞的】

allof(): 所有任务都完成后 返回一个新的CompletableFuture对象【等待所有任务】

anyof(): 任意一个任务完成,返回有一个CompletableFuture,相当于那个任务先执行完返回这个任务的CompletableFuture【等待一个任务】

(4)处理异步任务完成后的结果

thenApply / thenAccept / thenRun

【这里的方法是非阻塞的,即不阻塞主线程,任务完成后异步执行后续操作】

thenApply() : 处理返回值然后返回新的结果,需要有一个return

thenAccept() : 处理返回值 但不需要返回结果【比如获取到任务执行后从里面把数据拿出来填充到一个dto】

thenRun:在任务完成后执行一个没有输入和输出的操作。【记录日志或执行update、更新状态等】输入和输出都没有

// thenapply
CompletableFuture<String> orderFuture = stockFuture.thenApply(stock -> {
    // 扣减库存后,处理订单(可以修改订单信息)
    processOrder(stock);
    return "Order processed";
});
// thenaccept
xxx.thenAccept(audioBytes -> {
                        String fileName = String.format("%03d_%s.wav", idx, dto.getEmotion());
                        Path output = Path.of(outputDir, fileName);
                        try {
                            Files.write(output, audioBytes);
                        } catch (IOException e) {
                            throw new RuntimeException("文件保存失败" ,e);
                        }
// 两个都用于链式调用


异步任务的组合thenCompose / thenCombine

thenCompose() : 处理链式异步任务,返回一个CompletableFuture

【相当于在一个A异步任务中调用另一个B异步任务,B任务需要A任务入参】

thenCombine:用来组合两个并行执行的异步任务的结果,返回一个新结果。

【两个异步任务并发执行,把结果组合到一起】

// 先执行支付,支付完成后再执行扣减
CompletableFuture<String> orderFuture = paymentFuture
    .thenCompose(payment -> CompletableFuture.supplyAsync(() -> reduceStock(payment)));
// 如果支付和扣减一起执行,然后返回一个总订单金额
CompletableFuture<Integer> orderTotalFuture = paymentFuture
    .thenCombine(stockFuture, (payment, stock) -> payment + stock);

(5)处理异常exceptionally / handle / whenComplete

正常一个异步任务失败会抛出异常

exceptionally: 处理异常返回一个默认值,有点像throw new IOException

handle:在任务完成后处理结果或者异常,并返回一个新的值

whenComplete : 添加了一个回调函数,任务完成还是发生异常,都正常执行

// 出现异常记录日志,无异常则完成后记录日志
CompletableFuture<String> orderFuture = stockFuture
    .thenApply(stock -> processOrder(stock))
    .whenComplete((result, ex) -> {
        if (ex != null) {
            // 记录异常日志
            logError(ex);
        }
        // 完成订单处理后记录日志
        logOrder(result);
    });

注意点:future需要有返回值才能处理或捕获异常

最好自定义线程池然后在supplyAsync(()->{},executor)使用自定义的线程池

CompletableFuture 线程池策略最好使用 AbortPolicy,异步线程加耗时处理/重试机制【否则线程池饱和丢弃任务不会抛出异常】

AND: 任务都完成后,再执行后续的任务

OR: 有一个任务完成,就执行后面的任务


评论