Reactive Programming
所谓响应式编程,通过实时监视计算流式数据,可以操作过程中的数据流,也可以经过一系列对数据流的操作展示最终结果
如下图所示
数据流一传入1,2,3,4四个数,我们可以对这条数据流进行求和,最后得到这条数据的总和10
也可以操作这条数据流,将数据流中的每个数乘以10,形成数据流二后再相加,得到100.
响应式编程是一种基于数据流(data stream)和变化传递(propagation of change)的声明式的变成规范
使用这种规范的编程方式大多数是偏向函数式编程
Reactive Stream
Java中也有操作流的支持,在JDK1.8之后引入了Java Stream,但是Java Stream无法解决两个问题:
- Java Stream是一种同步阻塞的API,在Web应用中,大量的请求都是同步阻塞的,IO阻塞会带来比较大的性能损耗,我们需要一种异步非阻塞响应式库
- Java Stream无法提供流量控制能力,如果流数据量非常大,来不及处理流中的数据,会造成数据积压
具备“异步非阻塞”和“流量控制”能力的数据流,我们称之为响应式流
目前,Java界实现了该响应式流规范的库主要有两个:RxJava和Reactor
生产者、消费者概念
响应式流中,使用一种观察者模式,数据的发出者叫Publisher, 监听者Subscriber,也可以分别较发布者和订阅者
关于背压
响应式流使用背压(backpress来实现流量控制,前面说过,如果生产者的速度大大超过消费者的速度,那么消费者来不及处理流中的数据,数据积压或者丢失,如果有一种机制能向上游反馈流量请求的机制,但来不及处理时,就要求生产者生产速度不要过快,这种机制就叫做背压
Reactor入门
想使用好Reactor框架,首先先会使用它的两个生产者类Flux和Mono
Flux对象代表一个包含0..N个元素的响应式流,Mono对象代表0或1个元素
生成流的形式有很多,可以通过数组、集合、just等1
2
3
4
5
6
7
8// 1、 通过数组
Integer[] array = new Integer[]{1,2,3,4,5,6};
Flux.fromArray(array);
// 2、 通过集合
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
// 3、 通过just函数
Flux.just(1,2,3);
声明流元素后,需要订阅(监听)里面的数据1
2
3Flux.just(1,2,3,4,5,6)
// 订阅流中的元素,组个输出
.subscribe(System.out::println);
Flux和Mono提供了多个subscribe方法,分别是流处理时的逻辑,流处理发生异常时的逻辑,流处理完成时的逻辑1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// 订阅并触发数据流
subscribe();
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer);
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
Kotlin实现如下1
2
3
4// subscribe 方法才会触发数据流,才会开始消费数据
Flux.just(1,2,3,4,5,6).subscribe({ println("I am processing $it") },
{ println("I am execute error when process $it") },
{ println("Complete!") }
基础的流操作符
map
map操作可以将数据元素进行转换/映射,得到一个新的元素,新的流
1 | public final <V> Flux<V> map(Function<? super T,? extends V> mapper) |
flatMap
flatMap操作可以将每个元素映射成一个流,然后将这些流合并成为一个更大的数据流
- 注意,流的合并是异步的,先来先到,并非严格按照原始序列的顺序,如上图粉红色的新元素可以穿插在蓝色前面
1 | public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) |
- 注意,上面有休眠1s,flatMap转换操作里面加入了异步操作,delayElement可以开启异步,所以需要让主线程休眠
结果:
注意流顺序被打乱了
filter
filter操作可以过滤数据元素
1 | public final Flux<T> filter(Predicate<? super T> tester) |
其他
- “无副作用的peek”:doOnNext,doOnError,doOnComplete,doOnSubscribe,doOnCancel
- 过滤/挑选:take、first、last、sample、skip
- 用于线程调度:publishOn和subscribeOn方法
- 自定义生成数据流:create和generate