适用操作符 功能需求
concat|concatAll 首尾相连的方式
merge|mergeAll 先到先得的方式
zip|zipAll 一一对应的方式
combineLatest、combineAll、withLatestFrom 持续合并多个数据流中最新产生的数据
race 选取第一个产生内容的数据流
startWith 在数据流前添加一个指定数据
forkJoin 只获取多个数据流最后产生的数据
switch、exhaust 从高阶数据流中切换数据源

concat:收尾相连

能够把多个数组中的元素依次合并到一个数组中。

1
2
3
const source1$ = timer(0,1000).pipe(take(5),map(x => x + 'A'));
const source2$ = timer(500,1000).pipe(take(5),map(x => x + 'B'));
source1$.pipe(concat(source2$))

0A1A2A3A
0B1B2B3B
0A1A2A3A0B1B2B3B

merge:先到先得快速通过

依次订阅上游Observable对象,把接收到的数据转给下游,等待所有上游对象Observable完结。
如果是同步数据流,表现与concat一致,应用异步数据流较好。

1
2
3
const source1$ = timer(0,1000).pipe(map(x => x + 'A'));
const source2$ = timer(500,1000).pipe(map(x => x + 'B'));
source1.pipe(merge(source2$));

0A1A2A3A4A
0B1B2B3B4B
0A0B1A1B2A2B3A3B4A4B

zip:拉链式组合

zip
zip的含义为“拉链”一对一的合并,zip只要给这完结的Observable对象吐出的所有数据找到配对的数据,那么zip就会给下游一个complete信号。

1
2
3
4
5
6
7
const timer$ = interval(1000);
const source1$ = of(0,1,2,3);
const source2$ = of('a','b','c','d');
timer$.pipe(
zip(source1$, source2$),
map(([index, s1, s2]) => `${s1}${s2}`)
);

0a1b2c3d
当数据量较大的时候,多个数据流吐出数据不匹配时,会产生数据积压。

combineLatest:合并最后一个数据

1
2
3
const source1$ = timer(250,1000).pipe(take(4))
const source2$ = timer(750,1000).pipe(take(4))
source1$.pipe(combineLatest(source2$))

0123
0123
0,01,01,12,12,23,23,3

小缺陷(glitch)

1
2
3
4
5
6
const original$ = timer(0, 1000);
const source1$ = original$.pipe(map(x => `${x}a`));
const source2$ = original$.pipe(map(x => `${x}b`));
source1$.pipe(
combineLatest(source2$)
);

×2×2×2
整个数据管道都是有original$驱动,且没一秒钟只产生一个数据,但是在combineLatest之后产生两个数据。
[‘0a’, ‘0b’]
[‘1a’, ‘0b’]
[‘1a’, ‘1b’]
[‘2a’, ‘1b’]
[‘2a’, ‘2b’]
….
问题的原因在于,我们理解的‘同时’不是真正的同时

withLatestFrom-解决glitch

1
2
3
4
const original$ = timer(0, 1000);
const source1$ = original$.pipe(map(x => `${x}a`));
const source2$ = original$.pipe(map(x => `${x}b`));
source1$.pipe(withLatestFrom(source2$));

0a,0b1a,1b2a,2b3a,3b
[‘0a’, ‘0b’]
[‘1a’, ‘1b’]
[‘2a’, ‘2b’]
[‘3a’, ‘3b’]
原理为给下游推送的数据只能由上游Observable对象驱动。

combineLatest、withLatestFrom小结

  • 如果要合并完全独立的Observable对象,使用combineLatest
  • 如果要把一个Observable对象”映射”成新的数据流,同时要冲其他Observable对象获取”最新数据”,就是用withLatestFrom

race:胜者通吃

starWith:在数据流前添加内容

forkJoin

forkJoin就是RxJS届的Promise.all,等待所有输入的Promise对象成功之后把结果合并。

1
2
3
const source1$ = interval(1000).pipe(map(x => `${x}a`), take(1))
const source2$ = interval(1000).pipe(map(x => `${x}b`), take(3))
const forkJoin$ = forkJoin(source1$, source2$)

0a
0b1b2b

需要注意,forkJoin会吧所有输入Observable对象产生的最后一个数据合并成给下游唯一的数据。

高阶Observable

本质:以函数为参数,返回结果为函数

1
2
3
4
5
const highOrder$ = interval(1000).pipe(
take(2), map(x => interval(1500).pipe(
map(y => `${x}:${y}`),
take(2)
)))

0:00:11:01:1
从弹珠图上可以观察到,高阶Observable完结,不代表内部Observable完结。
其价值为用管理数据的方式来管理多个Observable对象。

操作高阶Observable的合并类操作符

  • concatAll
  • mergeAll
  • zipAll
  • combineAll

    进化的高阶Observable处理

    switch:切换输入Observable

    总是切换到最新内部Observable对象获取数据
    1
    2
    3
    4
    5
    6
    const highOrder$ = interval(1000).pipe(
    take(2), map(x => interval(1500).pipe(
    map(y => `${x}:${y}`),
    take(2)
    )))
    highOrder$.pipe(switch());
    0:00:11:01:1
    1:01:1

    exhaust:耗尽

    exhaust操作符的意思为:在耗尽当前内部Observable对象之前不会切换到写一个内部Observable对象
    1
    2
    3
    4
    5
    6
    7
    8
    const ho$ = interval(1000).pipe(
    take(3),
    map(x => interval(700).pipe(
    map(y => `${x}:${y}`),
    take(2)
    ))
    )
    ho$.pipe(exhaust())
    0:00:11:01:12:02:1
    0:00:12:02:1
    从图可以总结出来
    exhaust输出数据流 = 时间 + 生存能力:)