Featured image of post Spring Reactor 入门与常见陷阱

Spring Reactor 入门与常见陷阱

Reactor 不只是『把 List 换成 Flux』——它是一种数据流驱动的编程范式。本文讲清核心概念、操作符、调度器与典型陷阱

写在前面

Spring 5 引入 WebFlux 后,“响应式编程"成了 Java 圈无法绕过的话题。但多数人对 Reactor 的理解止于:

“把 List 换成 Flux 不就行了?把 Object 换成 Mono?”

实际写起来你会发现根本不是这回事——

  • 同步代码改成 Reactor 后性能不一定提升
  • 一不小心 .block() 一下,所有响应式优势瞬间归零
  • 异常处理逻辑要重新设计
  • ThreadLocal、Spring Security context 全部不见了

Reactor 不是『async 的语法糖』——它是另一种编程范式。本文用最少的概念把 Reactor 讲清楚:核心思想、Mono / Flux、操作符、调度器、和典型陷阱。


一、为什么需要响应式

传统 Spring MVC 的线程模型是一请求一线程

Tomcat 默认线程池 200 个——理论上 200 并发。但实际上每个线程在等数据库时完全空闲——CPU 浪费、内存占用大。

响应式的思路是 少线程 + 异步驱动

请求来了立刻挂起 callback,不占线程;DB 返回时再继续——几个线程能撑几万 QPS

这就是 Node.js / Netty / Reactor 的哲学。


二、Reactor 的两个核心类型

Mono:0 或 1 个值的异步流

1
Mono<User> user = userRepo.findById(1L);

类似 Optional<CompletableFuture<T>>——可能有值、可能没值、可能异步。

Flux:0 到 N 个值的异步流

1
Flux<User> users = userRepo.findAll();

类似 Stream<CompletableFuture<T>>——一系列异步到达的元素。

共同特征:lazy

1
2
3
4
Flux<Integer> f = Flux.range(1, 10).map(i -> i * 2);
// 此时什么都没发生
f.subscribe(System.out::println);
// 现在才执行

没有 subscribe 就什么都不会发生——这是初学者最大的坑(“为什么我的 log 没打印”)。


三、常用操作符

创建

1
2
3
4
5
6
7
8
9
Mono.just(value)             // 单个值
Mono.empty()                 // 空
Mono.error(new RuntimeException())
Mono.fromCallable(() -> ...) // 包装阻塞调用

Flux.just(1, 2, 3)
Flux.range(1, 100)
Flux.fromIterable(list)
Flux.interval(Duration.ofSeconds(1))   // 每秒发一个

转换

1
2
3
4
Flux.range(1, 5)
    .map(i -> i * 2)              // 同步转换
    .flatMap(i -> Mono.fromCallable(() -> rpcCall(i)))   // 异步转换
    .filter(i -> i > 5)

map vs flatMap 的差异:

  • map:T → R
  • flatMap:T → Mono / Flux,会"展平"嵌套

组合

1
2
3
4
5
Mono.zip(getUserA(), getUserB())   // 并行执行,全部完成后合并
    .map(t -> new Pair(t.getT1(), t.getT2()));

Flux.merge(streamA, streamB)        // 多流交错合并
Flux.concat(streamA, streamB)       // A 完后再 B

错误处理

1
2
3
4
mono.onErrorReturn(defaultValue)            // 出错给默认值
    .onErrorResume(e -> backupMono)         // 出错切到备用流
    .retry(3)                               // 重试
    .timeout(Duration.ofSeconds(5));        // 超时

终止

1
2
mono.subscribe(value -> ..., err -> ..., () -> done);   // 触发执行
mono.block();                                            // ⚠️ 阻塞拿值

四、调度器:在哪个线程执行

Reactor 的关键之一——控制每段代码跑在哪个线程

1
2
3
4
5
Mono.fromCallable(() -> jdbcQuery())      // 阻塞调用
    .subscribeOn(Schedulers.boundedElastic())   // 移到 IO 线程池
    .map(this::transform)                       // 仍在 IO 线程
    .publishOn(Schedulers.parallel())            // 切到 CPU 线程池
    .map(this::heavyCompute);                    // 在 CPU 线程

主流调度器:

调度器用途
Schedulers.parallel()CPU 密集,线程数 = 核数
Schedulers.boundedElastic()阻塞 IO(默认线程上限 10×CPU,每线程任务队列上限 100k,空闲 60s 自动回收)
Schedulers.single()全局共享单线程
Schedulers.immediate()当前线程直接执行

subscribeOn vs publishOn

subscribeOnpublishOn
影响范围整条链(从源开始)之后的操作符
多次调用只有第一次生效每次都生效,切换线程
典型用途把"源头"放到对应线程中途切换线程

记住subscribeOn 全局生效一次,publishOn 每次都生效。


五、阻塞与响应式的边界

Reactor 的最大原则——响应式链里绝不能有阻塞代码

1
2
3
4
// ❌ 反例
Mono.just(id)
    .map(i -> jdbcTemplate.queryForObject("SELECT ...", ...))   // 阻塞!
    .subscribe();

JDBC 是阻塞的——直接放在 map 里会阻塞 Reactor 的事件线程,整个应用响应能力归零

正确做法 1:包到 fromCallable + boundedElastic:

1
2
3
Mono.fromCallable(() -> jdbcTemplate.queryForObject("SELECT ...", ...))
    .subscribeOn(Schedulers.boundedElastic())   // 在 IO 池跑
    .map(this::transform);

正确做法 2:用响应式驱动(R2DBC 替代 JDBC):

1
2
DatabaseClient client = ...;
client.sql("SELECT ...").map(row -> ...).first();   // 原生响应式

R2DBC、Spring Data Reactive Mongo / Redis、WebClient 都是响应式驱动——整条链是真异步


六、几个最常被踩的坑

1. 没 subscribe 就期待执行

1
2
3
4
public void process() {
    fluxOfWork.map(...).subscribe(...);   // 启动了
    monoOfThing.map(...);                  // ❌ 没 subscribe,这条链永远不跑
}

Reactor 链一旦写出来就要 subscribe,否则等于死代码

2. block() 滥用

1
User user = userRepo.findById(id).block();   // 把异步变同步

.block() 让响应式优势全没了——主要用在测试代码、main 函数、或与同步框架/启动初始化的桥接处。生产代码里大面积出现 .block() 通常是反模式信号。

3. ThreadLocal 失效

1
2
3
Mono.fromCallable(() -> {
    // ThreadLocal 拿不到——线程切换了
})

线程跨切换后 ThreadLocal、Spring Security 上下文都丢失。Reactor 提供 Context 替代

1
2
3
4
Mono.deferContextual(ctx -> {
    String userId = ctx.get("userId");
    return service.process(userId);
}).contextWrite(Context.of("userId", currentUserId));

4. 顺序保证

1
2
3
Flux.range(1, 100)
    .flatMap(i -> remoteCall(i))   // 并发执行,结果顺序乱
    .subscribe(System.out::println);

flatMap 默认并发——结果顺序不保证。要保序用 concatMap

1
.concatMap(i -> remoteCall(i))   // 串行执行,保序

5. Hot vs Cold

1
2
3
4
5
6
7
8
Flux<Integer> cold = Flux.range(1, 5);
cold.subscribe(System.out::println);   // 1 2 3 4 5
cold.subscribe(System.out::println);   // 1 2 3 4 5(每个 subscriber 都从头)

Flux<Long> hot = Flux.interval(Duration.ofSeconds(1)).share();
hot.subscribe(System.out::println);   // 0 1 2 ...
Thread.sleep(3000);
hot.subscribe(System.out::println);   // 3 4 5 ...(错过的丢了)

Cold 流给每个 subscriber 重放;Hot 流是广播——搞错会导致数据丢失


七、WebFlux:响应式 Web

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@RestController
public class UserController {

    private final UserRepository repo;

    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        return repo.findById(id);
    }

    @GetMapping("/users")
    public Flux<User> listUsers() {
        return repo.findAll();
    }

    @PostMapping("/users")
    public Mono<User> create(@RequestBody Mono<UserDTO> dto) {
        return dto.map(this::toEntity).flatMap(repo::save);
    }
}

WebFlux 用 Netty 做底层,单服务器能撑数万长连接——比 MVC 模型在高并发场景下省资源。

WebFlux 不是『性能银弹』——

  • 业务复杂、JDBC 重的场景,MVC 反而更简单
  • 团队没有响应式经验时,WebFlux 的代码维护成本高
  • 99% 的中小项目用 MVC 完全够

八、调试响应式代码

log() 操作符

1
2
3
4
mono.log("user-fetch")
    .map(...)
    .log("after-map")
    .subscribe();

打印每一步的 onSubscriberequestonNextonCompleteonError——是调试响应式链最基础的方法。

checkpoint

1
2
3
4
5
Flux.range(1, 5)
    .checkpoint("after-range")
    .map(i -> 100 / (i - 3))
    .checkpoint("after-divide")
    .subscribe(...);

checkpoint 给错误堆栈加标签——出错时能立刻定位到哪一段。

Reactor Tools

启动时 Hooks.onOperatorDebug() 让所有错误带完整堆栈——性能影响大,仅开发调试用


九、什么场景值得用 WebFlux

✅ 适合:

  • 大量 IO 等待(如 fetch 多个 RPC 然后聚合)
  • 长连接(SSE / WebSocket / 流式响应)
  • 网关/代理类应用(高并发转发)
  • 集成响应式数据源(R2DBC + WebClient + Reactive Redis)

❌ 不适合:

  • CPU 密集业务——线程模型变化无收益
  • JDBC + JPA 重度依赖——R2DBC 生态远不如 JPA 成熟
  • 小团队、传统业务——MVC 简单就行

小结

把全文压一句:

Reactor 不是『async 包装』,是『让你以"数据流"思维重写程序』——核心收益在 IO 密集、高并发场景,但要付出整条链路全异步的代价。

工程要点:

  • 没 subscribe 不会执行
  • .block() 是反模式
  • 阻塞代码必须用 boundedElastic
  • ThreadLocal 失效,用 Context
  • flatMap 不保序,concatMap 保序

掌握 Reactor 不只是学几个 API——是接受"程序由数据流驱动"这种思维。值得每个 Spring 开发者了解,但不必每个项目都用。

使用 Hugo 构建
主题 StackJimmy 设计