孤独プログラマー譚

孤独死が近い。

RxJS awaitをRxJSを書き直す

awaitの連続。

(async () => {
  let foo = await getPromise()
  foo = await getPromise(foo)
  foo = await getPromise(foo)
  cl(foo) // 3
})()

function getPromise(v = 0) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(v + 1), 1000);
  });
}


thenのメソッドチェーンで書き換え。

let bar = getPromise()
.then(getPromise)
.then(getPromise)
.then(cl)


subscribeメソッドの入れ子。見た目が良くない。

obs()
.subscribe((v) => {
  obs(v).subscribe((v) => {
    obs(v).subscribe(cl)
  })
})

function obs(v = 0) {
  return rx.Observable.create((observer) => {
    setTimeout(() => {
      observer.next(v + 1)
      observer.complete()
    }, 1000)
  })
}


これが本命。mergeMapを使う。

obs()
.pipe(
  op.mergeMap(obs),
  op.mergeMap(obs),
)
.subscribe(cl)

awaitって何してるのか

今さらだが、今まで何となくawaitを使っていたけど、そもそも何をしてるのか。

(async () => {
  let promise = new Promise((resolve) => {
    setTimeout(() => resolve(123), 1000);
  });
  let foo = await promise
  cl(foo) // 123
})()

await は promiseオブジェクトからresolveされた値を取り出す。

RxJS mergeの使いどころ

複数のObservable、同一のsubscribeメソッドの時は、mergeでまとめることが出来る。


まとめる前。

let obs1  = obs(1)
let obs2  = obs(2)
obs1.subscribe(cl) // 1秒後に発火
obs2.subscribe(cl) // 2秒後に発火


まとめた後。

let obs1  = obs(1)
let obs2  = obs(2)
rx.merge(obs1,obs2)
.subscribe(cl) // 1秒後に発火, 2秒後に発火


共通メソッド。

function obs(t = 1) {
  return rx.Observable.create((observer) => {
    setTimeout(() => {
      observer.next(1)
    }, t * 1000)
  })
}

function cl(v) {
  console.log(v)
}

RxJS pluck

オブジェクトのプロパティ名を指定して値を返却できるオペレータ。
mapオペレータで十分な気がする。

let obs = of(
  { a: 1, b: { c: 2 } },
  { a: 3, b: { c: 4 } },
  { a: 5, b: { c: 6 } },
)

obs.pipe(
  op.pluck('a')
).subscribe(cl) // 1,3,5

obs.pipe(
  op.pluck('b', 'c')
).subscribe(cl) // 2,4,6

obs.pipe(
  op.map(x => x.a)
).subscribe(cl) // 1,3,5

RxJS exhaustMap

初めのストリームが優先して実行される。
実行中に後から来たストリームは、キャンセルされる。
switchMapと逆バージョンのようなイメージ。

of(0, 1, 2).pipe(
  exhaustMap(x => sto(x + 1))
).subscribe(cl) // 1

timer(0, 500).pipe(
  exhaustMap(x => sto(x, 1000))
).subscribe(cl) // 0,2,4,6...

timer(0, 1000).pipe(
  exhaustMap(x => sto(x, 1000))
).subscribe(cl) // 0,1,2,3...

RxJS オペレータscan, reduceの違い

scanはストリームごとにnextを発火する。

of(1, 2, 3, 4).pipe(
  scan((acc, val) => acc + val, 10)
)
.subscribe(cl) // 11, 13, 16, 20


reduceは、最後のストリームだけ、nextを発火する。

of(1, 2, 3, 4).pipe(
  reduce((acc, val) => acc + val, 100)
)
.subscribe(cl) // 120

RxJS 結合オペレータ覚え書き

let obs1 = sto(1)
let obs2 = sto(2)

merge(obs1, obs2).subscribe(cl) // 1秒後に1, 2
concat(obs1, obs2).subscribe(cl) // 1秒後に1, 2秒後に2
forkJoin(obs1, obs2).subscribe(cl) // 1秒後に[1, 2]
zip(of(1,2), of(3,4)).subscribe(cl) // 1秒後に[1, 3] [2, 4]



共通関数。

function sto(x, time = 1000)  {
  const observable = new Observable(subscriber => {
    setTimeout(() => {
      subscriber.next(x)
      subscriber.complete()
    }, time);
  });
  return observable
}

function cl(x) {
  console.log(x)
}