一段几乎所有人都写过的反面代码
需要并发调用 5 个外部接口,最后聚合结果——多数人这么写:
1
2
3
4
5
6
7
8
9
10
| ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<Result>> futures = new ArrayList<>();
for (Task task : tasks) {
futures.add(pool.submit(() -> callApi(task)));
}
List<Result> results = new ArrayList<>();
for (Future<Result> f : futures) {
results.add(f.get()); // ★ 阻塞等
}
|
看起来挺合理。但有个性能陷阱——futures 的顺序是提交顺序,不是完成顺序。
假设 5 个接口耗时是:
写在 for 循环里 f.get() 时——第一次 get 拿 A 的结果(3s)→ 然后 B 的(已经完成,立刻返回)→ 然后 C(再等 2s)→ D 已完成 → E 已完成。
总耗时是 5s(最慢的那个)。这没问题。
但中间 3s ~ 5s 之间 D、E、B 都已经完成了,结果还在原地等 C——如果"处理结果"也耗时(比如把结果写 DB),整个流水线被最慢的任务卡住。
理想情况是:谁先完成,就先处理谁的结果——而不是按提交顺序傻等。ExecutorCompletionService 就是为这件事设计的。
一、CompletionService 是什么
CompletionService 是 JDK 5 就有的接口,但很多人不知道它存在:
1
2
3
4
5
6
| public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> take() throws InterruptedException; // 阻塞,拿一个完成的
Future<V> poll(); // 非阻塞
Future<V> poll(long timeout, TimeUnit unit); // 超时
}
|
它内部维护一个已完成任务队列——任何任务完成后立刻进队列,take() 从队列取出一个。
flowchart LR
Submit[submit×N] --> Pool[ThreadPool]
Pool --> Done[已完成队列]
Take[take/poll] -.从队列读.-> Done先完成的先被取走——而不是按提交顺序等。
二、用法对比
反例:朴素 Future.get()
1
2
3
4
5
6
7
8
9
| ExecutorService pool = Executors.newFixedThreadPool(5);
List<Future<Result>> futures = tasks.stream()
.map(t -> pool.submit(() -> callApi(t)))
.collect(toList());
for (Future<Result> f : futures) {
Result r = f.get(); // 按提交顺序阻塞
handle(r);
}
|
正解:CompletionService
1
2
3
4
5
6
7
8
9
10
11
| ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<Result> ecs = new ExecutorCompletionService<>(pool);
for (Task t : tasks) {
ecs.submit(() -> callApi(t));
}
for (int i = 0; i < tasks.size(); i++) {
Result r = ecs.take().get(); // 先完成的先返回
handle(r);
}
|
唯一变化是 take() 替代了 f.get()——但行为完全不同。
三、性能差异:一个真实数字
5 个任务,耗时分别 1s、1s、2s、3s、5s。handle() 处理每个结果耗时 1s。
| 方案 | 耗时 |
|---|
| 朴素 Future.get() | 5s + 5×1s = 10s (处理被串行化) |
| CompletionService | 5s + 1s = 6s (处理跟着完成走) |
任务越多、处理越重,差距越大。这是 CompletionService 的真正价值——让"等待"和"处理"重叠。
四、典型场景
1. 并发调多个外部接口聚合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| public List<UserVO> batchGetUsers(List<Long> ids) {
CompletionService<UserVO> ecs = new ExecutorCompletionService<>(pool);
for (Long id : ids) {
ecs.submit(() -> rpcGetUser(id));
}
List<UserVO> result = new ArrayList<>();
for (int i = 0; i < ids.size(); i++) {
try {
result.add(ecs.take().get());
} catch (Exception e) {
log.warn("partial failure", e);
}
}
return result;
}
|
2. 并发处理大批量数据,结果立刻入库
1
2
3
4
5
6
7
8
| CompletionService<Record> ecs = new ExecutorCompletionService<>(pool);
for (Long id : ids) {
ecs.submit(() -> heavyCompute(id));
}
for (int i = 0; i < ids.size(); i++) {
Record r = ecs.take().get();
repository.save(r); // 一完成就入库,不傻等
}
|
3. “找到第一个就返回"语义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| public Result findFirstHit(List<Source> sources) {
CompletionService<Result> ecs = new ExecutorCompletionService<>(pool);
List<Future<Result>> futures = sources.stream()
.map(s -> ecs.submit(() -> tryQuery(s)))
.collect(toList());
try {
for (int i = 0; i < sources.size(); i++) {
Result r = ecs.take().get();
if (r != null) {
return r; // 第一个非空结果返回
}
}
return null;
} finally {
futures.forEach(f -> f.cancel(true)); // 取消其他
}
}
|
五、和 CompletableFuture 的区别
JDK 8 的 CompletableFuture 是更强大的并发工具——能不能用它替代 CompletionService?
1
2
3
4
5
6
| List<CompletableFuture<Result>> futures = tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> callApi(t), pool))
.collect(toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(toList()));
|
CompletableFuture 能做更多事——chain / combine / handle 异常 / async。但它默认是"等所有完成"或"等任意完成”,没有"按完成顺序逐个处理"的内置语义。
要做到"按完成顺序处理",CompletableFuture 需要这样:
1
2
3
4
5
| List<CompletableFuture<Result>> futures = tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> callApi(t), pool))
.collect(toList());
futures.forEach(f -> f.thenAccept(r -> handle(r)));
|
也行——但每个 future 完成时回调在 ForkJoinPool / 自定义池里执行,handle 之间是并发的。如果 handle 本身要按顺序做(写 DB 不能并发),还是 CompletionService 更适合。
| CompletionService | CompletableFuture |
|---|
| 按完成顺序处理 | ✓ | △ 用回调 |
| 顺序处理 handle | ✓ 主线程串行 | △ 要再加锁/单线程 |
| 链式组合 / 异常处理 | △ | ✓ 强 |
| async/await 风格 | ✗ | ✓ |
| 学习曲线 | 低 | 中 |
简单的"并发任务 + 顺序处理结果"——CompletionService 更直接。复杂的并发编排——CompletableFuture。
六、踩坑提醒
1. take() 是阻塞的
主线程会卡住。如果上层有超时要求:
1
2
3
4
| Future<Result> f = ecs.poll(500, TimeUnit.MILLISECONDS);
if (f == null) {
// 超时
}
|
2. take() 调用次数要等于 submit 次数
少调一次会漏处理一个结果,多调一次会无限阻塞。用 for 循环的次数严格等于 submit 数。
3. 异常需要在 .get() 里 try
1
2
3
4
5
6
7
8
9
| for (int i = 0; i < n; i++) {
Future<Result> f = ecs.take();
try {
Result r = f.get();
handle(r);
} catch (ExecutionException e) {
log.error("task failed", e.getCause());
}
}
|
ExecutionException 包装了原始异常,getCause() 才是业务真正抛的异常。
4. 共享线程池的污染
1
2
| CompletionService<X> ecsA = new ExecutorCompletionService<>(sharedPool);
CompletionService<Y> ecsB = new ExecutorCompletionService<>(sharedPool);
|
ECS 用的是底层 pool,不会把 A、B 的结果混淆——但池子被两边任务挤占。共享池容量要够。
5. cancel 后 take 可能拿到 cancelled future
如果你 cancel 了某些任务,take 可能拿到已 cancelled 的 future,调用 f.get() 会抛 CancellationException。一定要 catch。
6. JDK 内置实现内部用 LinkedBlockingQueue
队列无界——任务慢、消费慢时队列会无限增长。海量任务场景需要自己实现 ECS(用有界队列)或者限流。
七、一份生产级模板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| public <T, R> List<R> parallelExecute(
Collection<T> tasks,
Function<T, R> action,
ExecutorService pool,
long timeout,
TimeUnit unit) {
CompletionService<R> ecs = new ExecutorCompletionService<>(pool);
List<Future<R>> futures = tasks.stream()
.map(t -> ecs.submit(() -> action.apply(t)))
.collect(toList());
long deadline = System.nanoTime() + unit.toNanos(timeout);
List<R> results = new ArrayList<>();
try {
for (int i = 0; i < tasks.size(); i++) {
long remaining = deadline - System.nanoTime();
Future<R> f = ecs.poll(remaining, TimeUnit.NANOSECONDS);
if (f == null) {
log.warn("partial timeout, completed {}/{}", i, tasks.size());
break;
}
try {
results.add(f.get());
} catch (ExecutionException e) {
log.error("task failed", e.getCause());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
futures.forEach(f -> f.cancel(true)); // 主流程结束时取消未完成任务
}
return results;
}
|
小结
把全文压一句:
ExecutorCompletionService 让『先完成的任务先被处理』——这是 Java 并发里最被低估的一个工具。
什么时候选它:
- 多个任务并发执行
- 需要按完成顺序逐个处理结果
- 处理逻辑本身耗时 / 串行
- 不需要 CompletableFuture 的复杂编排
工程纪律:
submit 几次就要 take 几次- 异常 catch ExecutionException + getCause
- 主流程结束 cancel 未完成任务
- 海量任务时换有界队列实现
下次再写"并发调 N 个外部接口然后处理结果"时——别再 for 循环 f.get() 了。