解谜RxJS操作符-合并
/ / 点击适用操作符 | 功能需求 |
---|---|
concat|concatAll | 以首尾相连的方式 |
merge|mergeAll | 以先到先得的方式 |
zip|zipAll | 以一一对应的方式 |
combineLatest、combineAll、withLatestFrom | 持续合并多个数据流中最新产生的数据 |
race | 选取第一个产生内容的数据流 |
startWith | 在数据流前添加一个指定数据 |
forkJoin | 只获取多个数据流最后产生的数据 |
switch、exhaust | 从高阶数据流中切换数据源 |
concat:收尾相连
能够把多个数组中的元素依次合并到一个数组中。
1 | const source1$ = timer(0,1000).pipe(take(5),map(x => x + 'A')); |
merge:先到先得快速通过
依次订阅上游Observable对象,把接收到的数据转给下游,等待所有上游对象Observable完结。
如果是同步数据流,表现与concat一致,应用异步数据流较好。
1 | const source1$ = timer(0,1000).pipe(map(x => x + 'A')); |
zip:拉链式组合
zip的含义为“拉链”一对一的合并,zip只要给这完结的Observable对象吐出的所有数据找到配对的数据,那么zip就会给下游一个complete信号。
1 | const timer$ = interval(1000); |
当数据量较大的时候,多个数据流吐出数据不匹配时,会产生数据积压。
combineLatest:合并最后一个数据
1 | const source1$ = timer(250,1000).pipe(take(4)) |
小缺陷(glitch)
1 | const original$ = timer(0, 1000); |
整个数据管道都是有original$驱动,且没一秒钟只产生一个数据,但是在combineLatest之后产生两个数据。
[‘0a’, ‘0b’]
[‘1a’, ‘0b’]
[‘1a’, ‘1b’]
[‘2a’, ‘1b’]
[‘2a’, ‘2b’]
….
问题的原因在于,我们理解的‘同时’不是真正的同时
withLatestFrom-解决glitch
1 | const original$ = timer(0, 1000); |
[‘0a’, ‘0b’]
[‘1a’, ‘1b’]
[‘2a’, ‘2b’]
[‘3a’, ‘3b’]
原理为给下游推送的数据只能由上游Observable对象驱动。
combineLatest、withLatestFrom小结
- 如果要合并完全独立的Observable对象,使用combineLatest
- 如果要把一个Observable对象”映射”成新的数据流,同时要冲其他Observable对象获取”最新数据”,就是用withLatestFrom
race:胜者通吃
starWith:在数据流前添加内容
forkJoin
forkJoin就是RxJS届的Promise.all,等待所有输入的Promise对象成功之后把结果合并。
1 | const source1$ = interval(1000).pipe(map(x => `${x}a`), take(1)) |
需要注意,forkJoin会吧所有输入Observable对象产生的最后一个数据合并成给下游唯一的数据。
高阶Observable
本质:以函数为参数,返回结果为函数
1 | const highOrder$ = interval(1000).pipe( |
从弹珠图上可以观察到,高阶Observable完结,不代表内部Observable完结。
其价值为用管理数据的方式来管理多个Observable对象。
操作高阶Observable的合并类操作符
- concatAll
- mergeAll
- zipAll
- combineAll
进化的高阶Observable处理
switch:切换输入Observable
总是切换到最新内部Observable对象获取数据1
2
3
4
5
6const highOrder$ = interval(1000).pipe(
take(2), map(x => interval(1500).pipe(
map(y => `${x}:${y}`),
take(2)
)))
highOrder$.pipe(switch());exhaust:耗尽
exhaust操作符的意思为:在耗尽当前内部Observable对象之前不会切换到写一个内部Observable对象1
2
3
4
5
6
7
8const ho$ = interval(1000).pipe(
take(3),
map(x => interval(700).pipe(
map(y => `${x}:${y}`),
take(2)
))
)
ho$.pipe(exhaust())
从图可以总结出来
exhaust输出数据流 = 时间 + 生存能力:)