// Spreadsheet analogy: C2 is derived from A2 and B2, so it emits their sum.
const cellC2$ = cellA2$
  .combineLatest(cellB2$)
  .map(([a2, b2]) => a2 + b2);

cellC2$.subscribe((sum) => {
  console.log(sum);
});
Core Concept 1: Pull model vs Push model
As an example of pull-based code, think of a window.setInterval() that polls every 5000 milliseconds.
An example of push would be a function that fires, with its result then continuing through filter, flatMap, map and subscribe.
Core Concept 2: Everything is a database
In the comparison where $title.on('keyup', () => {}) runs a query that returns a promise, the query can run into race conditions.
Also note that every single result also fires.
The issues:
// Fix up and down arrow // Stop always querying // Getting race condition
Bad ways
if last query == currentTitle return
The Rx way
// npm install rxjs-es for es6
import $ from 'jquery';
import Rx from 'rxjs/Rx';

const $title = $('#title');
const $results = $('#results');

const keyUps$ = Rx.Observable.fromEvent($title, 'keyup');

// NOTE(review): getItems is not defined in this snippet; presumably it returns
// a Promise or Observable of an array of strings - confirm against its definition.
const queries$ = keyUps$
  .map((e) => e.target.value)
  .distinctUntilChanged()
  .debounceTime(250)
  // switchMap is similar to merge, but if a new query comes in, it discards the old data
  .switchMap(getItems);
// .mergeMap(getItems); // alias for flatMap

// Getting rid of the promise handling in the subscriber stops the race
// condition: the subscriber receives the items directly.
queries$.subscribe((items) => {
  $results.empty();
  $results.append(items.map((r) => $(`<li />`).text(r)));
});

/*
Earlier version, kept for comparison: the promise is resolved inside the subscriber.

queries$.subscribe(query => {
  console.log(e); // prints out event
  getItems(query)
    .then(items => {
      $results.empty();
      $results.append(items.map( r => $(`<li />`).text(r)));
    });
})
*/
An even better way.
import $ from 'jquery';
import Rx from 'rxjs/Rx';

const $title = $('#title');
const $results = $('#results');

// Input values: changed values only, debounced by 500ms.
const typedTitle$ = Rx.Observable.fromEvent($title, 'keyup')
  .map((event) => event.target.value)
  .distinctUntilChanged()
  .debounceTime(500);

// Replaces the current result list with one <li> per item.
const showItems = (items) => {
  $results.empty();
  const rows = items.map((text) => $(`<li />`).text(text));
  $results.append(rows);
};

typedTitle$.switchMap(getItems).subscribe(showItems);
Note, you can model anything in a reactive context by thinking a little bit differently.
Web API Request Example
import Rx from 'rxjs/Rx';

// promise will always execute - not lazy
const promise = new Promise((resolve, reject) => {
  console.log('In promise');
  resolve('hey');
});

promise.then((item) => console.log(item));

// Declaring the observable on its own doesn't give any output!
// observables are lazy! - won't run without a subscription
const simple$ = new Rx.Observable((observer) => {
  console.log('Generating observable');
  setTimeout(() => {
    observer.next('An item!');
    setTimeout(() => {
      observer.next('Another item!');
      observer.complete();
    }, 1000);
  }, 1000);
});

// creating a subscription
// first arg is the next function
// second arg is error
// third arg is complete
simple$.subscribe(
  (item) => console.log(`one.next ${item}`),
  (error) => console.log(`one.error ${error}`),
  () => console.log('one.complete')
);

// Output:
// Generating observable
// one.next An item!
// one.next Another item!
// one.complete

// Subscribing again later re-runs the producer function for this observer.
setTimeout(() => {
  simple$.subscribe({
    next: (item) => console.log(`two.next ${item}`),
    error: (error) => console.log(`two.error ${error}`),
    complete: () => console.log('two.complete'),
  });
}, 3000);
/**
 * Builds an observable that emits an incrementing index (0, 1, 2, ...)
 * every `time` milliseconds for each subscriber.
 *
 * @param {number} time - delay between emissions, passed to setInterval
 * @returns {Rx.Observable} observable of increasing integers
 */
function createInterval(time) {
  return new Rx.Observable((observer) => {
    let index = 0;
    const interval = setInterval(() => {
      observer.next(index++);
    }, time);

    // Teardown: will run when we unsubscribe, so the timer does not leak.
    return () => {
      clearInterval(interval);
    };
  });
}

/**
 * Builds an observer object that logs every notification, prefixed with `tag`.
 *
 * @param {string} tag - label used in the log output
 * @returns {{next: Function, error: Function, complete: Function}} observer
 */
function createSubscriber(tag) {
  return {
    next(item) {
      console.log(`${tag}.next ${item}`);
    },
    error(error) {
      console.log(`${tag}.error ${error.stack || error}`);
    },
    complete() {
      console.log(`${tag}.complete`);
    },
  };
}

/**
 * Hand-rolled `take` operator - this is the core of subscriptions.
 * Subscribes to the source, forwards its items, and completes once
 * `amount` items have been forwarded.
 *
 * @param {Rx.Observable} sourceObservable - observable to take items from
 * @param {number} amount - number of items to forward before completing
 * @returns {Rx.Observable} observable of at most `amount` items
 */
function take(sourceObservable, amount) {
  return new Rx.Observable((observer) => {
    let count = 0;
    const subscription = sourceObservable.subscribe({
      next(item) {
        observer.next(item);
        if (++count >= amount) {
          observer.complete();
        }
      },
      error(error) {
        observer.error(error);
      },
      complete() {
        observer.complete();
      },
    });

    // Unsubscribing from the result also unsubscribes from the source.
    return () => subscription.unsubscribe();
  });
}

const everySecond_ = createInterval(1000);
const firstFiveSeconds = take(everySecond_, 5);

const subscription = everySecond_.subscribe(createSubscriber('one'));

setTimeout(() => {
  subscription.unsubscribe();
}, 3500);
This subscription will console.log forever and ever and ever... unless we dispose of the subscription.
How do operators come into play?
We could run something like const subscription = everySecond_.take(3).subscribe(createSubscriber("one"));
The steps for it are that it listens for a source and emits a transformation!
import Rx from 'rxjs/Rx';

// interval: emit every 500ms
Rx.Observable.interval(500)
  .take(5)
  .subscribe(createSubscriber('interval'));

// timer: first emission after 1000ms, then every 500ms
Rx.Observable.timer(1000, 500)
  .take(3)
  .subscribe(createSubscriber('timer'));

// of: emits each argument as an item
// note, array doesn't work (it is emitted as a single item) - use from
Rx.Observable.of('Hello world!', 42, 'whoa')
  .subscribe(createSubscriber('of'));

// from: emits each element of an iterable
Rx.Observable.from(['Hello world!', 42, 'whoa'])
  .subscribe(createSubscriber('from'));

Rx.Observable.from(generate())
  .subscribe(createSubscriber('from'));

Rx.Observable.from('hello world!')
  .subscribe(createSubscriber('from'));

// it can also take in a generator function!
function* generate() {
  yield 1;
  yield 5;
  yield 'HEY';
}

// throw: errors immediately
Rx.Observable.throw(new Error('Hey'))
  .subscribe(createSubscriber('error'));

// empty: completes without emitting
Rx.Observable.empty()
  .subscribe(createSubscriber('empty'));

// defer: the factory runs once per subscription, so each subscriber
// sees a different sideEffect value
let sideEffect = 0;
const defer = Rx.Observable.defer(() => {
  sideEffect++;
  return Rx.Observable.of(sideEffect);
});

defer.subscribe(createSubscriber('defer.one'));
defer.subscribe(createSubscriber('defer.two'));
defer.subscribe(createSubscriber('defer.three'));

// never: emits nothing and never completes
Rx.Observable.never()
  .subscribe(createSubscriber('never'));

// range(start, count): 30 values starting at 10
Rx.Observable.range(10, 30)
  .subscribe(createSubscriber('range'));
Benefits of the iterable from?
// Step 1: raw keyup events from the title input.
const titleKeyups$ = Rx.Observable.fromEvent($title, 'keyup');

// Step 2: current input text, changed values only, debounced by 500ms.
const searchText$ = titleKeyups$
  .map(({ target }) => target.value)
  .distinctUntilChanged()
  .debounceTime(500);

// Step 3: switch to the lookup for the most recent text and render it.
searchText$.switchMap(getItems).subscribe((items) => {
  $results.empty();
  const listItems = items.map((i) => $('<li />').text(i));
  $results.append(listItems);
});
NOTE: Without the subscribe, the event listener will never be attached to the DOM!
If we have the .take(10) - it would complete after taking 10 and then furthermore unsubscribe and be great for performance!
fromEvent uses addEventListener under the hood, so it can do powerful things like keyup for those that don't initially support it.
import fs from 'fs';

// Plain Node callback style.
fs.readdir('./src/server', (err, items) => {
  if (err) {
    console.log(err);
  } else {
    console.log(items);
  }
});

// alternative: wrap the Node-style callback API into a function that
// returns an observable
const readdir = Rx.Observable.bindNodeCallback(fs.readdir);

readdir('./src/server')
  // mergeMap flattens the observable created from the array of file names
  .mergeMap((files) => Rx.Observable.from(files))
  .map((file) => `MANIPULATED ${file}`)
  .subscribe(createSubscriber('readdir'));

// promises
function getItem() {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('Hello');
    }, 1000);
  });
}

Rx.Observable.fromPromise(getItem()).subscribe(createSubscriber('promise'));
Subjects are another Rx primitive. They are both an observable and an observer! They are used to bridge non-reactive code with reactive code.
Behaviour, replay subjects etc.
Warning: you should only really consider them as a last resort when bridging non-reactive and reactive code.
// A subject is pushed to by hand: next/complete are called from outside.
const simple = new Rx.Subject();

simple.subscribe(createSubscriber('simple'));

simple.next('Hello');
simple.next('World');
simple.complete();

const interval = Rx.Observable.interval(1000).take(5);
const intervalSubject = new Rx.Subject();

// The subject is the OBSERVER of the interval, so it is passed to
// interval.subscribe and relays the values to its own subscribers.
interval.subscribe(intervalSubject);

intervalSubject.subscribe(createSubscriber('sub1'));
intervalSubject.subscribe(createSubscriber('sub2'));
intervalSubject.subscribe(createSubscriber('sub3'));

// subscribes after three seconds
setTimeout(() => {
  intervalSubject.subscribe(createSubscriber('LOOK AT ME'));
}, 3000);
Before, we had to invoke a function that calls next and complete.
In the above example, intervalSubject is acting as a proxy to another observable.
// needs init state parameter
const currentUser = new Rx.BehaviorSubject({ isLoggedIn: false });
const isLoggedIn = currentUser.map((u) => u.isLoggedIn);

currentUser.next({ isLoggedIn: false });

isLoggedIn.subscribe(createSubscriber('isLoggedIn'));

setTimeout(() => {
  currentUser.next({ isLoggedIn: true, name: 'nelson' });
}, 3000);

// A BehaviorSubject hands its current value to new subscribers, so this
// late subscriber gets a value as soon as it subscribes.
setTimeout(() => {
  isLoggedIn.subscribe(createSubscriber('delayed'));
}, 1500);
How do you remember multiple states?
// Keeps a buffer of the 3 most recent items for subscribers that join late.
const replay = new Rx.ReplaySubject(3);

[1, 2].forEach((value) => replay.next(value));

replay.subscribe(createSubscriber('one'));

[3, 4, 5].forEach((value) => replay.next(value));

// this subscription only gets the previous three items
replay.subscribe(createSubscriber('two'));

replay.next(6);

// what you see
// one.next 1
// one.next 2
// one.next 3
// one.next 4
// one.next 5
// two.next 3
// two.next 4
// two.next 5
// one.next 6
// two.next 6
Async Subjects
const apiCall = new Rx.AsyncSubject();

apiCall.next(1);
apiCall.subscribe(createSubscriber('one'));
apiCall.next(2);

// only will emit the final item before it is complete
apiCall.complete();

// if you subscribe to it again, that final value will be emitted
const subscribeLate = () => {
  apiCall.subscribe(createSubscriber('two'));
};
setTimeout(subscribeLate, 2000);

// output
// one.next 2
// one.complete
// two.next 2
// two.complete
Subject Summary
Sources:
// Example source: fromEvent($title, 'keyup')

import Rx from 'rxjs/Rx';

// COLD
// this example shows when both start from the beginning eg cold
const coldInterval = Rx.Observable.interval(1000).take(10);

setTimeout(() => {
  coldInterval.subscribe(createSubscriber('one'));
}, 1200);

setTimeout(() => {
  coldInterval.subscribe(createSubscriber('two'));
}, 3200);

// HOT
// connectable observable
const hotInterval = Rx.Observable.interval(1000)
  .take(10)
  .publish();

// connect() starts the shared underlying interval right away, so each
// subscriber only sees the values emitted after it subscribes
hotInterval.connect();

setTimeout(() => {
  hotInterval.subscribe(createSubscriber('one'));
}, 1200);

setTimeout(() => {
  hotInterval.subscribe(createSubscriber('two'));
}, 3200);

// if you connect after a set interval, then it begins executing and sharing the underlying observable
Why would you want a hot variable?
// Each example lives in its own block scope so the repeated names
// (socket, chatMessage, simple, published) do not clash.

// here subscribe console.log runs twice - once per subscriber
{
  const socket = { on: () => {} };

  const chatMessage = new Rx.Observable((observer) => {
    console.log('subscribed');
    socket.on('chat:message', (message) => observer.next(message));
  });

  chatMessage.subscribe(createSubscriber('one'));
  chatMessage.subscribe(createSubscriber('two'));
}

// with publish() the producer runs once, on connect, and is shared
{
  const socket = { on: () => {} };

  const chatMessage = new Rx.Observable((observer) => {
    console.log('subscribed');
    socket.on('chat:message', (message) => observer.next(message));
  }).publish();

  chatMessage.connect();

  chatMessage.subscribe(createSubscriber('one'));
  chatMessage.subscribe(createSubscriber('two'));
}

// using publishLast()
{
  const simple = new Rx.Observable((observer) => {
    observer.next('one');
    observer.next('two');
    observer.complete();
  });

  // always returns the last value
  const published = simple.publishLast();

  // even if we subscribe before connect, both will get the last value
  published.subscribe(createSubscriber('one'));
  published.connect();
  published.subscribe(createSubscriber('two'));
}

// using publishReplay()
{
  const simple = new Rx.Observable((observer) => {
    observer.next('one');
    observer.next('two');
    observer.next('three');

    return () => console.log('Disposed');
  });

  // replays the last 2 values to subscribers that join after connect
  const published = simple.publishReplay(2);

  // the source never completes, so to dispose we need to disconnect
  // by unsubscribing the connection
  const sub1 = published.subscribe(createSubscriber('one'));
  const connection = published.connect();
  const sub2 = published.subscribe(createSubscriber('two'));

  sub1.unsubscribe();
  sub2.unsubscribe();

  connection.unsubscribe();
}
refCount is a way to automatically handle the connection and the unsubscription of a connectable observable.
It will connect on the first subscription and then disconnect on the last unsubscribe.
// using refCount()
const simple = new Rx.Observable((observer) => {
  observer.next('one');
  observer.next('two');
  observer.next('three');

  return () => console.log('Disposed');
});

// replays the last 2 values; refCount() manages the connection for us,
// so there is no explicit connect() call
const published = simple.publishReplay(2).refCount();

// connects on the first subscribe and disconnects on the last unsubscribe
const sub1 = published.subscribe(createSubscriber('one'));
const sub2 = published.subscribe(createSubscriber('two'));

sub1.unsubscribe();
sub2.unsubscribe();
The publish().refCount() combination is used so often that it has been turned into share().
If you have a taxing process that you don't want to repeat, but you want multiple things to hook into the result, then turn it into a hot observable.
Now we will just talk about the different primary operators that you will work with.
// do        => observe the next value and pass it along unchanged
// finally   => runs a callback right at the end, after the final value
// filter    => keeps only the values matching the predicate
// interval  => emits on a timer
// startWith => set initial value

const logFromDo = (a) => console.log(`From do ${a}`);
const logFromFinally = () => console.log(`From finally`);
const square = (a) => a * a;
const double = (a) => a * 2;
const isBelowFive = (a) => a < 5;

Rx.Observable.range(1, 10)
  .do(logFromDo)
  .map(square)
  .subscribe(createSubscriber('simple'));

Rx.Observable.range(1, 10)
  .finally(logFromFinally)
  .map(double)
  .subscribe(createSubscriber('finally'));

Rx.Observable.range(1, 10)
  .filter(isBelowFive)
  .map(double)
  .subscribe(createSubscriber('filter'));

Rx.Observable.interval(1000)
  .startWith(-1)
  .subscribe(createSubscriber('interval'));
// merge - merge many observables together
// concat - this concatenates observables to the end of another, can also take a list of Observables

Rx.Observable.interval(1000)
  .merge(Rx.Observable.interval(500))
  .take(5)
  .subscribe(createSubscriber('merge1'));

Rx.Observable.merge(
  Rx.Observable.interval(1000).map((i) => `${i} seconds`),
  Rx.Observable.interval(500).map((i) => `${i} half seconds`)
)
  .take(5)
  .subscribe(createSubscriber('merge2'));

// different events for merged observables
// NOTE(review): socket.on$ and processUser are not defined in this snippet;
// the result is not subscribed to, so nothing runs yet.
Rx.Observable.merge(
  socket.on$('login').map((user) => processUser(user)),
  socket.on$('logout').map(() => null)
);

Rx.Observable.range(1, 5)
  .concat(Rx.Observable.range(10, 3))
  .subscribe(createSubscriber('concat1'));
// map - a projection on every item that comes in
// mergeMap - select many: projects each item to an inner collection and flattens the results
// switchMap - similar to mergeMap but replaces with the latest value if another emission comes in

/**
 * Array analogue of `map`: applies `proj` to every element.
 *
 * @param {Array} arr - input items
 * @param {Function} proj - projection applied to each item
 * @returns {Array} projected items, in input order
 */
function arrayMap(arr, proj) {
  const returnArray = [];
  for (const item of arr) {
    returnArray.push(proj(item));
  }
  return returnArray;
}

arrayMap([1, 2, 3], (a) => a * a);

// imagine array of dicts, each holding a list of tracks
const albums = [
  { tracks: ['track 1', 'track 2'] },
  { tracks: ['track 3'] },
];

/**
 * Array analogue of `mergeMap`: `proj` returns an iterable per item and
 * every element of those iterables is flattened into one result array.
 *
 * @param {Array} arr - input items
 * @param {Function} proj - projection returning an iterable for each item
 * @returns {Array} all projected elements, flattened one level
 */
function arrayMergeMap(arr, proj) {
  const returnArray = [];
  for (const item of arr) {
    for (const projected of proj(item)) {
      returnArray.push(projected);
    }
  }
  return returnArray;
}

const tracks = arrayMergeMap(albums, (album) => album.tracks);

Rx.Observable.range(1, 3)
  .mergeMap((i) =>
    Rx.Observable.timer(i * 1000).map(() => `After ${i} seconds`)
  )
  .subscribe(createSubscriber('mergeMap'));

Rx.Observable.fromPromise(getTracks())
  .mergeMap((tracks) => Rx.Observable.from(tracks))
  .subscribe(createSubscriber('tracks'));

function getTracks() {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(['track 1', 'track 2', 'track 3']);
    }, 1000);
  });
}

// promise example: the projection passed to mergeMap returns a Promise
Rx.Observable.of('my query')
  .do(() => console.log('Querying'))
  .mergeMap((a) => query(a))
  .do(() => console.log('After querying'))
  .subscribe(createSubscriber('query'));

function query(value) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve('This is the resolved value');
    }, 1000);
  });
}

// switch map
// reduce - folds with (acc, value) and doesn't emit until the completion
// scan - same fold, but processes and emits as each value comes in

const add = (acc, value) => acc + value;

const reduced$ = Rx.Observable.range(1, 10).reduce(add);
reduced$.subscribe(createSubscriber('reduce'));

const scanned$ = Rx.Observable.range(1, 10).scan(add);
scanned$.subscribe(createSubscriber('scan'));
There have been some big changes to how buffer has been used.
Buffer takes in an observable.
toArray will convert results into an array. - still has a clean exit if the never() is implemented!
// bufferCount: will take 25 items at a time and push them into an array
Rx.Observable.range(1, 100)
  .bufferCount(25)
  .subscribe(createSubscriber('items'));

// bufferTime: collects items into an array over each 2000ms window
Rx.Observable.interval(500)
  .bufferTime(2000)
  .subscribe(createSubscriber('bufferTime'));

// same behaviour!
// buffer takes an observable: an emitting event causes the buffer to flush
Rx.Observable.interval(500)
  .buffer(Rx.Observable.interval(2000))
  .subscribe(createSubscriber('buffer'));

//
// toArray
//
Rx.Observable.range(1, 10)
  .toArray()
  .subscribe(createSubscriber('range'));
// Emits 1..4 and completes; the producer runs again for every subscription.
const simple = new Rx.Observable((observer) => {
  console.log('Generating sequence');
  [1, 2, 3, 4].forEach((value) => observer.next(value));
  observer.complete();
});

// displays 1 & 4
// if nothing is in there, there are EmptyError(s) thrown
const firstItem$ = simple.first();
const lastItem$ = simple.last();
firstItem$.subscribe(createSubscriber('first'));
lastItem$.subscribe(createSubscriber('last'));

// single errors when the source emits more than one item
const onlyItem$ = simple.single();
onlyItem$.subscribe(createSubscriber('single'));

// take and skip won't throw errors
// take(n) keeps only the first n emissions
// skip(n) drops the first n emissions and keeps the rest
const firstTwo$ = simple.take(2);
const afterTwo$ = simple.skip(2);
firstTwo$.subscribe(createSubscriber('take'));
afterTwo$.subscribe(createSubscriber('skip'));

// 3, 4
const thirdAndFourth$ = simple.skip(2).take(2);
thirdAndFourth$.subscribe(createSubscriber('skip'));

// skipWhile / takeWhile - gate on a predicate
const fourToNine$ = Rx.Observable.interval(500)
  .skipWhile((i) => i < 4)
  .takeWhile((i) => i < 10);
fourToNine$.subscribe(createSubscriber('skipWhile/takeWhile'));

// skipUntil / takeUntil - gate on another observable emitting
const startGate$ = Rx.Observable.timer(1000);
const stopGate$ = Rx.Observable.timer(4000);
Rx.Observable.interval(500)
  .skipUntil(startGate$)
  .takeUntil(stopGate$)
  .subscribe(createSubscriber('skipUntil'));
How can we combine observables in different ways?
/**
 * Array analogue of `zip`: pairs elements by index and combines each pair
 * with `selectorFunc`, stopping at the end of the shorter array.
 *
 * @param {Array} arr1 - left-hand items
 * @param {Array} arr2 - right-hand items
 * @param {Function} selectorFunc - called as selectorFunc(left, right)
 * @returns {Array} one combined value per index pair
 */
function arrayZip(arr1, arr2, selectorFunc) {
  const count = Math.min(arr1.length, arr2.length);
  const results = [];

  for (let i = 0; i < count; i++) {
    const combined = selectorFunc(arr1[i], arr2[i]);
    results.push(combined);
  }

  return results;
}

const arr1 = [32, 2, 52, 43, 54];
const arr2 = [1, 0, 10, 4, 1, 4, 6, 2];

const results = arrayZip(arr1, arr2, (left, right) => left * right);

console.log(results);

// in RxJS
// zip pairs the n-th value of each source
Rx.Observable.range(1, 10)
  .zip(
    Rx.Observable.interval(500),
    (left, right) => `item: ${left}, at ${right * 500}`
  )
  .subscribe(createSubscriber('zip'));

// emits value when source emits
// can also pass (left, right) function like zip as second parameter
Rx.Observable.interval(1000)
  .withLatestFrom(Rx.Observable.interval(500))
  .subscribe(createSubscriber('withLatestFrom'));

// emit value if either do
Rx.Observable.interval(1000)
  .combineLatest(Rx.Observable.interval(500))
  .subscribe(createSubscriber('combineLatest'));
If an error happens, an observable stops emitting, and the error can prevent values from emitting at all. Error handling is very important!
.catch(error => Rx.Observable.of(error)) can pass this down as an Observable.
.retry() we can pass in with a numeral to ensure that we either keep retrying or retry a certain number of times.