Order Of Execution For RxJS Observable From Chained Promises
Solution 1:
While trying to express the question to be well described I was still investigating and understanding the issue better and better. The main point is that first promise this.cache.set(this.key, value)
may be actually immediately resolved while Observable.fromPromise
makes no guarantee in which order all the following chained promises are being resolved.
The problem was caused by the fact that the last promise from each chain was executed only after state was altered by first promise of the last chain (thus VALUE 2
).
After all the solution looks pretty simple from code point of view yet it is not that obvious as consist of two key changes:
- defer the initial promise execution until
mergeAll
phase by usingObservable.defer
instead ofObservable.fromPromise
- limit (or actually disable) the concurrency of merging the promises by using
mergeAll(1)
thus working solution looks like this:
class CacheValueObservable {
private cache: DumbCache;
constructor(private key: string) {
this.cache = new DumbCache();
}
/*
* meta observer to handle promises from observables with all results and errors
* thanks to ReplaySubject(1) current value is available immediately after subscribing
*/
private _valueSource$$ = new Rx.ReplaySubject(1);
// disable merge concurrency
private _value$ = this._valueSource$$.mergeAll(1);
public value$() { return this._value$; }
public updateValue(value) {
this._valueSource$$.next(
// defer promise resolution to ensure they will be fully resolved
// one by one thanks to no concurrency in mergeAll
Rx.Observable.defer(() =>
this.cache.set(this.key, value)
.then(() => this.cache.get(this.key))
)
);
}
}
and here is live example: http://jsbin.com/bikawo/edit?html,js,console
Post a Comment for "Order Of Execution For RxJS Observable From Chained Promises"