Featured image of post ExecutorCompletionService 与 CompletableFuture:场景与选型

ExecutorCompletionService 与 CompletableFuture:场景与选型

Future.get() 阻塞等结果是新人最容易踩的坑。CompletionService 让先完成的任务结果先被处理——Java 并发的『流水线收银台』

一段几乎所有人都写过的反面代码

需要并发调用 5 个外部接口,最后聚合结果——多数人这么写:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<Result>> futures = new ArrayList<>();
for (Task task : tasks) {
    futures.add(pool.submit(() -> callApi(task)));
}

List<Result> results = new ArrayList<>();
for (Future<Result> f : futures) {
    results.add(f.get());   // ★ 阻塞等
}

看起来挺合理。但有个性能陷阱——futures 的顺序是提交顺序,不是完成顺序

假设 5 个接口耗时是:

接口耗时
A3s
B1s
C5s
D2s
E1s

写在 for 循环里 f.get() 时——第一次 get 拿 A 的结果(3s)→ 然后 B 的(已经完成,立刻返回)→ 然后 C(再等 2s)→ D 已完成 → E 已完成

总耗时是 5s(最慢的那个)。这没问题。

但中间 3s ~ 5s 之间 D、E、B 都已经完成了,结果还在原地等 C——如果"处理结果"也耗时(比如把结果写 DB),整个流水线被最慢的任务卡住

理想情况是:谁先完成,就先处理谁的结果——而不是按提交顺序傻等。ExecutorCompletionService 就是为这件事设计的。


一、CompletionService 是什么

CompletionService 是 JDK 5 就有的接口,但很多人不知道它存在

1
2
3
4
5
6
public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> take() throws InterruptedException;       // 阻塞,拿一个完成的
    Future<V> poll();                                   // 非阻塞
    Future<V> poll(long timeout, TimeUnit unit);        // 超时
}

它内部维护一个已完成任务队列——任何任务完成后立刻进队列,take() 从队列取出一个。

先完成的先被取走——而不是按提交顺序等。


二、用法对比

反例:朴素 Future.get()

1
2
3
4
5
6
7
8
9
ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<Result>> futures = tasks.stream()
        .map(t -> pool.submit(() -> callApi(t)))
        .collect(toList());

for (Future<Result> f : futures) {
    Result r = f.get();   // 按提交顺序阻塞
    handle(r);
}

正解:CompletionService

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<Result> ecs = new ExecutorCompletionService<>(pool);

for (Task t : tasks) {
    ecs.submit(() -> callApi(t));
}

for (int i = 0; i < tasks.size(); i++) {
    Result r = ecs.take().get();   // 先完成的先返回
    handle(r);
}

唯一变化是 take() 替代了 f.get()——但行为完全不同。


三、性能差异:一个真实数字

5 个任务,耗时分别 1s、1s、2s、3s、5s。handle() 处理每个结果耗时 1s。

方案耗时
朴素 Future.get()5s + 5×1s = 10s (处理被串行化)
CompletionService5s + 1s = 6s (处理跟着完成走)

任务越多、处理越重,差距越大。这是 CompletionService 的真正价值——让"等待"和"处理"重叠。


四、典型场景

1. 并发调多个外部接口聚合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
public List<UserVO> batchGetUsers(List<Long> ids) {
    CompletionService<UserVO> ecs = new ExecutorCompletionService<>(pool);
    for (Long id : ids) {
        ecs.submit(() -> rpcGetUser(id));
    }
    List<UserVO> result = new ArrayList<>();
    for (int i = 0; i < ids.size(); i++) {
        try {
            result.add(ecs.take().get());
        } catch (Exception e) {
            log.warn("partial failure", e);
        }
    }
    return result;
}

2. 并发处理大批量数据,结果立刻入库

1
2
3
4
5
6
7
8
CompletionService<Record> ecs = new ExecutorCompletionService<>(pool);
for (Long id : ids) {
    ecs.submit(() -> heavyCompute(id));
}
for (int i = 0; i < ids.size(); i++) {
    Record r = ecs.take().get();
    repository.save(r);   // 一完成就入库,不傻等
}

3. “找到第一个就返回"语义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public Result findFirstHit(List<Source> sources) {
    CompletionService<Result> ecs = new ExecutorCompletionService<>(pool);
    List<Future<Result>> futures = sources.stream()
            .map(s -> ecs.submit(() -> tryQuery(s)))
            .collect(toList());

    try {
        for (int i = 0; i < sources.size(); i++) {
            Result r = ecs.take().get();
            if (r != null) {
                return r;       // 第一个非空结果返回
            }
        }
        return null;
    } finally {
        futures.forEach(f -> f.cancel(true));   // 取消其他
    }
}

五、和 CompletableFuture 的区别

JDK 8 的 CompletableFuture 是更强大的并发工具——能不能用它替代 CompletionService?

1
2
3
4
5
6
List<CompletableFuture<Result>> futures = tasks.stream()
        .map(t -> CompletableFuture.supplyAsync(() -> callApi(t), pool))
        .collect(toList());

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(toList()));

CompletableFuture 能做更多事——chain / combine / handle 异常 / async。但它默认是"等所有完成"或"等任意完成”,没有"按完成顺序逐个处理"的内置语义。

要做到"按完成顺序处理",CompletableFuture 需要这样:

1
2
3
4
5
List<CompletableFuture<Result>> futures = tasks.stream()
        .map(t -> CompletableFuture.supplyAsync(() -> callApi(t), pool))
        .collect(toList());

futures.forEach(f -> f.thenAccept(r -> handle(r)));

也行——但每个 future 完成时回调在 ForkJoinPool / 自定义池里执行,handle 之间是并发的。如果 handle 本身要按顺序做(写 DB 不能并发),还是 CompletionService 更适合。

CompletionServiceCompletableFuture
按完成顺序处理△ 用回调
顺序处理 handle✓ 主线程串行△ 要再加锁/单线程
链式组合 / 异常处理✓ 强
async/await 风格
学习曲线

简单的"并发任务 + 顺序处理结果"——CompletionService 更直接。复杂的并发编排——CompletableFuture


六、踩坑提醒

1. take() 是阻塞的

主线程会卡住。如果上层有超时要求:

1
2
3
4
Future<Result> f = ecs.poll(500, TimeUnit.MILLISECONDS);
if (f == null) {
    // 超时
}

2. take() 调用次数要等于 submit 次数

少调一次会漏处理一个结果,多调一次会无限阻塞。用 for 循环的次数严格等于 submit 数

3. 异常需要在 .get() 里 try

1
2
3
4
5
6
7
8
9
for (int i = 0; i < n; i++) {
    Future<Result> f = ecs.take();
    try {
        Result r = f.get();
        handle(r);
    } catch (ExecutionException e) {
        log.error("task failed", e.getCause());
    }
}

ExecutionException 包装了原始异常,getCause() 才是业务真正抛的异常。

4. 共享线程池的污染

1
2
CompletionService<X> ecsA = new ExecutorCompletionService<>(sharedPool);
CompletionService<Y> ecsB = new ExecutorCompletionService<>(sharedPool);

ECS 用的是底层 pool,不会把 A、B 的结果混淆——但池子被两边任务挤占。共享池容量要够。

5. cancel 后 take 可能拿到 cancelled future

如果你 cancel 了某些任务,take 可能拿到已 cancelled 的 future,调用 f.get() 会抛 CancellationException。一定要 catch。

6. JDK 内置实现内部用 LinkedBlockingQueue

队列无界——任务慢、消费慢时队列会无限增长。海量任务场景需要自己实现 ECS(用有界队列)或者限流。


七、一份生产级模板

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public <T, R> List<R> parallelExecute(
        Collection<T> tasks,
        Function<T, R> action,
        ExecutorService pool,
        long timeout,
        TimeUnit unit) {

    CompletionService<R> ecs = new ExecutorCompletionService<>(pool);
    List<Future<R>> futures = tasks.stream()
            .map(t -> ecs.submit(() -> action.apply(t)))
            .collect(toList());

    long deadline = System.nanoTime() + unit.toNanos(timeout);
    List<R> results = new ArrayList<>();
    try {
        for (int i = 0; i < tasks.size(); i++) {
            long remaining = deadline - System.nanoTime();
            Future<R> f = ecs.poll(remaining, TimeUnit.NANOSECONDS);
            if (f == null) {
                log.warn("partial timeout, completed {}/{}", i, tasks.size());
                break;
            }
            try {
                results.add(f.get());
            } catch (ExecutionException e) {
                log.error("task failed", e.getCause());
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        futures.forEach(f -> f.cancel(true));   // 主流程结束时取消未完成任务
    }
    return results;
}

小结

把全文压一句:

ExecutorCompletionService 让『先完成的任务先被处理』——这是 Java 并发里最被低估的一个工具。

什么时候选它:

  • 多个任务并发执行
  • 需要按完成顺序逐个处理结果
  • 处理逻辑本身耗时 / 串行
  • 不需要 CompletableFuture 的复杂编排

工程纪律:

  • submit 几次就要 take 几次
  • 异常 catch ExecutionException + getCause
  • 主流程结束 cancel 未完成任务
  • 海量任务时换有界队列实现

下次再写"并发调 N 个外部接口然后处理结果"时——别再 for 循环 f.get()

使用 Hugo 构建
主题 StackJimmy 设计