写在前面
Spring 5 引入 WebFlux 后,“响应式编程"成了 Java 圈无法绕过的话题。但多数人对 Reactor 的理解止于:
“把 List 换成 Flux 不就行了?把 Object 换成 Mono?”
实际写起来你会发现根本不是这回事——
- 同步代码改成 Reactor 后性能不一定提升
- 一不小心
.block() 一下,所有响应式优势瞬间归零 - 异常处理逻辑要重新设计
- ThreadLocal、Spring Security context 全部不见了
Reactor 不是『async 的语法糖』——它是另一种编程范式。本文用最少的概念把 Reactor 讲清楚:核心思想、Mono / Flux、操作符、调度器、和典型陷阱。
一、为什么需要响应式
传统 Spring MVC 的线程模型是一请求一线程:
flowchart LR
R1[Request 1] --> T1[Thread 1
占用整个生命周期]
R2[Request 2] --> T2[Thread 2]
R3[Request 3] --> T3[Thread 3]
T1 --> DB1[(DB query 100ms)]
T2 --> DB2[(DB query 200ms)]Tomcat 默认线程池 200 个——理论上 200 并发。但实际上每个线程在等数据库时完全空闲——CPU 浪费、内存占用大。
响应式的思路是 少线程 + 异步驱动:
flowchart LR
R1[Request 1] --> EL[Event Loop
单/少线程]
R2[Request 2] --> EL
R3[Request 3] --> EL
EL -.异步.-> DB[(DB)]
DB -.callback.-> EL请求来了立刻挂起 callback,不占线程;DB 返回时再继续——几个线程能撑几万 QPS。
这就是 Node.js / Netty / Reactor 的哲学。
二、Reactor 的两个核心类型
Mono:0 或 1 个值的异步流
1
| Mono<User> user = userRepo.findById(1L);
|
类似 Optional<CompletableFuture<T>>——可能有值、可能没值、可能异步。
Flux:0 到 N 个值的异步流
1
| Flux<User> users = userRepo.findAll();
|
类似 Stream<CompletableFuture<T>>——一系列异步到达的元素。
共同特征:lazy
1
2
3
4
| Flux<Integer> f = Flux.range(1, 10).map(i -> i * 2);
// 此时什么都没发生
f.subscribe(System.out::println);
// 现在才执行
|
没有 subscribe 就什么都不会发生——这是初学者最大的坑(“为什么我的 log 没打印”)。
三、常用操作符
创建
1
2
3
4
5
6
7
8
9
| Mono.just(value) // 单个值
Mono.empty() // 空
Mono.error(new RuntimeException())
Mono.fromCallable(() -> ...) // 包装阻塞调用
Flux.just(1, 2, 3)
Flux.range(1, 100)
Flux.fromIterable(list)
Flux.interval(Duration.ofSeconds(1)) // 每秒发一个
|
转换
1
2
3
4
| Flux.range(1, 5)
.map(i -> i * 2) // 同步转换
.flatMap(i -> Mono.fromCallable(() -> rpcCall(i))) // 异步转换
.filter(i -> i > 5)
|
map vs flatMap 的差异:
map:T → RflatMap:T → Mono / Flux,会"展平"嵌套
组合
1
2
3
4
5
| Mono.zip(getUserA(), getUserB()) // 并行执行,全部完成后合并
.map(t -> new Pair(t.getT1(), t.getT2()));
Flux.merge(streamA, streamB) // 多流交错合并
Flux.concat(streamA, streamB) // A 完后再 B
|
错误处理
1
2
3
4
| mono.onErrorReturn(defaultValue) // 出错给默认值
.onErrorResume(e -> backupMono) // 出错切到备用流
.retry(3) // 重试
.timeout(Duration.ofSeconds(5)); // 超时
|
终止
1
2
| mono.subscribe(value -> ..., err -> ..., () -> done); // 触发执行
mono.block(); // ⚠️ 阻塞拿值
|
四、调度器:在哪个线程执行
Reactor 的关键之一——控制每段代码跑在哪个线程:
1
2
3
4
5
| Mono.fromCallable(() -> jdbcQuery()) // 阻塞调用
.subscribeOn(Schedulers.boundedElastic()) // 移到 IO 线程池
.map(this::transform) // 仍在 IO 线程
.publishOn(Schedulers.parallel()) // 切到 CPU 线程池
.map(this::heavyCompute); // 在 CPU 线程
|
主流调度器:
| 调度器 | 用途 |
|---|
Schedulers.parallel() | CPU 密集,线程数 = 核数 |
Schedulers.boundedElastic() | 阻塞 IO(默认线程上限 10×CPU,每线程任务队列上限 100k,空闲 60s 自动回收) |
Schedulers.single() | 全局共享单线程 |
Schedulers.immediate() | 当前线程直接执行 |
subscribeOn vs publishOn
| subscribeOn | publishOn |
|---|
| 影响范围 | 整条链(从源开始) | 之后的操作符 |
| 多次调用 | 只有第一次生效 | 每次都生效,切换线程 |
| 典型用途 | 把"源头"放到对应线程 | 中途切换线程 |
记住:subscribeOn 全局生效一次,publishOn 每次都生效。
五、阻塞与响应式的边界
Reactor 的最大原则——响应式链里绝不能有阻塞代码。
1
2
3
4
| // ❌ 反例
Mono.just(id)
.map(i -> jdbcTemplate.queryForObject("SELECT ...", ...)) // 阻塞!
.subscribe();
|
JDBC 是阻塞的——直接放在 map 里会阻塞 Reactor 的事件线程,整个应用响应能力归零。
正确做法 1:包到 fromCallable + boundedElastic:
1
2
3
| Mono.fromCallable(() -> jdbcTemplate.queryForObject("SELECT ...", ...))
.subscribeOn(Schedulers.boundedElastic()) // 在 IO 池跑
.map(this::transform);
|
正确做法 2:用响应式驱动(R2DBC 替代 JDBC):
1
2
| DatabaseClient client = ...;
client.sql("SELECT ...").map(row -> ...).first(); // 原生响应式
|
R2DBC、Spring Data Reactive Mongo / Redis、WebClient 都是响应式驱动——整条链是真异步。
六、几个最常被踩的坑
1. 没 subscribe 就期待执行
1
2
3
4
| public void process() {
fluxOfWork.map(...).subscribe(...); // 启动了
monoOfThing.map(...); // ❌ 没 subscribe,这条链永远不跑
}
|
Reactor 链一旦写出来就要 subscribe,否则等于死代码。
2. block() 滥用
1
| User user = userRepo.findById(id).block(); // 把异步变同步
|
.block() 让响应式优势全没了——主要用在测试代码、main 函数、或与同步框架/启动初始化的桥接处。生产代码里大面积出现
.block() 通常是反模式信号。
3. ThreadLocal 失效
1
2
3
| Mono.fromCallable(() -> {
// ThreadLocal 拿不到——线程切换了
})
|
线程跨切换后 ThreadLocal、Spring Security 上下文都丢失。Reactor 提供 Context 替代:
1
2
3
4
| Mono.deferContextual(ctx -> {
String userId = ctx.get("userId");
return service.process(userId);
}).contextWrite(Context.of("userId", currentUserId));
|
4. 顺序保证
1
2
3
| Flux.range(1, 100)
.flatMap(i -> remoteCall(i)) // 并发执行,结果顺序乱
.subscribe(System.out::println);
|
flatMap 默认并发——结果顺序不保证。要保序用 concatMap:
1
| .concatMap(i -> remoteCall(i)) // 串行执行,保序
|
5. Hot vs Cold
1
2
3
4
5
6
7
8
| Flux<Integer> cold = Flux.range(1, 5);
cold.subscribe(System.out::println); // 1 2 3 4 5
cold.subscribe(System.out::println); // 1 2 3 4 5(每个 subscriber 都从头)
Flux<Long> hot = Flux.interval(Duration.ofSeconds(1)).share();
hot.subscribe(System.out::println); // 0 1 2 ...
Thread.sleep(3000);
hot.subscribe(System.out::println); // 3 4 5 ...(错过的丢了)
|
Cold 流给每个 subscriber 重放;Hot 流是广播——搞错会导致数据丢失。
七、WebFlux:响应式 Web
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| @RestController
public class UserController {
private final UserRepository repo;
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return repo.findById(id);
}
@GetMapping("/users")
public Flux<User> listUsers() {
return repo.findAll();
}
@PostMapping("/users")
public Mono<User> create(@RequestBody Mono<UserDTO> dto) {
return dto.map(this::toEntity).flatMap(repo::save);
}
}
|
WebFlux 用 Netty 做底层,单服务器能撑数万长连接——比 MVC 模型在高并发场景下省资源。
但WebFlux 不是『性能银弹』——
- 业务复杂、JDBC 重的场景,MVC 反而更简单
- 团队没有响应式经验时,WebFlux 的代码维护成本高
- 99% 的中小项目用 MVC 完全够
八、调试响应式代码
log() 操作符
1
2
3
4
| mono.log("user-fetch")
.map(...)
.log("after-map")
.subscribe();
|
打印每一步的 onSubscribe、request、onNext、onComplete、onError——是调试响应式链最基础的方法。
checkpoint
1
2
3
4
5
| Flux.range(1, 5)
.checkpoint("after-range")
.map(i -> 100 / (i - 3))
.checkpoint("after-divide")
.subscribe(...);
|
checkpoint 给错误堆栈加标签——出错时能立刻定位到哪一段。
启动时 Hooks.onOperatorDebug() 让所有错误带完整堆栈——性能影响大,仅开发调试用。
九、什么场景值得用 WebFlux
✅ 适合:
- 大量 IO 等待(如 fetch 多个 RPC 然后聚合)
- 长连接(SSE / WebSocket / 流式响应)
- 网关/代理类应用(高并发转发)
- 集成响应式数据源(R2DBC + WebClient + Reactive Redis)
❌ 不适合:
- CPU 密集业务——线程模型变化无收益
- JDBC + JPA 重度依赖——R2DBC 生态远不如 JPA 成熟
- 小团队、传统业务——MVC 简单就行
小结
把全文压一句:
Reactor 不是『async 包装』,是『让你以"数据流"思维重写程序』——核心收益在 IO 密集、高并发场景,但要付出整条链路全异步的代价。
工程要点:
- 没 subscribe 不会执行
.block() 是反模式- 阻塞代码必须用
boundedElastic - ThreadLocal 失效,用
Context flatMap 不保序,concatMap 保序
掌握 Reactor 不只是学几个 API——是接受"程序由数据流驱动"这种思维。值得每个 Spring 开发者了解,但不必每个项目都用。