响应式编程中Mono和Flux的区别

前言

当我们在使用Project Reactor,或者使用依赖于它的框架的时候。例如spring webflux,spring cloud gateway等,经常会用看到代码中有Mono和Flux两个术语。

响应式流

Reactor是由Pivotal公司开发的开源框架,它是开发响应式应用的基础。如今,它是建立在由Netflix、Pivotal和Lightbend的工程师以及其他大型JAVA玩家(Oracle和Red Hat)提出的联合倡议(reactive streams initiative)的基础上。
在这个倡议中,响应式流的规范被创建,有以下几个关键因素:

  • 响应式流应该是非阻塞的,
  • 它是一种数据流,
  • 它是异步工作的,
  • 它需要能够处理背压。

制定以上标准的原因是:当我们通常写应用程序的时候,我们会进行数据库调用、HTTP调用,我们发起请求,阻塞线程,直到有响应返回,然后继续。虽然这种方式可以工作,但是这是一种资源浪费。

发布者Publisher

在开发过程中,不再返回简单的POJO对象,而必须返回其他内容,在结果可用的时候返回。在响应式流的规范中,被称为发布者(Publisher)。发布者有一个subcribe()方法,该方法允许使用者在POJO可用时获取它。发布者可以通过以下两种形式返回结果:

  • Mono返回0个或1个结果,
  • Flux返回0个或多个结果,可能是无限个。

Mono

Mono是Publisher的一种,返回0或者1个结果,也可以返回一个Optional。
举个例子,我们有如下代码:

1
2
3
4
public Person findCurrentUser() {
if (isAuthenticated()) return new Person("Jane", "Doe");
else return null;
}

在Java8中,我们可以这样写:

1
2
3
4
public Optional<Person> findCurrentUser() {
if (isAuthenticated()) return Optional.of(new Person("Jane", "Doe"));
else return Optional.empty();
}

如果我们使用响应式流,可以写成这样:

1
2
3
4
public Mono<Person> findCurrentUser() {
if (isAuthenticated()) return Mono.just(new Person("Jane", "Doe"));
else return Mono.empty();
}

Flux

Flux也是Publisher的一种,返回0或者多个结果,甚至可以返回无数个结果。通常将其用作,集合collection、数组array、或者流stream的响应式计数方式。
举个例子,我们有如下代码:

1
2
3
4
5
6
public List<Person> findAll() {
return Arrays.asList(
new Person("Jane", "Doe"),
new Person("John", "Doe")
);
}

在Java8中,我们可以用Stream来实现:

1
2
3
4
5
6
public Stream<Person> findAll() {
return Stream.of(
new Person("Jane", "Doe"),
new Person("John", "Doe")
);
}

如果我们使用Flux,可以这样实现:

1
2
3
4
5
6
public Flux<Person> findAll() {
return Flux.just(
new Person("Jane", "Doe"),
new Person("John", "Doe")
);
}

从上面可以看到,响应式编程,与函数式编程很类似。

订阅者Subscriber

以下代码会在控制台输出什么呢?

1
2
3
4
Flux
.just(1, 2, 3, 4)
.reduce(Integer::sum)
.log();

答案是什么都不会输出,响应式流使用push模型,每一项都按照发布者的速度推送到流上,而不管订阅者是否能够跟随。但是也不用担心,因为有背压(back pressure)的存在,可以保证正确。
上述代码通常会认为输出10,但是其实不是的。因为响应流失延迟的,也可以称为懒惰的,只要没有订阅者就不会启动。所以,订阅者也是必不可少的一部分。

这个跟JAVA8中的惰性求值、及早求值概念类似。

异步特性

发布者在本质上是异步的,然后并非总是异步的,是否异步,取决于发布者的类型。看如下代码:

1
2
3
4
5
6
AtomicInteger sum = new AtomicInteger(0);
Flux
.just(1, 2, 3, 4)
.reduce(Integer::sum)
.subscribe(sum::set);
log.info("Sum is: {}", sum.get());

不同的人可能有不同的答案:1、要么输出10,因为对数字求和;2、要么输出0,因为是异步操作,在执行log的时候,还未进行sum求和。
正确的答案是第一种:输出10。因为Flux.just()默认情况下使用当前线程,因此程序在执行到达日志语句时已经计算出了结果。
那下面这个代码会输出什么吗?

1
2
3
4
5
6
7
AtomicInteger sum = new AtomicInteger(0);
Flux
.just(1, 2, 3, 4)
.subscribeOn(Schedulers.elastic())
.reduce(Integer::sum)
.subscribe(sum::set);
logger.info("Sum is: {}", sum.get());

将会输出0,因为在这里使用了subscribeOn方法,将使得订阅者在异步线程执行。因此,根据响应流的性质,它可以是同步的,也可以是异步的。上述代码可以将logger打印采用lambda实现:

1
2
3
4
Flux
.just(1, 2, 3, 4)
.reduce(Integer::sum)
.susbcribe(sum -> logger.info("Sum is: {}", sum);

构建自己的流

Project Reactor提供了很多内置的发布者。然而,在某些情况下,我们必须创建自己的Publisher。Mono和Flux都提供了create()方法来构建自定义的流。
例如,我们想使用响应流的Twitter4J库,可以写成这样:

1
2
3
4
5
6
return Flux.create(sink -> {
TwitterStream twitterStream = new TwitterStreamFactory(configuration).getInstance();
twitterStream.onStatus(sink::next);
twitterStream.onException(sink::error);
sink.onCancel(twitterStream::shutdown);
});

这也是一个无限流的例子,因为推文的数量将永远持续下去(或者直到Twitter关闭)。

热流和冷流

热流称为Host Stream,冷流称为Cold Stream。两者的区别在于:当我们多个订阅者使用冷观察的时候,流将重新启动。热观察的时候,流将复用。默认情况下,流是冷流。
如下方代码,默认采用的是冷流:

1
2
3
4
5
6
7
8
9
Flux<Integer> numbers = Flux
.just(1, 2, 3, 4)
.log();
numbers
.reduce(Integer::sum)
.subscribe(sum -> logger.info("Sum is: {}", sum));
numbers
.reduce((a, b) -> a * b)
.subscribe(product -> logger.info("Product is: {}", product));

上面的例子中,1到4被发布两次,一次针对第一个订阅者,一次针对第二个订阅者。
但是在有些情况下,我们不希望流重头开始。例如HTTP Request,在这种情况下,我们可以使用热流。在Project Reactor中,我们可以使用share()方法(针对Flux)或者cache()方法(针对Mono),代码如下所示:

1
2
3
4
5
6
7
8
9
10
Flux<Integer> numbers = Flux
.just(1, 2, 3, 4)
.log()
.share();
numbers
.reduce(Integer::sum)
.subscribe(sum -> logger.info("Sum is: {}", sum));
numbers
.reduce((a, b) -> a * b)
.subscribe(product -> logger.info("Product is: {}", product));

通过share()方法,1到4只发布一次,被两个订阅者共享。