响应式编程

Reactive Programming

所谓响应式编程,通过实时监视计算流式数据,可以操作过程中的数据流,也可以经过一系列对数据流的操作展示最终结果
如下图所示

数据流一传入1,2,3,4四个数,我们可以对这条数据流进行求和,最后得到这条数据的总和10
也可以操作这条数据流,将数据流中的每个数乘以10,形成数据流二后再相加,得到100.

响应式编程是一种基于数据流(data stream)和变化传递(propagation of change)的声明式的变成规范
使用这种规范的编程方式大多数是偏向函数式编程

Reactive Stream

Java中也有操作流的支持,在JDK1.8之后引入了Java Stream,但是Java Stream无法解决两个问题:

  1. Java Stream是一种同步阻塞的API,在Web应用中,大量的请求都是同步阻塞的,IO阻塞会带来比较大的性能损耗,我们需要一种异步非阻塞响应式库
  2. Java Stream无法提供流量控制能力,如果流数据量非常大,来不及处理流中的数据,会造成数据积压

具备“异步非阻塞”和“流量控制”能力的数据流,我们称之为响应式流
目前,Java界实现了该响应式流规范的库主要有两个:RxJavaReactor

生产者、消费者概念

响应式流中,使用一种观察者模式,数据的发出者叫Publisher, 监听者Subscriber,也可以分别较发布者和订阅者

关于背压

响应式流使用背压(backpress来实现流量控制,前面说过,如果生产者的速度大大超过消费者的速度,那么消费者来不及处理流中的数据,数据积压或者丢失,如果有一种机制能向上游反馈流量请求的机制,但来不及处理时,就要求生产者生产速度不要过快,这种机制就叫做背压

Reactor入门

想使用好Reactor框架,首先先会使用它的两个生产者类FluxMono
Flux对象代表一个包含0..N个元素的响应式流,Mono对象代表0或1个元素

生成流的形式有很多,可以通过数组、集合、just等

1
2
3
4
5
6
7
8
// 1、 通过数组
Integer[] array = new Integer[]{1,2,3,4,5,6};
Flux.fromArray(array);
// 2、 通过集合
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
// 3、 通过just函数
Flux.just(1,2,3);

声明流元素后,需要订阅(监听)里面的数据

1
2
3
Flux.just(1,2,3,4,5,6)
// 订阅流中的元素,组个输出
.subscribe(System.out::println);

Flux和Mono提供了多个subscribe方法,分别是流处理时的逻辑,流处理发生异常时的逻辑,流处理完成时的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 订阅并触发数据流
subscribe();
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer);
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);

Kotlin实现如下

1
2
3
4
// subscribe 方法才会触发数据流,才会开始消费数据
Flux.just(1,2,3,4,5,6).subscribe({ println("I am processing $it") },
{ println("I am execute error when process $it") },
{ println("Complete!") }

基础的流操作符

map

map操作可以将数据元素进行转换/映射,得到一个新的元素,新的流

1
2
3
4
5
6
7
8
9
public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)

// map
fun operatorMap() {
Flux.range(1,6)
.map { it * it } // 加工返回
.subscribe { println(it) } // 得到1,4,9,16,25,36
}

flatMap

flatMap操作可以将每个元素映射成一个流,然后将这些流合并成为一个更大的数据流

  • 注意,流的合并是异步的,先来先到,并非严格按照原始序列的顺序,如上图粉红色的新元素可以穿插在蓝色前面
1
2
3
4
5
6
7
8
9
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

// Java
Flux.just("flux", "mono")
.flatMap(s -> Flux.fromArray(s.split("\\s*"))
.delayElements(Duration.ofMillis(100)))
.subscribe(System.out::println);
Thread.sleep(1000);
  • 注意,上面有休眠1s,flatMap转换操作里面加入了异步操作,delayElement可以开启异步,所以需要让主线程休眠
    结果:

    注意流顺序被打乱了

filter

filter操作可以过滤数据元素

1
2
3
4
5
6
7
public final Flux<T> filter(Predicate<? super T> tester)
public final Mono<T> filter(Predicate<? super T> tester)

fun operatorFilter() =
Flux.range(1, 6)
.filter { it % 2 == 1 }
.subscribe { println(it) }

其他

  • “无副作用的peek”:doOnNext,doOnError,doOnComplete,doOnSubscribe,doOnCancel
  • 过滤/挑选:take、first、last、sample、skip
  • 用于线程调度:publishOn和subscribeOn方法
  • 自定义生成数据流:create和generate