RXJS_details
RxJS (Reactive Extentions for JavaScript)
RxJS (Reactive Extentions for JavaScript) is a library for reactive programming using Observables, making it easier to compose asyncchronous or callback-based code.
Key Benefits
- Handles async operation elegaintly
- Provides powerfull operators for data transformation
- Manages complex event sequences
- Enables cancellation of async operation
- Reduces callback hell
Operators (commonally used)
Combination
- combineLatest:
When any observable emits a value, emit the last emitted value from each. - concat:
Subscribe to observables in order as previous completes. (sequentially in order) - forkJoin :
When all observables complete, emit the last emitted value from each. - merge:
Turn multiple observables into a single observable. - startWith :
Emit given value first. - withLatestFrom:
Also provide the last value from another observable.
- combineLatest:
Creation
- from :
Turn an array, promise, or iterable into an observable. - fromEvent:
Turn event into observable sequence. - of :
Emit variable amount of values in a sequence and then emits a complete notification.: emits all the values at once - throwError:
Creates an Observable that immediately emits an error notification upon subscription.
- from :
Error Handling
- catch / catchError:
Gracefully handle errors in an observable sequence. - retry:
Retry an observable sequence a specific number of times.
- catch / catchError:
Filtering
- debounceTime :
Discard emitted values that take less than the specified time between output - distinctUntilChanged:
Only emit when the current value is different than the last. - filter:
Emit values that pass the provided condition. - find:
Emit the first item that passes predicate then complete. - skip:
Skip the provided number of emitted values. - take:
Emit provided number of values before completing. - takeUntil:
Emit values until provided observable emits.
- debounceTime :
Multicasting
- share :
Share source among multiple subscribers. - shareReplay:
Share source and replay specified number of emissions on subscription.
- share :
Transformation
- concatMap:
waits for previous inner observable to complete before starting next one. - map:
Transforms each value using a projection function. - mergeMap / flatMap :
Does NOT cancel previous inner observables. All run in parallel - switchMap:
Cancels previous inner observable when new value arrives. - scan :
Reduce over time.
- concatMap:
Utility
- tap / do :
Performs side effects without modifying the stream. - delay:
Delay emitted values by given time.
- tap / do :
Combination
- The combination operators allow the joining of information from multiple observables. Order, time, and structure of emitted values is the primary variation among these operators.
combineLatest
When any observable emits a value, emit the last emitted value from each.
Key Difference from ForkJoin
- Emits multiple times (whenever any source emits)
- Doesn't wait for completion
- Always use the latest value from each observable
- Needs all observables to emit at least once before first emission
NOTE: -if you are working with observables that only emit one value, or you only require the last value of each before completion, forkJoin is likely a better option.
import { combineLatest } from 'rxjs';
data$ = combineLatest([
this.categoryFilters$,
this.priceFilters$,
this.searchFilters$
]).pipe(
switchMap([category, price, search]) =>
this.productService.getProducts({category, price, search})
).subscribe(products => this.products = products);
concat
Subscribe to observables in order as previous completes
💡 think of concat like a line at a ATM, the next transaction (subscription) cannot start until the previous completes!
Use Case:
- Sequential operations
- Depentent API calls
- Maintaining order
import { of, concat } from 'rxjs';
concat(
of(1, 2, 3),
// subscribed after first completes
of(4, 5, 6),
// subscribed after second completes
of(7, 8, 9)
)
// log: 1, 2, 3, 4, 5, 6, 7, 8, 9
.subscribe(console.log);
forkJoin
When all observables complete, emit the last emitted value from each.
- ⚠ If an inner observable does not complete forkJoin will never emit a value!
import { forkJoin } from 'rxjs';
forkJoin({
users: this.http.get('/api/users'),
posts: this.http.get('/api/posts'),
comments: this.http.get('/api/comments')
}). subscribe(res => {
concole.log(res.users);
concole.log(res.posts);
concole.log(res.comments);
})
merge
Turn multiple observables into a single observable.
💡 This operator can be used as either a static or instance method!
💡 If order not throughput is a primary concern, try concat instead!
startWith
Emit given value first.
💡 A BehaviorSubject can also start with an initial value!
import { startWith, scan } from 'rxjs/operators';
import { of } from 'rxjs';
//emit ('World!', 'Goodbye', 'World!')
const source = of('World!', 'Goodbye', 'World!');
//start with 'Hello', concat current string to previous
const example = source.pipe(
startWith('Hello'),
scan((acc, curr) => `${acc} ${curr}`)
);
/*
output:
"Hello"
"Hello World!"
"Hello World! Goodbye"
"Hello World! Goodbye World!"
*/
const subscribe = example.subscribe(val => console.log(val));
withLatestFrom
Also provide the last value from another observable.
💡 If you want the last emission any time a variable number of observables emits, try combinelatest!
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';
//emit every 5s
const source = interval(5000);
//emit every 1s
const secondSource = interval(1000);
const example = source.pipe(
withLatestFrom(secondSource),
map(([first, second]) => {
return `First Source (5s): ${first} Second Source (1s): ${second}`;
})
);
/*
"First Source (5s): 0 Second Source (1s): 4"
"First Source (5s): 1 Second Source (1s): 9"
"First Source (5s): 2 Second Source (1s): 14"
...
*/
const subscribe = example.subscribe(val => console.log(val));
Creation
from
Turn an array, promise, or iterable into an observable.
💡 This operator can be used to convert a promise to an observable!
💡 For arrays and iterables, all contained values will be emitted as a sequence!
💡 This operator can also be used to emit a string as a sequence of characters!
import { from } from 'rxjs';
//emit array as a sequence of values
const arraySource = from([1, 2, 3, 4, 5]);
//output: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));
fromEvent:
Turn event into observable sequence.
// Observable from mouse clicks
mport { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';
//create observable that emits click events
const source = fromEvent(document, 'click');
//map to string with given event timestamp
const example = source.pipe(map(event => `Event time: ${event.timeStamp}`));
//output (example): 'Event time: 7276.390000000001'
const subscribe = example.subscribe(val => console.log(val));
of :
Emit variable amount of values in a sequence and then emits a complete notification. - emits all the values at once
// Emitting a sequence of numbers
import { of } from 'rxjs';
//emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
//output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));
throwError:
Creates an Observable that immediately emits an error notification upon subscription.
Example 1: Basic error emission
import { throwError } from 'rxjs';
// Create an observable that immediately emits an error
const error$ = throwError(() => new Error('Something went wrong!'));
// Subscribe to see the error
error$.subscribe({
next: val => console.log('Next:', val), // Won't be called
error: err => console.error('Error caught:', err.message),
complete: () => console.log('Complete!') // Won't be called
});
// Output: "Error caught: Something went wrong!"
Error Handling
catch / catchError:
Gracefully handle errors in an observable sequence.
💡 Need to retry a failed operation? Check out retry or retryWhen!
💡 For resource cleanup regardless of error, use finalize!
⚠ Remember to return an observable from the catchError function!
Example 1: Catching error from observable
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
// emit error
const source = throwError('This is an error!');
// gracefully handle error, returning observable with error message
const example = source.pipe(
catchError(val => of(`I caught: ${val}`))
);
// output: 'I caught: This is an error'
const subscribe = example.subscribe(val => console.log(val));
retry:
Retry an observable sequence a specific number of times should an error occur.
💡 Useful for retrying HTTP requests!
💡 If you only want to retry in certain cases, check out retryWhen!
💡 For non error cases check out repeat!
Filtering
debounceTime :
Discard emitted values that take less than the specified time between output
💡 This operator is popular in scenarios such as type-ahead where the rate of user input must be controlled!
this.myFormControl.valueChanges.pipe(
debounceTime(300)
).subscribe(value => {
// handle the value after 300ms of inactivity
});
distinctUntilChanged:
Only emit when the current value is different than the last.
💡 distinctUntilChanged uses === comparison by default, object references must match!
💡 If you want to compare based on an object property, you can use distinctUntilKeyChanged instead!
// RxJS v6+
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
// only output distinct values, based on the last emitted value
const source$ = from([1, 1, 2, 2, 3, 3]);
source$
.pipe(distinctUntilChanged())
// output: 1,2,3
.subscribe(console.log);
filter:
Emit values that pass the provided condition.
💡 If you would like to complete an observable when a condition fails, check out takeWhile!
// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
//emit (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
//filter out non-even numbers
const example = source.pipe(filter(num => num % 2 === 0));
//output: "Even number: 2", "Even number: 4"
const subscribe = example.subscribe(val => console.log(`Even number: ${val}`));
find:
Emit the first item that passes predicate then complete.
💡 If you always want the first item emitted, regardless of condition, try first()!
skip:
Skip the provided number of emitted values.
take:
Emit provided number of values before completing.
takeUntil:
Emit values until provided observable emits.
💡 If you only need a specific number of values, try take!
private destroy$ = new Subject<void>();
observable$
.pipe(takeUntil(this.destroy$))
.subscribe(data => console.log(data));
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
Multicasting
In RxJS observables are cold, or unicast by default. These operators can make an observable hot, or multicast, allowing side-effects to be shared among multiple subscribers.
share :
Share source among multiple subscribers.
💡 share is like multicast with a Subject and refCount!
const hot$ = this.http.get('/api/data/').pipe(share());
shareReplay:
Share source and replay specified number of emissions on subscription.
shareRepaly() makes a cold Observables hot and replays the Last N emissiomns to new subscribers.
/with shareReplay - single HTTP call, cached
const data$ = this.http.get('/api/config').pipe(
shareReplay(1) // Cache last 1 emission
);
Cold Observable:
- Creates data produser for each subscriber
- Each subscriber gets independent execution
- Exapmle:
Http requests, intervals, of()/from() operators.
const cold$ = this.http.get('/api/data');
cold$.subscribe(data => console.log('sub 1:' data)); // New request
cold$.subscribe((data => console.log('sub 2:' data)); // another New request
// 2 http request mode!
Hot Observable:
- Shares single data produser among all subscribers
- All subscribers share same execution
- Example : Subject, DOM event, shared streams
const hot$ = this.http.get('/api/data/').pipe(share());
hot$.subscribe(data => console.log('sub 1:' data)); // Makes request
hot$.subscribe((data => console.log('sub 2:' data)); // Share same request
// only 1 http request made!
Making cold Observable to Hot:
const shared$ = this.http.get('/api/data/').pipe(
share() // or shareReplay(1)
);
Transformation
Transforming values as they pass through the operator chain is a common task. These operators provide transformation techniques for nearly any use-case you will encounter.
concatMap:
waits for previous inner observable to complete before starting next one.
import {of} from 'rxjs';
import {concatMap, delay} from 'rxjs/operators';
of(1,2,3)pipe(
concatMap(id => this.http.get(`/api/log/${id}`).pipe(delay(1000)))
)
.subscribe(console.log) // Executes sequentially 1, then 2, then 3
Use Case:
- Sequential operations
- Depentent API calls
- Maintaining order
map:
Transforms each value using a projection function.
import {of} from 'rxjs';
import {map} from 'rxjs/operators';
//Eg1: simple transformation
of(1,2,3)
.pipe(
map(x=> x* 2)
)
.subscribe(console.log); //2, 4, 6
//Eg2: Object transformation
interface User {id: number; name: string; }
of({id: 1, name: 'sudhanshu'})
.pipe(
map((user:User)=> user.name)
)
.subscribe(console.log); // sudhanshu
//Eg3: API responce mapping
this.http.get<ApiResponse>('/api/users')
.pipe(
map(response => response.data),
map(users => users.map(u=> ({...u, fullName: `${u.firstname} ${u.lastname}`})))
)
.subscribe(users => console.log(users));
mergeMap / flatMap :
Does NOT cancel previous inner observables. All run in parallel
import {of} from 'rxjs';
import {mergeMap, delay} from 'rxjs/oprators';
//Eg 1: parallel api calls
of(1,2,3)
.pipe(
margeMap(id => this.http.get(`/api/users/${id}`))
)
.subscribe(user => console.log(user)); // All 3 request run in parallel
//Eg 2: File upload
const files = [file1, file2, file3];
from(files)
.pipe(
margeMap(file => this.uploadFile(file))
)
.subscribe(result => console.log(result));
// Eg 3: Concurrent limit
from([1,2,3,4,5]).pipe(
margeMap(
id => this.http.get(`/api/users/${id}`),
3 // Max 3 Concurrent requests
)
).subscribe(console.log);
Key Differences from switchMap:
- does not cancel previous ubcriptions
- on inner observables run concurrently
- Use when order doesn't matter and all request are important
switchMap:
Cancels previous inner observable when new value arrives.
Key Points
- Cancles previous subscription
- Prefect for search/autocomplete
- Prevents race conditions
- Only latest request matters
Utility
From logging, handling notifications, to setting up schedulers, these operators provide helpful utilities in your observable toolkit.
tap / do :
Performs side effects without modifying the stream.
tap() is used for side effects without modifying the stream. It's transparent - values through unchanged.
this.http.get('/api/users').pipe(
tap(response => console.log('API response:' response)), // Debugging
tap(()=> this.loading =false), // Ubdate loading state
tab(data => localStroage.setItom('cache', JSON.stringify(data))), //Caching
map( response => response.data) // Transfrom data
).subscribe(users => this.users=users);
Common use cases:
- Logging/debugging
- Setting loading/error flag
- caching
- Analytics tracking
- Side effect that don't modify data
delay:
Delay emitted values by given time.
// Example 1: Delay to recognize long press
import { fromEvent, of } from 'rxjs';
import { mergeMap, delay, takeUntil } from 'rxjs/operators';
const mousedown$ = fromEvent(document, 'mousedown');
const mouseup$ = fromEvent(document, 'mouseup');
mousedown$
.pipe(mergeMap(event => of(event).pipe(delay(700), takeUntil(mouseup$))))
.subscribe(event => console.log('Long Press!', event));
Comments
Post a Comment