Observable

Observable 是 RxJS 中的基础,是所有数据处理过程的数据来源。Observable 实例在创建时可以使用一个带有next()error()complete()方法的对象,其中next()方法用于发出一个数据,error()方法用于发出一个错误,complete()方法用于结束 Observable 实例的数据发送。

以下示例实现了一个简单的 Observable 数据发送。

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

当这个 Observable 创建完成以后,其中的数据并不会发送出来,必须要有订阅才能开始发送数据,一个 Observable 实例只能被一个 Observer 订阅。对于 Observable 的订阅可以参考以下示例。

observable.subscribe({
  next(x) {
    console.log(x);
  },
  error(err) {
    console.error(err);
  },
  complete() {
    console.log('Done');
  }
});

可以看出对 Observable 的订阅就是针对 Observable 中三种方法发出数据的对应处理。通过.subscribe()方法会返回一个Subscription类型实例,可以通过其注销掉已经建立的订阅。.subscribe()方法可以直接接受三个函数,分别对next()error()complete()进行相应,并且其中的一些可以省略。

Observable 还提供了一个.pipe()方法,其中可以传入一系列的 Operator,这些 Operator 将会按照.pipe()中的排列顺序对 Observable 发出的数据进行操作,经过.pipe()处理的数据将会传递给 Observer。