const cellC2$ = cellA2$ .combineLatest(cellB2$) .map((cells) => cells[0] + cells[1]); cellC2$.subscribe((value) => { console.log(value); });
Core Concept 1: Pull model vs Push model
An an example for a pull
based code, we can think of a window.setInterval() that fires every 5000 seconds.
An example of a push would be to have a function fire and then the return continutes to filter, flatMap, map and subscribe.
Core Concept 2: Everything is a database
In the comparison where the $title.on('keyup', () => {})
runs with a promise returned. The query can run into race conditions.
Also note that every single result also fires.
The issues:
// Fix up and down arrow // Stop always querying // Getting race condition
Bad ways
if last query == currentTitle return
The Rx way
// npm install rxjs-es for es6 import $ from 'jquery'; import Rx from 'rxjs/Rx'; const $title = $('#title'); const $results = $('#results'); const keyUps$ = Rx.Observable.fromEvent($title, "keyup"); const queries$ = keyUps$ .map(e => e.target.value) .distinctUntilChanged() .debounceTime(250) .switchMap(getItems); // similar to merge, but if new query comes in, discard the old data //.mergeMap(getItems); // alias for flatMap queries$.subscribe(query => { // get rid of the promise will stop race condition $results.empty(); $results.append(items.map( r => $(`<li />`).text(r))); }) <!-- queries$.subscribe(query => { console.log(e); // prints out event getItems(query) .then(items => { $results.empty(); $results.append(items.map( r => $(`<li />`).text(r))); }); }) -->
An even better way.
import $ from 'jquery'; import Rx from 'rxjs/Rx'; const $title = $('#title'); const $results = $('#results'); Rx.Observable.fromEvent($title, 'keyup') .map((e) => e.target.value) .distinctUntilChanged() .debounceTime(500) .switchMap(getItems) .subscribe((items) => { $results.empty(); $results.append(items.map((r) => $(`<li />`).text(r))); });
Note, you can model anything in a reactive context by thinking a little bit differently.
Web API Request Example
import Rx from 'rxjs/Rx'; # promise will always execute - not lazy const promise = new Promise((resolve, reject) => { console.log("In promise"); resolve("hey"); }); promise.then(item => console.log(item)); # this doesn't give any output! # observables are lazy! # won't run without a subscription const simple$ = new Rx.Observable(observer => { console.log("Generating observable"); setTimeout(() => { observer.next("An items!"); setTimeout(() => { observer.next("Another item!"); observer.complete(); }, 1000); }, 1000); }); # creating a subscription # first arg is the next function # second arg is error # third arg is complete simple$.subscribe( item => console.log(`one.next ${item}`), error => console.log(`one.error ${item}`), () => console.log("one.complete") ); # Generating observable # one.next An item! # one.next Another item! # one.complete setTimeout(() => { simple$.subscribe({ next: item => console.log(`two.next ${item}`), error: error => console.log(`two.error ${item}`), complete: () => console.log("two.complete") }); }, 3000)
function createInterval(time) { return new Rx.Observable(observer => { let index = 0; let interval = setInterval(() => { observer.next(index++); }, time); return () => { // will run when we unsubscribe clearnInterval(interval); }; }); } function createSubscriber(tag) { return { next(item) { console.log(`${tag}.next ${item}`); }, error(error) { console.log(`${tag}.error ${error.stack || error }`); }, complete() { console.log(`${tag}.complete`); } }; } function take(observable, amount) { return new Rx.Observable(observer => { }); } // this is the core of subscriptions function take(sourceObservable, amount) { return new Rx.Observable(observer => { let count = 0; const subscription = sourceObservable.subscribe({ next(item) { observer.next(item); if (++count >= amount) { observer.complete(); } }, error(error) { observer.error(error); }, complete() { observer.complete(); } }); return () => subscription.unsubscribe(); }); } const everySecond_ = createInterval(1000); const firstFiveSeconds = take(everySecond_, 5); const subscription = everySecond_.subscribe(createSubscriber("one")); setTimeout(() => { subscription.unsubscribe(); }, 3500);
This subscription will console.log out forever and ever and ever... - unless, we dispose of a description
How do operators come into play?
We could run something like const subscription = everySecond_.take(3)subscribe(createSubscriber("one"));
The steps for it are that it listens for a source and emits a transformation!
import Rx from 'rxjs/Rx'; Rx.Observable.interval(500) .take(5) .subscribe(createSubscriber("interval")); Rx.Observable.timer(1000, 500) .take(3) .subscribe(createSubscriber("timer"); // note, array doesn't work - use from Rx.Observable.of("Hello world!", 42, "whoa") .subscribe(createSubscriber("of")); Rx.Observable.from(["Hello world!", 42, "whoa"]) .subscribe(createSubscriber("of")); Rx.Observable.from(generate()) .subscribe(createSubscriber("of")); Rx.Observable.from("hello world!") .subscribe(createSubscriber("of")); // it can also take in a generator function! function* generate() { yield 1; yield 5; yield "HEY"; } Rx.Observable.throw(new Error("Hey")) .subscribe(createSubscriber("error")); // empty Rx.Observable.empty() .subscribe(createSubscriber("empty")); // defer let sideEffect = 0; const defer = Rx.Observable.defer(() => { sideEffect++; return Rx.Obserable.of(sideEffect); }); defer.subscribe(createSubscriber("defer.one")); defer.subscribe(createSubscriber("defer.two")); defer.subscribe(createSubscriber("defer.three")); Rx.Observable.never() .subscribe(createSubscriber("never")); Rx.Observable.range(10, 30) .subscribe(createSubscriber("range"));
Benefits of the iterable from
?
Rx.Observable.fromEvent($title, 'keyup') .map((e) => e.target.value) .distinctUntilChanged() .debounceTime(500) .switchMap(getItems) .subscribe((items) => { $results.empty(); $results.append(items.map((i) => $('<li />').text(i))); });
NOTE: Without the subscribe, it will never be subscribed to the dom!
If we have the .take(10)
- it would complete after taking 10 and then furthermore unsubscribe and be great for performance!
fromEvent
calls from addEventListener
, so it can do powerful things like keyup
for those that don't initially support it.
import fs from 'fs'; fs.readdir('./src/server', (err, items) => { if (err) console.log(err); else { console.log(items); } }); // alternative const readdir = Rx.Observable.bindNodeCallBack(fs.readdir); readdir('./src/server') // mergeMap creates iterable converted from array .mergeMap((files) => Rx.Observable.from(files)) .map((file) => `MANIPULATED ${file}`) .subscribe(createSubscriber('readdir')); // promises function getItem() { return new Promise((resolve, reject) => { setTimeout(() => { resolve('Hello'); }, 1000); }); } Rx.Observable.fromPromise(getItem()).subscribe(createSubscriber('promise'));
Subjects are another Rx primitive. They are both an observable and a observer! Used to bridge non-reactive code with reactive code.
Behaviour, replay subjects etc.
Warning: you should only really consider them as a last resort when bridging non-reactive and reactive code.
const simple = new Rx.Subject(); simple.subscribe(createSubscriber('simple')); simple.next('Hello'); simple.next('World'); simple.complete(); const interval = Rx.Observable.interval(1000).take(5); const intervalSubject = new Rx.Subject(); intervalSubject.subscribe(interval); intervalSubject.subscribe(createSubscriber('sub1')); intervalSubject.subscribe(createSubscriber('sub2')); intervalSubject.subscribe(createSubscriber('sub3')); // subscribes after three seconds setTimeout(() => { intervalSubject.subscribe(createSubscriber('LOOK AT ME')); }, 3000);
Before, we had to invoke a function that call next
and complete
.
In the above example, intervalSubject is acting as a proxy to another observable.
// needs init state parameter const currentUser = new Rx.BehaviorSubject({ isLoggedIn: false }); const isLoggedIn = currentUser.map((u) => u.isLoggedIn); currentUser.next({ isLoggedIn: false }); isLoggedIn.subscribe(createSubscriber('isLoggedIn')); setTimeout(() => { currentUser.next({ isLoggedIn: true, name: 'nelson' }); }, 3000); setTimeout(() => { isLogged.subscribe(createSubscription('delayed')); }, 1500);
How do you remember multiple states?
const replay = new Rx.ReplaySubject(3); replay.next(1); replay.next(2); replay.subscribe(createSubscriber("one")); replay.next(3); replay.next(4); replay.next(5); // this subscription only gets the previous three items replay.subscribe(createSubscriber("two")); replay.next(6); // what you see one.next 1 one.next 2 one.next 3 one.next 4 one.next 5 two.next 3 two.next 4 two.next 5 one.next 6 two.next 6
Async Subjects
const apiCall = new Rx.AsyncSubject(); apiCall.next(1); apiCall.subscribe(createSubscriber("one")); apiCall.next(2); // only will emit the final item before it is complete apiCall.complete(); // if you subscribe to it again, that final value will be emitted setTimeout(() => { apiCall.subscribe(createSubscriber("two")); }, 2000); // output one.next 2 one.complete two.next 2 two.complete
Subject Summary
Sources:
fromEvent($title, 'keyup')
// this example shows when both start from the beginning eg cold import Rx from 'rxjs/Rx'; const interval = Rx.Observable.interval(1000).take(10); setTimeout(() => { interval.subscribe(createSubscriber('one')); }, 1200); setTimeout(() => { interval.subscribe(createSubscriber('two')); }, 3200); // HOT // connectable observable import Rx from 'rxjs/Rx'; const interval = Rx.Observable.interval(1000) .take(10) .publish(); interval.connect(); setTimeout(() => { interval.subscribe(createSubscriber('one')); }, 1200); setTimeout(() => { interval.subscribe(createSubscriber('two')); }, 3200); // if you connect after a set interval, then it begins executing and sharing the underlying observable
Why would you want a hot variable?
// here subscribe console.log runs twice const socket = { on: () => {} }; const chatMessage = new Rx.Observable((observable) => { console.log('subscribed'); socket.on('chat:message', (message) => observer.next(message)); }); chatMessage.subscribe(createSubscriber('one')); chatMessage.subscribe(createSubscriber('two')); // without it const socket = { on: () => {} }; const chatMessage = new Rx.Observable((observable) => { console.log('subscribed'); socket.on('chat:message', (message) => observer.next(message)); }).publish(); chatMessage.connect(); chatMessage.subscribe(createSubscriber('one')); chatMessage.subscribe(createSubscriber('two')); // using publishLast() const simple = new Rx.Observable((observer) => { observer.next('one'); observer.next('two'); observer.complete(); }); // always returns the last value const published = simple.publishLast(); // even if we subscribe before connect, both will get the last value published.subscribe(createSubscriber('one')); published.connect(); published.subscribe(creaSubscriber('two')); // using publishReplay() const simple = new Rx.Observable((observer) => { observer.next('one'); observer.next('two'); observer.next('three'); return () => console.log('Disposed'); }); // always returns the last value const published = simple.publishReplay(2); // even if we subscribe before connect, both will get the last value // to dispose without running complete, we need to disconnect by unsubscribing const sub1 = published.subscribe(createSubscriber('one')); const connection = published.connect(); const sub2 = published.subscribe(creaSubscriber('two')); sub1.unsubscribe(); sub2.unsubscribe(); connection.unsubscribe();
Refcount is a way to automatically handle the connection and the unsubscription of a connection observable.
It will connect to the first subscription and then disconnected on the last unsubscribe.
// using refCount() const simple = new Rx.Observable(observer => { observer.next("one"); observer.next("two"); observer.next("three"); return () => console.log("Disposed"); }); // always returns the last value const published = simple.publishReplay(2).refCount(); // even if we subscribe before connect, both will get the last value // to dispose without running complete, we need to disconnect by unsubscribing const sub1 = published.subscribe(createSubscriber("one")); const sub2 = published.subscribe(creaSubscriber("two")); sub1.unsubscribe(); sub2.unsubscribe();
The publish().refCount()
is done so often, that is has been turned in share()
.
Taxing processes that you don't want to repeat but you want multiple things to hook into the result, then turn it into a hot subscription.
Now we will just talk about the different primary operators that you will work with.
// do => get the next value and pass it back unchanged // finally => only completes after the range has completed, runs right at the end of the final value // filter => filters out given statement // interval => call timeout // startWith => set initial value Rx.Observable.range(1, 10) .do((a) => console.log(`From do ${a}`)) .map((a) => a * a) .subscribe(createSubscriber('simple')); Rx.Observable.range(1, 10) .finally(() => console.log(`From finally`)) .map((a) => a * 2) .subscribe(createSubscriber('finally')); Rx.Observable.range(1, 10) .filter((a) => a < 5) .map((a) => a * 2) .subscribe(createSubscriber('filter')); Rx.Observable.interval(1000) .startWith(-1) .subscribe(createSubscriber('interval'));
// merge - merge many observables togethers // concat - this concatenates observables to the end of another, can also take a list of Observables Rx.Observable.interval(1000) .merge(Rx.Observable.interval(500)) .take(5) .subscribe(createSubscriber("merge1")); Rx.Observable.merge( Rx.Observable.interval(1000).map(i => `${i} seconds), Rx.Observable.interval(500).map(i => `${i} half seconds)) .take(5) .subscribe(createSubscriber('merge2')); // different events for merged observables Rx.Observable.merge( socket.on$("login").map(user => processUser(user), socket.on$("logout").map(() => null)); Rx.Observable.range(1, 5) .concat(Rx.Observable.range(10,3)) .subscribe(createSubscriber("concat1"));
// map - a projection on every item that comes in // mergeMap - select many, does projection and then has another thing that we will work on // switchMap - similar to mergeMap but replaces with the latest value if another emission comes in function arrayMap(arr, proj) { let returnArray = []; for (let i of arr) { returnArray.push(proj(item)); } return returnArray; } arrayMap([1, 2, 3], (a) => a * a); // imagine array of dicts const albums = [{}, {}]; function arrayMergeMap(arr, proj) { let returnArray = []; for (let i of arr) { let projArray = proj(item); for (let j of projArray) { returnArray.push(proj(item)); } } return returnArray; } const tracks = arrayMergeMap(albums, (album) => album.tracks); Rx.Observable.range(1, 3) .mergeMap((i) => Rx.Observable.timer(i * 1000).map(() => `After ${i} seconds`) ) .subscribe(createSubscriber('mergeMap')); Rx.Observable.fromPromise(getTracks()) .mergeMap((tracks) => Rx.Observable.from(tracks)) .subscribe(createSubscriber('tracks')); function getTracks() { return new Promise((resolve, reject) => { setTimeout(() => { resolve(['track 1', 'track 2', 'track 3']); }, 1000); }); } // synchronous example Rx.Observable.of('my query') .do(() => console.log('Querying')) .mergeMap((a) => query(a)) .do(() => console.log('After querying')) .subscribe(createSubscriber('query')); function query(value) { return new Promise((resolve, reject) => { setTimeout(() => { resolve('This is the resolved value'); }, 1000); }); } // switch map
// reducer (acc, value) and works on value - doesn't emit until the completion // scan - processes and emits as it comes in Rx.Observable.range(1, 10) .reduce((acc, value) => acc + value) .subscribe(createSubscriber('reduce')); Rx.Observable.range(1, 10) .scan((acc, value) => acc + value) .subscribe(createSubscriber('scan'));
There have been some big changes to how buffer
has been used.
Buffer takes in an observable.
toArray will convert results into an array. - still has a clean exit if the never() is implemented!
Rx.Observable.range(1, 100) .bufferCount(25) .subscribe(createSubscriber("items"); // will take 25 items and pushing them into an array Rx.Observable.interval(500) .bufferTime(2000) .subscribe(createSubscriber("bufferTime"); // same behaviour! // emitting event causes buffer to flush Rx.Observable.interval(500) .buffer(Rx.Observable.interval(2000)) .subscribe(createSubscriber("buffer"); // // toArray // Rx.Observable.range(1, 10) .toArray() .subscribe(createSubscriber("range"));
const simple = new Rx.Observable((observer) => { console.log('Generating sequence'); observer.next(1); observer.next(2); observer.next(3); observer.next(4); observer.complete(); }); simple.first().subscribe(createSubscriber('first')); simple.last().subscribe(createSubscriber('last')); // displays 1 & 4 // if nothing is in there, there are EmptyError(s) thrown // single.error thrown is more than one error thrown simple.single().subscribe(createSubscriber('single')); // take and skip won't throw errors // take does the first however emissions // skip will take the emissions after a number simple.take(2).subscribe(createSubscriber('take')); simple.skip(2).subscribe(createSubscriber('skip')); // 3, 4 simple .skip(2) .take(2) .subscribe(createSubscriber('skip')); // skipWhile / takeWhile Rx.Observable.interval(500) .skipWhile((i) => i < 4) .takeWhile((i) => i < 10) .subscribe(createSubscriber('skipWhile/takeWhile')); // what's until and take emissions until Rx.Observable.interval(500) .skipUntil(Rx.Observable.timer(1000)) .takeUntil(Rx.Observable.timer(4000)) .subscribe(createSubscriber('skipUntil'));
How can we combine observables in different ways?
function arrayZip(arr1, arr2, selectorFunc) { const count = Math.min(arr1.length, arr2.length); const results = []; for (let i = 0; i < count; i++) { const combined = selector(arr1[i], arr2[i]); results.push(combined); } return results; } const arr1 = [32, 2, 52, 43, 54]; const arr2 = [1, 0, 10, 4, 1, 4, 6, 2]; const results = arrayZip(arr1, arr2, (left, right) => left * right); console.log(results); // in RxJS Rx.Observable.range(1.1) .zip( Rx.Observable.interval(500), (left, right) => `item: ${left}, at ${right * 500}` ) .subscribe(createSubscriber('zip')); // emits value when source emits // can also pass (left, right) function like zip as second parameter Rx.Observable.interval(1000) .withLatestFrom(Rx.Observable.interval(500)) .subscribe(createSubscriber('withLatestFrom')); // emit value if either do Rx.Observable.interval(1000) .combineLatest(Rx.Observable.interval(500)) .subscribe(createSubscriber('withLatestFrom'));
If an error happens, an observer stops emitting and can prevent values from emitting at all. Error handling is very important!
.catch(error => Rx.Observable.of(error))
can pass this down as an Observable.
.retry()
we can pass in with a numeral to ensure that we either keep retrying or retry a certain number of times.