RXJS_details

 

RxJS (Reactive Extensions for JavaScript)

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables, making it easier to compose asynchronous or callback-based code.

Key Benefits

  • Handles async operations elegantly
  • Provides powerful operators for data transformation
  • Manages complex event sequences
  • Enables cancellation of async operations
  • Reduces callback hell

Operators (commonly used)

  • Combination

    • combineLatest: When any observable emits a value, emit the last emitted value from each.
    • concat: Subscribe to observables in order as previous completes. (sequentially in order)
    • forkJoin : When all observables complete, emit the last emitted value from each.
    • merge: Turn multiple observables into a single observable.
    • startWith : Emit given value first.
    • withLatestFrom: Also provide the last value from another observable.
  • Creation

    • from : Turn an array, promise, or iterable into an observable.
    • fromEvent: Turn event into observable sequence.
    • of : Emit a variable number of values in sequence, then emit a complete notification. All values are emitted synchronously.
    • throwError: Creates an Observable that immediately emits an error notification upon subscription.
  • Error Handling

    • catch / catchError: Gracefully handle errors in an observable sequence.
    • retry: Retry an observable sequence a specific number of times.
  • Filtering

    • debounceTime : Discard emitted values that take less than the specified time between output
    • distinctUntilChanged: Only emit when the current value is different than the last.
    • filter: Emit values that pass the provided condition.
    • find: Emit the first item that passes predicate then complete.
    • skip: Skip the provided number of emitted values.
    • take: Emit provided number of values before completing.
    • takeUntil: Emit values until provided observable emits.
  • Multicasting

    • share : Share source among multiple subscribers.
    • shareReplay: Share source and replay specified number of emissions on subscription.
  • Transformation

    • concatMap: waits for previous inner observable to complete before starting next one.
    • map: Transforms each value using a projection function.
    • mergeMap / flatMap : Does NOT cancel previous inner observables. All run in parallel
    • switchMap: Cancels previous inner observable when new value arrives.
    • scan: Reduce over time.
  • Utility

    • tap / do : Performs side effects without modifying the stream.
    • delay: Delay emitted values by given time.

Combination

  • The combination operators allow the joining of information from multiple observables. Order, time, and structure of emitted values is the primary variation among these operators.

combineLatest

When any observable emits a value, emit the last emitted value from each.

Key Differences from forkJoin

  • Emits multiple times (whenever any source emits)
  • Doesn't wait for completion
  • Always uses the latest value from each observable
  • Needs all observables to emit at least once before first emission

NOTE: If you are working with observables that only emit one value, or you only require the last value of each before completion, forkJoin is likely a better option.

import { combineLatest } from 'rxjs';
import { switchMap } from 'rxjs/operators';

combineLatest([
    this.categoryFilters$,
    this.priceFilters$,
    this.searchFilters$
]).pipe(
    // re-query whenever any filter changes, cancelling the previous request
    switchMap(([category, price, search]) =>
        this.productService.getProducts({ category, price, search })
    )
).subscribe(products => this.products = products);
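
To make the differences from forkJoin listed above concrete, here is a minimal sketch using two made-up synchronous sources (numbers$ and letters$ are illustrative names, not part of any real app). Because of() emits synchronously, the first source has already finished by the time the second one starts, so combineLatest pairs every letter with the last number.

```typescript
import { combineLatest, forkJoin, of } from 'rxjs';

// Hypothetical sources: each emits two values synchronously, then completes.
const numbers$ = of(1, 2);
const letters$ = of('a', 'b');

const combined: Array<[number, string]> = [];
const joined: Array<[number, string]> = [];

// combineLatest emits every time a source emits, once each source has a value.
combineLatest([numbers$, letters$]).subscribe(pair => combined.push(pair));

// forkJoin emits only once, after every source has completed.
forkJoin([numbers$, letters$]).subscribe(pair => joined.push(pair));

console.log(combined); // [[2, 'a'], [2, 'b']]
console.log(joined);   // [[2, 'b']]
```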

concat

Subscribe to observables in order as previous completes

💡 Think of concat like a line at an ATM: the next transaction (subscription) cannot start until the previous one completes!

Use Case:

  • Sequential operations
  • Dependent API calls
  • Maintaining order

import { of, concat } from 'rxjs';

concat(
  of(1, 2, 3),
  // subscribed after first completes
  of(4, 5, 6),
  // subscribed after second completes
  of(7, 8, 9)
)
  // log: 1, 2, 3, 4, 5, 6, 7, 8, 9
  .subscribe(console.log);

forkJoin

When all observables complete, emit the last emitted value from each.

  • ⚠ If an inner observable does not complete, forkJoin will never emit a value!

import { forkJoin } from 'rxjs';

// run the three requests in parallel and wait for all of them to complete
forkJoin({
    users: this.http.get('/api/users'),
    posts: this.http.get('/api/posts'),
    comments: this.http.get('/api/comments')
}).subscribe(res => {
    console.log(res.users);
    console.log(res.posts);
    console.log(res.comments);
});

merge

Turn multiple observables into a single observable.

💡 This operator can be used as either a static or instance method!

💡 If order, not throughput, is the primary concern, try concat instead!
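
A small sketch of merge, assuming two hypothetical Subjects (clicks$ and keys$) standing in for real event streams. It shows that values are passed through in the order they arrive, regardless of which source produced them.

```typescript
import { merge, Subject } from 'rxjs';

// Hypothetical event sources; Subjects let us push values by hand.
const clicks$ = new Subject<string>();
const keys$ = new Subject<string>();
const seen: string[] = [];

// merge subscribes to both sources immediately and forwards every value.
merge(clicks$, keys$).subscribe(value => seen.push(value));

clicks$.next('click 1');
keys$.next('key 1');
clicks$.next('click 2');

console.log(seen); // ['click 1', 'key 1', 'click 2']
```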

startWith

Emit given value first.

💡 A BehaviorSubject can also start with an initial value!

import { startWith, scan } from 'rxjs/operators';
import { of } from 'rxjs';

//emit ('World!', 'Goodbye', 'World!')
const source = of('World!', 'Goodbye', 'World!');
//start with 'Hello', concat current string to previous
const example = source.pipe(
  startWith('Hello'),
  scan((acc, curr) => `${acc} ${curr}`)
);
/*
  output:
  "Hello"
  "Hello World!"
  "Hello World! Goodbye"
  "Hello World! Goodbye World!"
*/
const subscribe = example.subscribe(val => console.log(val));

withLatestFrom

Also provide the last value from another observable.

💡 If you want the last emission any time a variable number of observables emits, try combineLatest!

import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';

//emit every 5s
const source = interval(5000);
//emit every 1s
const secondSource = interval(1000);
const example = source.pipe(
  withLatestFrom(secondSource),
  map(([first, second]) => {
    return `First Source (5s): ${first} Second Source (1s): ${second}`;
  })
);
/*
  "First Source (5s): 0 Second Source (1s): 4"
  "First Source (5s): 1 Second Source (1s): 9"
  "First Source (5s): 2 Second Source (1s): 14"
  ...
*/
const subscribe = example.subscribe(val => console.log(val));


Creation

from

Turn an array, promise, or iterable into an observable.

💡 This operator can be used to convert a promise to an observable!

💡 For arrays and iterables, all contained values will be emitted as a sequence!

💡 This operator can also be used to emit a string as a sequence of characters!

import { from } from 'rxjs';

//emit array as a sequence of values
const arraySource = from([1, 2, 3, 4, 5]);
//output: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));

fromEvent:

Turn event into observable sequence.

// Observable from mouse clicks

import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

//create observable that emits click events
const source = fromEvent(document, 'click');
//map to string with given event timestamp
const example = source.pipe(map(event => `Event time: ${event.timeStamp}`));
//output (example): 'Event time: 7276.390000000001'
const subscribe = example.subscribe(val => console.log(val));

of :

Emit a variable number of values in sequence, then emit a complete notification. All values are emitted synchronously on subscription.

// Emitting a sequence of numbers


import { of } from 'rxjs';
//emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
//output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));

throwError:

Creates an Observable that immediately emits an error notification upon subscription.

Example 1: Basic error emission

import { throwError } from 'rxjs';

// Create an observable that immediately emits an error
const error$ = throwError(() => new Error('Something went wrong!'));

// Subscribe to see the error
error$.subscribe({
  next: val => console.log('Next:', val), // Won't be called
  error: err => console.error('Error caught:', err.message),
  complete: () => console.log('Complete!') // Won't be called
});

// Output: "Error caught: Something went wrong!"

Error Handling

catch / catchError:

Gracefully handle errors in an observable sequence.

💡 Need to retry a failed operation? Check out retry or retryWhen!

💡 For resource cleanup regardless of error, use finalize!

⚠ Remember to return an observable from the catchError function!

Example 1: Catching error from observable

import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

// emit error
const source = throwError(() => 'This is an error!');

// gracefully handle error, returning observable with error message
const example = source.pipe(
  catchError(val => of(`I caught: ${val}`))
);

// output: 'I caught: This is an error!'
const subscribe = example.subscribe(val => console.log(val));

retry:

Retry an observable sequence a specific number of times should an error occur.

💡 Useful for retrying HTTP requests!

💡 If you only want to retry in certain cases, check out retryWhen!

💡 For non error cases check out repeat!
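
A minimal sketch of retry, assuming a made-up flaky source (flaky$) that fails on its first two subscriptions and succeeds on the third. The attempt counter is only there for illustration; a real HTTP call would take its place.

```typescript
import { defer, Observable, of, throwError } from 'rxjs';
import { retry } from 'rxjs/operators';

let attempts = 0;

// Hypothetical flaky source: defer runs this factory on every (re)subscription.
const flaky$ = defer((): Observable<string> => {
  attempts += 1;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of('ok');
});

const results: string[] = [];

// retry(2) resubscribes up to 2 times after an error, so 3 attempts in total.
flaky$.pipe(retry(2)).subscribe({
  next: value => results.push(value),
  error: err => results.push(`gave up: ${err.message}`)
});

console.log(attempts, results); // 3 ['ok']
```

With retry(1) the same source would give up after the second failure and the error callback would run instead.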

Filtering

debounceTime :

Discard emitted values that take less than the specified time between output

💡 This operator is popular in scenarios such as type-ahead where the rate of user input must be controlled!

this.myFormControl.valueChanges.pipe(
  debounceTime(300)
).subscribe(value => {
  // handle the value after 300ms of inactivity
});

distinctUntilChanged:

Only emit when the current value is different than the last.

💡 distinctUntilChanged uses === comparison by default, so object references must match!

💡 If you want to compare based on an object property, you can use distinctUntilKeyChanged instead!

// RxJS v6+
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// only output distinct values, based on the last emitted value
const source$ = from([1, 1, 2, 2, 3, 3]);

source$
  .pipe(distinctUntilChanged())
  // output: 1,2,3
  .subscribe(console.log);
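
The note about object references can be seen in a short sketch. The User shape and the sample data are made up for illustration: the first two objects have equal contents but are different references, so the default comparison lets both through, while distinctUntilKeyChanged compares only the id property.

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged, distinctUntilKeyChanged } from 'rxjs/operators';

interface User { id: number; name: string; }

// Hypothetical data: two equal-looking objects followed by a different one.
const users: User[] = [
  { id: 1, name: 'Ann' },
  { id: 1, name: 'Ann' },
  { id: 2, name: 'Bob' }
];

const byReference: number[] = [];
const byKey: number[] = [];

// Default === comparison: the two 'Ann' objects are different references.
from(users).pipe(distinctUntilChanged()).subscribe(u => byReference.push(u.id));

// Compare on the 'id' property only.
from(users).pipe(distinctUntilKeyChanged('id')).subscribe(u => byKey.push(u.id));

console.log(byReference); // [1, 1, 2]
console.log(byKey);       // [1, 2]
```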

filter:

Emit values that pass the provided condition.

💡 If you would like to complete an observable when a condition fails, check out takeWhile!

// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

//emit (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
//filter out non-even numbers
const example = source.pipe(filter(num => num % 2 === 0));
//output: "Even number: 2", "Even number: 4"
const subscribe = example.subscribe(val => console.log(`Even number: ${val}`));

find:

Emit the first item that passes predicate then complete.

💡 If you always want the first item emitted, regardless of condition, try first()!
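
A quick sketch of find on a made-up list of numbers. It records both the emitted value and the completion, to show that the stream ends as soon as the first match is found.

```typescript
import { from } from 'rxjs';
import { find } from 'rxjs/operators';

const log: string[] = [];

// Emit the first even number, then complete without looking at the rest.
from([1, 3, 4, 6])
  .pipe(find(n => n % 2 === 0))
  .subscribe({
    next: n => log.push(`next: ${n}`),
    complete: () => log.push('complete')
  });

console.log(log); // ['next: 4', 'complete']
```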

skip:

Skip the provided number of emitted values.
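
A minimal sketch of skip with illustrative values: the first two emissions are dropped and everything after them passes through.

```typescript
import { of } from 'rxjs';
import { skip } from 'rxjs/operators';

const kept: number[] = [];

// Ignore the first 2 values, emit the rest.
of(1, 2, 3, 4, 5)
  .pipe(skip(2))
  .subscribe(n => kept.push(n));

console.log(kept); // [3, 4, 5]
```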

take:

Emit provided number of values before completing.
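
A small sketch of take, assuming a hypothetical Subject (source$) so values can be pushed by hand. It shows that take completes right after the requested count and that later values never reach the subscriber.

```typescript
import { Subject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical source we can push values into manually.
const source$ = new Subject<number>();
const log: string[] = [];

source$.pipe(take(2)).subscribe({
  next: n => log.push(`next: ${n}`),
  complete: () => log.push('complete')
});

source$.next(10);
source$.next(20); // second value: emitted, then take(2) completes
source$.next(30); // ignored, the subscription has already ended

console.log(log); // ['next: 10', 'next: 20', 'complete']
```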

takeUntil:

Emit values until provided observable emits.

💡 If you only need a specific number of values, try take!

private destroy$ = new Subject<void>();

observable$
  .pipe(takeUntil(this.destroy$))
  .subscribe(data => console.log(data));

ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

Multicasting

In RxJS, observables are cold, or unicast, by default. These operators can make an observable hot, or multicast, allowing side-effects to be shared among multiple subscribers.

share :

Share source among multiple subscribers.

💡 share is like multicast with a Subject and refCount!

const hot$ = this.http.get('/api/data/').pipe(share());

shareReplay:

Share source and replay specified number of emissions on subscription.

shareReplay() makes a cold Observable hot and replays the last N emissions to new subscribers.

// with shareReplay - single HTTP call, cached
const data$ = this.http.get('/api/config').pipe(
   shareReplay(1) // Cache last 1 emission
);

Cold Observable:

  • Creates a data producer for each subscriber
  • Each subscriber gets independent execution
  • Example: HTTP requests, intervals, of()/from() operators.

const cold$ = this.http.get('/api/data');

cold$.subscribe(data => console.log('sub 1:', data)); // new request
cold$.subscribe(data => console.log('sub 2:', data)); // another new request
// 2 HTTP requests made!

Hot Observable:

  • Shares a single data producer among all subscribers
  • All subscribers share the same execution
  • Example: Subject, DOM events, shared streams

const hot$ = this.http.get('/api/data/').pipe(share());

hot$.subscribe(data => console.log('sub 1:', data)); // makes the request
hot$.subscribe(data => console.log('sub 2:', data)); // shares the same request
// only 1 HTTP request made!

Making a cold Observable hot:

const shared$ = this.http.get('/api/data/').pipe(
    share() // or shareReplay(1)
    );
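
The cold versus shared behaviour can be checked without a server. This is a sketch under stated assumptions: cold$ is a hand-written Observable standing in for an HTTP call, and the executions counter exists only to count how many times the producer runs.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let executions = 0;

// Hypothetical cold source standing in for an HTTP request.
const cold$ = new Observable<string>(subscriber => {
  executions += 1; // count every producer run
  subscriber.next('config');
  subscriber.complete();
});

// Cold: each subscriber triggers its own producer run.
cold$.subscribe();
cold$.subscribe();
const coldRuns = executions;

// Shared with replay: one producer run, later subscribers get the cached value.
executions = 0;
const cached$ = cold$.pipe(shareReplay(1));
const received: string[] = [];
cached$.subscribe(value => received.push(value));
cached$.subscribe(value => received.push(value));
const sharedRuns = executions;

console.log(coldRuns, sharedRuns, received); // 2 1 ['config', 'config']
```

shareReplay(1) is used here rather than share() because this stand-in source completes synchronously; the replay buffer is what lets the second subscriber receive the value after the source has already finished.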

Transformation

Transforming values as they pass through the operator chain is a common task. These operators provide transformation techniques for nearly any use-case you will encounter.

concatMap:

waits for previous inner observable to complete before starting next one.

import {of} from 'rxjs';
import {concatMap, delay} from 'rxjs/operators';

of(1,2,3).pipe(
    concatMap(id => this.http.get(`/api/log/${id}`).pipe(delay(1000)))
)
.subscribe(console.log) // Executes sequentially 1, then 2, then 3

Use Case:

  • Sequential operations
  • Dependent API calls
  • Maintaining order

map:

Transforms each value using a projection function.

import {of} from 'rxjs';
import {map} from 'rxjs/operators';

//Eg1: simple transformation
of(1,2,3)
.pipe(
    map(x=> x* 2)
)
.subscribe(console.log); //2, 4, 6

//Eg2: Object transformation
interface User {id: number; name: string; }

of({id: 1, name: 'sudhanshu'})
.pipe(
    map((user:User)=> user.name)
)
.subscribe(console.log); // sudhanshu

//Eg3: API response mapping

this.http.get<ApiResponse>('/api/users')
.pipe(
    map(response => response.data),
    map(users => users.map(u=> ({...u, fullName: `${u.firstname} ${u.lastname}`})))
)
.subscribe(users => console.log(users));

mergeMap / flatMap :

Does NOT cancel previous inner observables. All run in parallel

import {of, from} from 'rxjs';
import {mergeMap} from 'rxjs/operators';

//Eg 1: parallel api calls
of(1,2,3)
.pipe(
    mergeMap(id => this.http.get(`/api/users/${id}`))
)
.subscribe(user => console.log(user)); // All 3 requests run in parallel

//Eg 2: File upload
const files = [file1, file2, file3];

from(files)
    .pipe(
        mergeMap(file => this.uploadFile(file))
    )
    .subscribe(result => console.log(result));

// Eg 3: Concurrent limit
from([1,2,3,4,5]).pipe(
    mergeMap(
        id => this.http.get(`/api/users/${id}`),
        3 // Max 3 concurrent requests
    )
).subscribe(console.log);

Key Differences from switchMap:

  • Does not cancel previous subscriptions
  • All inner observables run concurrently
  • Use when order doesn't matter and all requests are important

switchMap:

Cancels previous inner observable when new value arrives.

Key Points

  • Cancels previous subscription
  • Perfect for search/autocomplete
  • Prevents race conditions
  • Only latest request matters
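
A minimal sketch of the key points above, assuming a hypothetical search box. query$, slowResponse$ and fastResponse$ are made-up Subjects that stand in for user input and two in-flight requests, so the late response can be delivered by hand.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-ins: user queries and two pending "requests".
const query$ = new Subject<string>();
const slowResponse$ = new Subject<string>(); // request for the query 'r'
const fastResponse$ = new Subject<string>(); // request for the query 'rx'
const shown: string[] = [];

query$
  .pipe(switchMap(q => (q === 'r' ? slowResponse$ : fastResponse$)))
  .subscribe(result => shown.push(result));

query$.next('r');  // first request starts
query$.next('rx'); // first request is unsubscribed, second one starts

slowResponse$.next('results for r');   // arrives late, no longer listened to
fastResponse$.next('results for rx');  // delivered

console.log(shown); // ['results for rx']
```

With mergeMap in place of switchMap both results would be pushed, which is the race condition switchMap avoids.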

Utility

From logging, handling notifications, to setting up schedulers, these operators provide helpful utilities in your observable toolkit.

tap / do :

Performs side effects without modifying the stream.

tap() is used for side effects without modifying the stream. It's transparent: values pass through unchanged.

this.http.get('/api/users').pipe(
    tap(response => console.log('API response:', response)), // Debugging
    tap(() => this.loading = false), // Update loading state
    tap(data => localStorage.setItem('cache', JSON.stringify(data))), // Caching
    map(response => response.data) // Transform data
).subscribe(users => this.users = users);

Common use cases:

  • Logging/debugging
  • Setting loading/error flags
  • Caching
  • Analytics tracking
  • Side effects that don't modify data

delay:

Delay emitted values by given time.


// Example 1: Delay to recognize long press

import { fromEvent, of } from 'rxjs';
import { mergeMap, delay, takeUntil } from 'rxjs/operators';

const mousedown$ = fromEvent(document, 'mousedown');
const mouseup$ = fromEvent(document, 'mouseup');

mousedown$
  .pipe(mergeMap(event => of(event).pipe(delay(700), takeUntil(mouseup$))))
  .subscribe(event => console.log('Long Press!', event));
