RXJS

#RXJS INTERVIEW GUIDE

Table of Contents

  1. Introduction to RxJS
  2. Core concepts
  3. Subjects
  4. Operators
  5. Common patterns
  6. Interview questions
  7. Pratical Scenarios

Table of Contents

RxJS (Reactive Extentions for JavaScript) is a library for reactive programming using Observables, making it easier to compose asyncchronous or callback-based code.

Key Benefits

  • Handles async operation elegaintly
  • Provides powerfull operators for data transformation
  • Manages complex event sequences
  • Enables cancellation of async operation
  • Reduces callback hell

Core Concepts

1. Observable

An Observables is a lazy Push collection of multiple values over time.

import {Observable} from 'rxjs';

// Creating an Observable
const observable = new observable(subscriber => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    setTimeout(() => {
        subscriber.next(4);
        subscriber.complete();
    }, 1000);
});

//subscribing to an Observable
observable.subscribe({
    next: (value)=> console.log(value),
    error: (err)=> console.log(err),
    complete: ()=> console.log('complete')
});

Key Points:

  • Observables are lazy - they don't execute untill subscribed
  • Can emait multiple value over time
  • Can be synchronous or asynchronous
  • Must be subscribed to start executation

2. Observer

An Observer is a consumer of value delavered by an observables.

const observer = {
    next: (value)=> console.log(value),
    error: (err)=> console.log(err),
    complete: ()=> console.log('complete')
};

observable.subscribe(observer);

Observer Interface

  • next(value) - recevies emitted values
  • error(value) - handle errors
  • complate() - called when stream completes

Subjects

1. Subject

Subject is both Observable & an Observer. It can multicast values to multiple observers.

import {subject} from 'rxjs';

const subject = new Subject<number>();

//Multiple subscribers 
subject.subscribe(value => console.log('Observer A:', Value));
subject.subscribe(value => console.log('Observer B:', Value));

//Emit values 
subject.next(1); // both subscriber receive 1
subject.next(2); // both subscriber receive 2

Use Cases:

  • Event bus implementation
  • Share data between components
  • Multicast to Multiple observers

Important: Subject don't have initial value and late subscribers won't receive previous values.

2. BehaviorSubject

BehaviourSubject holds one current value and emits it immediatey to new subscribers.

import {BehaviorSubject} from 'rxjs'

const behaviorSubject = new BehaviorSubject<number>(0); // initial value

behaviorSubject.subscribe(value => console.log('Observer A:', value)) // Receives 0

behaviorSubject.next(1);
behaviorSubject.next(2);


behaviorSubject.subscribe(value => console.log('Observer B:', value)) // Receives 2 (current value)

behaviorSubject.next(3); //both receive 3

Use Case:

  • State management
  • Current user data
  • Configuration setting
  • Form values

Key difference from Subject:

  • Requires initial value
  • Stores the latest value
  • New Subscriber get the current value immediately

3. Replay Subject

import {ReplaySubject} from 'rxjs';

const replaySubject = new ReplaySubject<number>(2); // Buffer size 2

replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);

replaySubject.subscribe(value => console.log('Observer:', value)) // Receives 2, 3 

4. AsyncSubject

import {AsyncSubject} from 'rxjs';

const asyncSubject = new AsyncSubject<number>(2); // Buffer size 2

asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.complete(); // Only emit 3 on complete 
asyncSubject.subscribe(value => console.log('Observer:', value)) // Receives 3 

Operators

1. Pipe()

The pipe() method is compose multiple operators together.

import {of} from 'rxjs';
import {map, fillter} from 'rxjs/operators';

of(1,2,3,4,5)
.pipe(
    filter(x=> x%2 ===0),
    map(x=> x*10)
)
.subscribe(console.log); //20, 40

Why pipe()?

  • Chains multiple operators
  • Better tree-shaking
  • Improved performance
  • Cleaner syntex

2. map()

Transforms each value using a projection function.

import {of} from 'rxjs';
import {map} from 'rxjs/operators';

//Eg1: simple transformation
of(1,2,3)
.pipe(
    map(x=> x* 2)
)
.subscribe(console.log); //2, 4, 6

//Eg2: Object transformation
interface User {id: number; name: string; }

of({id: 1, name: 'sudhanshu'})
.pipe(
    map((user:User)=> user.name)
)
.subscribe(console.log); // sudhanshu

//Eg3: API responce mapping

this.http.get<ApiResponse>('/api/users')
.pipe(
    map(response => response.data),
    map(users => users.map(u=> ({...u, fullName: `${u.firstname} ${u.lastname}`})))
)
.subscribe(users => console.log(users));

filter()

Filters value based on a predicate function.

import {of} from 'rxjs';
import {fillter} from 'rxjs/operators';
//Example1: Filter even numbers
of(1,2,3,4,5,6)
.pipe(
    filter(x=> x%2 ===0)
)
.subscribe(console.log)  //2, 4, 6

//Exampl 2: Fillter null/undefined
of(1, null, 3, undefined, 5)
.pipe(
    filter(x=> x !=null)
)
.subscribe(console.log)  // 1, 3, 5

//filter objects 
interface Product {id: number; inStock: boolean; }

of(
    {id:1, inStock: true},
    {id:1, inStock: false},
    {id:1, inStock: true}
).pipe(filter(product=> product.inStock))
.subscribe(console.log); // Only in-stock products

4. tap()

Performs side effects without modifying the stream.

import {of} from 'rxjs';
import {map, tap} from 'rxjs/operators';

//Eg 1: logging
of(1,2,3)
.pipe(
    tap(x=> console.log('Before:' x)),
    map(x=> x*2),
    tap(x=> console.log('After:' x))
).subscribe();

//Eg 2: Debugging API calls

this.http.get<ApiResponse>('/api/users')
.pipe(
    tap(response => console.log('API Response: ', response)),
    tap(() => this.loading = false),
    map(response => response.data)
)
.subscribe(users => console.log(users));

// Eg 3: Side effects without modification
of('data')
.pipe(
    tap(data => localStroage.setItem('cache', data)),
    tap(data => this.analytics.track('data-fetched'))
)
.subscribe(consol.log) // 'data' passes through unchanged

Use Cases

  • Debugging
  • Logging
  • Caching
  • Analytics
  • Setting flags (loading, error, states)

5. switchMap()

Cancels previous inner observable when new value arrives.

import {fromEvent} from 'rxjs';
import {switchMap, debonceTime} from 'rxjs/operators';

// Eg 1: Search autocomplete 
const searchBox = document.getElementById('search');

fromEvent(searchBox, 'input')
    .pipe(
        debounceTime(300),
        switchMap(event => this.http.get(`/api/search?q=${event.traget.value}`))
    )
    .subscribe( result => console.log(result));

// Eg 2: Route params to Http request
this.route.params.pipe(
    switchMap(params => this.http.get(`/api/users/${params.id}`))
)
.subscribe(user => this.user = user);

// Eg 3: Dependent requestes
this.getUserId().pipe(
    switchMap(userId => this.http.get(`/api/users/${userId}`))
    switchMap(user => this.http.get(`/api/users/${user.id}`))
)
.subscribe(orders => console.log(orders));

Key Points

  • Cancles previous subscription
  • Prefect for search/autocomplete
  • Prevents race conditions
  • Only latest request matters

6. margeMap() / flatMaph()

Does NOT cancel previous inner observables. All run in parallel.

import {of} from 'rxjs';
import {mergeMap, delay} from 'rxjs/oprators';

//Eg 1: parallel api calls
of(1,2,3)
.pipe(
    margeMap(id => this.http.get(`/api/users/${id}`))
)
.subscribe(user => console.log(user)); // All 3 request run in parallel

//Eg 2: File upload 
const files = [file1, file2, file3];

from(files)
    .pipe(
        margeMap(file => this.uploadFile(file))
    )
    .subscribe(result => console.log(result));

// Eg 3: Concurrent limit 
from([1,2,3,4,5]).pipe(
    margeMap(
        id => this.http.get(`/api/users/${id}`),
        3 // Max 3 Concurrent requests
    )
).subscribe(console.log);

Key Differences from switchMap:

  • does not cancel previous ubcriptions
  • on inner observables run concurrently
  • Use when order doesn't matter and all request are important

7. concatMap()

waits for previous inner observable to complete before starting next one.

import {of} from 'rxjs';
import {concatMap, delay} from 'rxjs/operators';

of(1,2,3)pipe(
    concatMap(id => this.http.get(`/api/log/${id}`).pipe(delay(1000)))
)
.subscribe(console.log) // Executes sequentially 1, then 2, then 3

Use Case:

  • Sequential operations
  • Depentent API calls
  • Maintaining order

8. debounceTime()

Delays emission until specified time has passed without another emmission.

import {fromEvent} from 'rxjs';
import {debounceTime, distinctUntilChanged, switchMap} from 'rxjs/operators';

//Eg 1: search input 
const searchInput = document.getElementById('search');

fromEvent(searchInput, 'input').pipe(
    debounceTime(300), // Wait 300ms after user stops typing
    distinctUntilChanged(),
    switchMap(event => this.searchService.search(event.target.value))
).subscribe(result => this.seachResult = result);

// Eg 2: window resize
formEvent(window, 'resize')pipe(
    debounceTime(500)
).subscirbe(() => this.handleResize());

//Eg 3: Form value changes
this.myForm.valueChanges.pipe(
    debounceTime(500)
).subscribe(values => this.autoSave(values));

Use Case

  • Search autocomplete
  • Form autosave
  • Window resize handlers
  • Scroll events Optimization: Prevents excessive API calls!

9. forkJoin()

Waits for all observables to complete, then emits an array of last values.

import {forkJoin} from 'rxjs';

// Eg 1: Multiple API calls 
forkJoin({
    users: this.http.get('/api/users'),
    posts: this.http.get('/api/posts'),
    comments: this.http.get('/api/comments')
}). subscribe(res => {
    concole.log(res.users);
    concole.log(res.posts);
    concole.log(res.comments);
})

// Eg 2: Array Format
forkJoin([
    this.http.get('/api/users/1'),
    this.http.get('/api/users/2'),
    this.http.get('/api/users/3')
]).subscribe(([user1, user2, user3]) => {
    concole.log(user1, user2, user3);
})

// Eg 3:Dashboard data loading
forkJoin({
    profile: this.userService.getProfile(),
    stats: this.analyticsService.getStats(),
    notification: this.notificationService.getNotifications(),
    settings: this.settingService.getSettings()
}).pipe(
    tap(() => this.loading=fase)
).subscribe(data => {
    this.initializeDashboard(data);
});

Key Points

  • Simiar to Promise.all()
  • All observables must complete
  • If any observable errors, the whole thing errors
  • runs in parallel

Important If any observable never completes, forkjoin will never emit!

10. combinelatest()

Emits whenever any observable emits, combining the latest values from all.

import {combineLatest} from 'rxjs';
import {startwith} from 'rxjs/operators';

// Eg 1: Form filters
combineLatest([
    this.categoryFilters$,
    this.priceFilters$,
    this,searchFilters$
]).pipe(
    switchMap(([category, price, search]) => 
        this.productService.getProducts({category, price, search })
    )
).subscribe(products => this.products =products);

// Eg 2: User Permissions
combineLatest({
    user: this.authService.user$,
    permissions: this.permissionsService.permission$,
    setting: this.settingService.settings$
}).subscribe(({user, permissions, setting}) => {
    this.canEdit = this.checkPermission(user, permissions);
    this.applySettings(settings);
});

// Eg 3: Reactive form combination
combineLatest([
    this.firstNameControl.valueChanges.pipe(startwith('')),
    this.lastNameControl.valueChanges.pipe(startwith(''))
]).pipe(
    map(([firstName, lastName]) => `${firstName} ${lastName}`.trim())
).suscribe(fullName => this.fullName = fullName);

Key Difference from ForkJoin

  • Emits multiple times (whenever any source emits)
  • Doesn't wait for completion
  • Always use the latest value from each observable
  • Needs all observables to emit at least once before first emission

Other Important operators

//distinctUntilChanged - Only emit when value changes 
of(1,1,2,2,3,3).pipe(
    distinctUntilChanged()
).subscribe(console.log); // 1,2,3

//take - first N emmissions
of(1,2,3,4,5).pipe(
    take(3)
).subscribe(console.log); // 1,2,3

//takeuntil - Take until another observable emits
const destroy$ = new Subject()

this.dataService.getData().pipe(
    takeUntill(destroy$)
    ).subscribe(data => this.data = data);

// catchError - Handle errors
this.http.get('/api/data').pipe(
    catchError(error =>{
        cosole.error(error);
        return of([]); // Return default value
    })
).subscribe(data => this.data = data);

//retry - Retry on error
this.http.get('/api/data').pipe(
    retry(3), // retry 3 times
    catchError(error => of([]))
).subscribe(data => this.data = data);

//delay - Delay emmissions
of('data').pipe(delay(1000))
..subscribe(console.log); //after 1 second

//share - Share singe subscription among multiple subscribers
const shared$ = this.http.get('/api/data').pipe(share());

Common Patterns

1. serch wtih debounce

serachCotrol = new FormCntrol();

ngOnInit(){
    this.searchCntrol.valueChanges.pipe(
        debounceTime(300),
        dinstinctUntilChanged(),
        switchMap(query =>this.searchService.search(query))
    ).subscribe(res => this.res = res)
}

2. Unsubscribe pattern

export class MyComponent implements OnDestroy{
    private distroy$ = new Subject<void>();

    ngOnInit(){
        this.dataService.getData().pipe(
            takeUntill(this.distroy$)
        ).subscribe(data => this.data = data);
    }
    ngDistroy(){
        this.distroy$.next();
        this.distroy$.complate();
    }
}

3. Loading state Pattern

getData(){
    this.loading = true;

    this.http.get('/api/data').pipe(
        tap(()=> this.loading =false),
        catchError(error =>{
            this.loading =false;
            this.error = error;
            return of([]);
        })
    ).subscribe(data => this.data = data);
}

4. Pooling Pattern

import {interval} from 'rxjs';

// poll every 5 seconds
interval(5000).pipe(
    switchMap(()=> this.http.get('/api/status/')),
    takeUntill(this.distroy$)
).subscribe(status => this.status = status);

Interview Questions

Q1: What is RxJS and why is it used in angular?

Answer RxJS is a library for reactive programming using Observables. It's used in Angular beacause:

  • Angular's HttpClient returns Observbles
  • Reactive froms use Observbles
  • Router and ActiveRoute use Observbles
  • Event handling easier
  • Enables powerful data tranformation with operators

Q2: What is difference between Observbles and Promise?

Answer:

ObservblesPromise
Lazy (doesn't execute until subscribed)Eager (executes immediatly)
Can emit muntiple values over timeEmit single value
Cancellable (via unsubscribe)Not cancellable
Provides operators (map, filter, etc.)Limited transfromation (then, catch)
Can be synchronous or asynchronousAlways asynchronous
//Promise - executes immediately
const promise = new Promise( resolve => {
    console.log('Promise eecuted'); // Logs immediately
    resolve('data');
})

//Observble - lazy
const Observbles = new Observbles(subscriber =>{
    console.log('Observbles executed'); // Only logs when subscribed 
    subscriber.next('data');
});
Observbles.subscribe(); // Now it executes

Q3: what is difference between Subject and BehaviourSubject?

Answer:

SubjectBehaviourSubjct
No intitial value requiredRequired initial value
New Subscriber don't get previous valueNew Subscriber get current value immeditely
no concept of "current value"Has .getValue() method

Intermediate lavel (1-3 years)

Q4: What is difference betwween switchMap, mergeMap, and concatMap?

OperatorBehaviorUse Case
switchMapCancels previous OservablesSearch route params
mergeMapRuns all in parallel, doesn't cancelParallel API File uploads
*concatMapSequntial, wait for previous to completeSequential opwrations

Q5: Explain debounceTime with real-world example.

Answer: debounceTime delays emission until a specified time has passed without another emission. Perfect for search inputs to avoid excessive api calls.

//without debouncetime - API call on every keystroke
// User type "angular" = 7 API call!

// with debounceTime - API call only after user stop typing
searchInput.valueChanges.pipe(
    debounceTime(300), // wait 300ms after user stop typing
    distinctUntillChanged(), // Only if value changed 
    switchMap(query => this.searchService.search(query))
).subscribe(res => this.res= res);

// User types "angualr" (7 krystrokes) = 1 api call!

Real world impact:

  • Reduces server load
  • Improves performance
  • Batter user exprience
  • Saves bandwidth

Q6: What is the purpose of tap() oprator?

Answer: tap() is used for side effects without modifying the stream. It's transparent - values through unchanged.

this.http.get('/api/users').pipe(
    tap(response => console.log('API response:' response)), // Debugging
    tap(()=> this.loading =false), // Ubdate loading state
    tab(data => localStroage.setItom('cache', JSON.stringify(data))), //Caching
    map( response => response.data) // Transfrom data
).subscribe(users => this.users=users);

Common use cases:

  • Logging/debugging
  • Setting loading/error flag
  • caching
  • Analytics tracking
  • Side effect that don't modify data

Q7: When would you use forkJoin vs combineLatest?

Answer:

forkJoin: Use when you need to wait for all observebles to complate, then get all final values at once (like promise.all()).

//loading dashboard - need all data before showing
forkJoin({
    users: this.http.get('/api/users'),
    posts: this.http.get('/api/posts'),
    comments: this.http.get('/api/comments')
}).subscribe(({user, posts, comments})=> {
    this.initDashboard(users, posts, comments);
    this.loading= false;
});

combineLatest: Use when you need to react to changes in any observables, combining their latest values.

// Filtering product - update whenever any filter changes
combineLatest([
    this.categoryFilter$,
    this.priceFilter$,
    this.searchTerm$
]).pipe(
    switchMap(([category, price, search]) =>
        this.getProducts(category, price, search)
    )
).subscribe(product => this.products = products)

Key Differences:

  • forkJoin: Emits once when all complete
  • combineLatest: Emits every time any source emits
  • forkJoin: HTTP requests
  • combineLatest: Reactive filters, form controls

Q8 How do you unsubscribe from observable in Angular?

Answer: There are several patterns:

// 1. Munal unsubscribe
export class MyComponent inplement OnDestroy{
    private subscription: Subscription;
    ngOnInit(){
        this.subscription = this.dataService.getData()
        .subscribe(data => this.data =data);
    }
    ngDestroy(){
        this.subscription.unsubscribe();
    }
}

// 2. takeUntill pattern (recommended)
export class MyComponent inplement OnDestroy{
    private destroy$ = new Subject<void>();

    ngOnInit(){
        this.dataService.getData().pipe(
            takeUntill(this.destroy$)
        )
        .subscribe(data => this.data =data);
    }
    ngDestroy(){
        this.destroy$.next();
        this.destroy$.complete();
    }
}

// 3. Async pipe (bast for templates)
//Component
data$ = this.dataService.getData();

//Template
<div *ngFor="let item of data$ |async">{{item}}</div>
<!-- Automatically unsubscribe -->

// 4. take(1) for single emmission
this.http.get('/api/data').pipe(take(1))
.subscribe(data => this.data = data);


Q9: What is the concep of Hot and Cold Obesarvables.

Answer:

Cold Observable:

  • Creates data produser for each subscriber
  • Each subscriber gets independent execution
  • Exapmle: Http requests, intervals
 const cold$ = this.http.get('/api/data');

 cold$.subscribe(data => console.log('sub 1:' data)); // New request
 cold$.subscribe((data => console.log('sub 2:' data)); // another New request
 // 2 http request mode!

Hot Observable:

  • Shares single data produser among all subscribers
  • All subscribers share same execution
  • Example : Subject, DOM event, shared streams
const hot$ = this.http.get('/api/data/').pipe(share());


 hot$.subscribe(data => console.log('sub 1:' data)); // Makes request
 hot$.subscribe((data => console.log('sub 2:' data)); // Share same request
// only 1 http request made!

Making cold Observable to Hot:

const shared$ = this.http.get('/api/data/').pipe(
    share() // or shareReplay(1)
    );


Q10: What is differnce between map() and switchMap()?

Answer:

map() Transforms emitted values synchronously.

  • Input: value -> Output: transfromed value
  • Doesn't handel Observebales

switchMap() Projects each value as observables, cancels previous inner observable.

  • Input: value -> Output: Oservable
  • Handles nested Observables (flattening)
// map - simple transformation
of(1,2,3).pipe(
    map(x => x*2)
)subscribe(console.log); // 2,4,6

// switchMap - Observable transformation
this.route.params.pipe(
    switchMap(params => this.http.get(`/api/user/${params.id}`)
).subscribe(user => this.user = user);

//wrong - map wtih observable (doesn't flatten)
this.route.params.pipe(
    map(params => this.http.get(`/api/user/${params.id}`) // returns Observable<Observable<user>>
).subscribe(observable =>{
    observable.subscribe( user => this.user = user); // Nested subscribe -BAD!
});

Rule of thumb

  • Use map() for synchronous transformations
  • Use switchMap() when function returns an observable

Advanced lavel (3+ Years)

Q11: Explain momery leaks in RxJS and how to prevent them.

Answer Memory leaks occurs when subscriptions are not properly cleaned up, keeping refrence alive.

Common causes:

// Memory leak - never unsubscribed 
ngOnInit(){
    this.dataService.getData().subscribe(data => this.data = data);
    // Component destroyed but subscription lives on!
}

// Memory leak - interval keeps running 
ngOnDestroy(){
    interval(1000).subscribe(val => console.log(val));
    // runs forever even after component destroyedd
}

Prevenation strategies:

// 1. takeUntill pattern
private destroy$ = new Subject<void>();

ngOnInit(){
    this.dataService.getData()
    .pipe(takeUntill(this.destroy$))
    .subscribe(data => this.data = data);
}

ngDestroy(){
    this.destroy$.next();
    this.destroy$.complete();
}

// 2. Async pipe (automatically unsubscribes)
data$ = this.dataService.getData();
// Template: {{data| async}}

//3. take(1) for single emission
this.http.get('/api/data')
.pipe(take(1))
.subscribe(data => this.data = data);

// 4. Manual unsubscribe
private subscription: Subscription;

ngOnInit(){
    this.subscription = this.dataService.getData()
    .subscribe(data => this.data = data);
}

ngOnDestroy(){
    this.subscription.unsubscribe();
}

Q12: How would you implement search feature with debouncing and cancellation?

Answer:

export class SearchComponenent implements OnInit, OnDestroy{
    searchCntorl = new FormCntrol('');
    searchResult$: Observable<SearchResult[]>;
    loading = false;
    private destroy$ = new subject<void>();

    contructor(private searchService: SearchService){}

    ngOnInIt(){
        this.searchResult$ = this.searchControl.valueChanged.pipe(
            debounceTime(300), // wait 300ms after user stops typing
            denstinctUntillChanged(), //Only if value actually changed
            tap(() => this.loading = true), //Show loading indicator
            switchmap(query => {
                if(!query || query.trim().length ===0){
                    return of([]); // return empty for empty query
                }

                return this.searchService.search(query).pipe(
                    catchError(error => {
                        console.error('Search error:', error);
                        return of([]); //Return empty on error
                    })
                );
            }),
            tap(() => this.loading =false), // Hide loading indecator
            takeUntill(this.destroy$) // Cleanup on destroy
        );
    }
    ngDestroy(){
        this.destroy$.next();
        this.destroy$.complete();
    }
}

Template:

<input [fromCntorl]="searchControl" placholder="search....">
<div *ngIf= "loading">Loading....</div>
<div *ngFor = "let result of searchResult$ | async">
    {{result.name}}
</div>

Key Features:

  • Debaunce Input (300ms delay)
  • Cancle previous searches (switchMap)
  • Handles empty queries
  • Error handeling
  • Loading state
  • Proper cleanup

Q13: Explain shareReplay and when to use it.

Answer: shareRepaly() makes a cold Observables hot and replays the Last N emissiomns to new subscribers.

// without shareReplay -- multiple HTTP calls
const data$ = this.http.get('/api/config');

data$.subscribe(config => colsole.log('sub 1:', config)); // HTTP Call 1
data$.subscribe(config => colsole.log('sub 2:', config)); // HTTP Call 2
// 2 HTTP request!

//with shareReplay - single HTTP call, cached
const data$ = this.http.get('/api/config').pipe(
   shareReplay(1) // Cache last 1 emission
);
data$.subscribe(config => colsole.log('sub 1:', config)); // HTTP Call 
data$.subscribe(config => colsole.log('sub 2:', config)); // Uses cached value
// only 1 Http request

Use Cases:

// 1. Shared configuration
@Injectable({provideIn: 'root' })
export class Configservice{
   private config$ = this.http.get<Config>('/api/config').pipe(
       shareReplay(1)
   );

   getConfig(): Observalbe<Config>{
       return this.config$; //All subscribers share same request 
   }
}

// 2. User profile (loaded once)
@Injectable({provideIn: 'root' })
export class AuthService{
   currentUser$ = this.http.get<User>('/api/me').pipe(
       shareReplay(1)
   );
}

// reference data
categories$ = this.http.get<Category[]>('/api/cotegories').pipe(
   shareReply(1)
);

shareReplay(1) vs Behavioursubject:

  • shareReplay: Caches Observable, can't manually update
  • BehaviourSubject: Full control, can manually emit values

Q14: How would you handle multiple dependent API calls?

// 1. Sequential - each depends on previous (concatMap)
getUserWithDetails(userId: number){
    return this.http.get(`/api/users/${userId}`).pipe(
        concatMap(user =>
            this.http.get(`/api/users/${userId}/profile`).pipe(
                map(orders => ({...userWithProfile, orders}))
            )
        )
    );
}

// 2. Parallel -Independent calls (forkJoin)
getDashboardData(userId: number) {
    return forkJoin({
        profile: this.userService.getProfile(),
        stats: this.analyticsService.getStats(),
        notification: this.notificationService.getNotifications(),
        settings: this.settingService.getSettings()
    }).pipe(
        map(({user, stats, notifications, setting }) = ({
            user, 
            stats,
            notifications,
            settings
        }))
    );
}

// 3. Mixed - some sequential, some parallel
GetUserData(userId: number){
    return this.http.get(`/api/users/${userId}`).pipe(
        switchMap(user => 
            forkJoin({
                user: of(user), // Pass through
                profile: this.http.get(`/api/profiles/${user.profileId}`),
                orders: this.http.get(`/api/users/${user.id}/orders`),
                preferences: this.http.get(`/api/users/${user.id}/preferences`)
            })
        ),
        map(data => ({
            ...data.user,
            profile: data.profile,
            orders: data.orders,
            preferences: data.preferences
        }))
    );
}

// 4 with error handleing
getDashboardData(userId:number){
    return forkJoin({
        user: this.http.get(`/api/users/${userId}`).pipe(
            catchError(error =>{
                console.log('user fatch failed:', error);
                return of(null);
            })
        ),
        stats: this.http.get(`/api/users/${userId}`).pipe(
            catchError(error =>{
                console.log('user fatch failed:', error);
                return of({views:0, clicks: 0}); // Default stats
            })
        )
    });
}

Q15: What is the difference between Subject, BehaviorSubject, ReplaySubject, and AsyncSubject?

Answer:

TypeInitial ValueReplayEmission Timing
SubjectNoNoImmediate to all subscribers
BehaviourSubjectyes(required)Last ValueImmeditately to new subscribrts
ReplaySubjectNoLast N vlaueImmeditately to new subscribrts
AsyncSubjectNoOnly last valueOnly on Complete
// Subject - no replay
const subject = new Subject();
subject.next(1);
subject.subscribe(val => console.log('a:', val));
subject.next(2); // A: 2
subject.next(3); // A: 3

//Behavioursubject - replays current value
const behavior = new BehaviorSubject(0); //Requires initial value
behavior.next(1);
behavior.subscribe(val => console.log('B:', val)); // B: 1 (immediately)
behavior.next(2); // B: 2

//ReplaySubject - replays last N value 
const replay = new ReplaySubject(2) // Buffer size 2
replay.next(1);
replay.next(2);
replay.next(3);
replay.subscribe(val => console.log('R:', val)); // R: 2, 3
replay.next(4);//R: 4

//Async Subject - Only last value on complete
const async = new AsyncSubject();
async.next(1);
async.next(2);
async.next(3);
replay.subscribe(val => console.log('AS:', val)); // Nothing yet
replay.complete();//AS:3 (Only now)

Use Case:

  • Subject: Event bus, multicasting
  • BehaviorSubject: State management, current user
  • ReplaySubject: Chace recent values, chat history
  • AsyncSubject: Last result of computation

Practical scenarios

Scenarios 1: Implement Auto-save functionality

export class AutoSaveComponent implements OnInit, OnDestroy {
    from: FormGroup;
    saveStatus$ = new BehaviourSubject<string>('');
    private destroy$ = new Subject<void>();

    constructor(
        private fb: FormBuilder,
        private dataService: Dataservice
    ){
        this.form = this.fb.group({
            title: [''],
            contant: ['']
        });
    }

    ngOnInit(){
        this.form.valueChanges.pipe(
            debaunce(1000), // wait 1s after user stops typing
            distinctUntilChanged((prev, curr) => 
                JSON.stringify(prev) === JSON.stringify(curr)
            ),
            tap(() => this.saveStatus$.next('Saving...')),
            switchMap(formValue =>
                this.dataService.save(formValue).pipe(
                    catchError(error=> {
                        this.saveStatus$.next('Error saving');
                        return of(null);
                    })
                )
            ),
            takeUntil(this.destroy$)
        ).subscribe(() => {
            this.savestatus$.next('Saved');
            setTimeout(() => this.saveStatus$.next(''), 2000);
        });
    }

    ngOnDestroy(){
        this.destroy$.next();
        this.destroy$.complete();
    }
}

Senario 2: Implement Polling with start/stop

export class PollingComponent implements OnInit, OnDestroy{
    private polling$ = new Subject<boolean>();
    private destroy$ = new Subject<void>();
    data: any;

    constructor( private dataService: DataService){}

    ngOnInit(){
        this.polling$.pipe(
            switchMap(shouldPoll =>
                shouldPoll
                ? interval(5000).pipe(
                    startwith(0),
                    switchMap(() => this.dataService.getData())
                )
                :EMPTY
            ),
            takeUntil(this.destroy$)
        ).subscribe(data => this.data = data);
    }

    startPolling(){
        this.polling$.next(true);
    }
    stopPolling(){
        this.polling$.next(false);
    }

    ngDestroy(){
        this.destroy$.next();
        this.destroy$.complate();
    }
}

Scenerio 3: Type-ahead with Minimum Characters

export class TypeAheatComponent {
    searchCntrol = new FormCntrol('');
    suggestions$: Observable<string[]>;

    constructor(private searchService: SearchService){
        this.suggestions$ = this.searchControl.valueChanges.pipe(
            debounceTime(300),
            distinctUntilChanged(),
            filter(query => query.length >=3) // Min 3 characters
            switchMap(query => 
                this.searchService.getSuggestions(query).pipe(
                    CatchError(()=> of([]))
                )
            )
        );
    }
}

BEST PRACTICES

  • Use async pipe in template when possible
  • Always unsubscribe (or use pattern that auto-unsubscribe)
  • Handle errors gracefully
  • Use switchMap for search and navigation
  • Use forkJoin for parallel independent calls
  • Use shareReplay for caching
  • Keep Observables pure (use tap for side effects)
  • Use TypeScript typing for type safety

Comments

Popular posts from this blog