# RxJS Interview Guide
Table of Contents
- Introduction to RxJS
- Core concepts
- Subjects
- Operators
- Common patterns
- Interview questions
- Practical Scenarios
Introduction to RxJS
RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables, making it easier to compose asynchronous or callback-based code.
Key Benefits
- Handles async operations elegantly
- Provides powerful operators for data transformation
- Manages complex event sequences
- Enables cancellation of async operations
- Reduces callback hell
Core Concepts
1. Observable
An Observable is a lazy push collection of multiple values over time.
import { Observable } from 'rxjs';
// Creating an Observable
const observable = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});
// Subscribing to an Observable
observable.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.log(err),
  complete: () => console.log('complete')
});
Key Points:
- Observables are lazy - they don't execute until subscribed
- Can emit multiple values over time
- Can be synchronous or asynchronous
- Must be subscribed to start execution
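The laziness described in these points can be observed directly. The sketch below is only an illustration: the counter `producerRuns` and the stream name `numbers$` are made up for this example. It counts how often the producer function runs before and after subscribing.

```typescript
import { Observable } from 'rxjs';

let producerRuns = 0; // how many times the producer function has executed

const numbers$ = new Observable<number>(subscriber => {
  producerRuns++;
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

// Creating the Observable did not run the producer.
const runsBeforeSubscribe = producerRuns;

const received: number[] = [];
numbers$.subscribe(value => received.push(value)); // runs the producer (synchronously here)
numbers$.subscribe(value => received.push(value)); // runs it again, independently

console.log(runsBeforeSubscribe, producerRuns, received);
```

Each `subscribe` call triggers its own execution, which is why a plain Observable behaves as a cold stream unless it is explicitly shared.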
2. Observer
An Observer is a consumer of the values delivered by an Observable.
const observer = {
next: (value)=> console.log(value),
error: (err)=> console.log(err),
complete: ()=> console.log('complete')
};
observable.subscribe(observer);
Observer Interface
- next(value) - receives emitted values
- error(err) - handles errors
- complete() - called when the stream completes
Subjects
1. Subject
A Subject is both an Observable and an Observer. It can multicast values to multiple observers.
import { Subject } from 'rxjs';
const subject = new Subject<number>();
// Multiple subscribers
subject.subscribe(value => console.log('Observer A:', value));
subject.subscribe(value => console.log('Observer B:', value));
// Emit values
subject.next(1); // both subscribers receive 1
subject.next(2); // both subscribers receive 2
Use Cases:
- Event bus implementation
- Share data between components
- Multicast to Multiple observers
Important: A Subject has no initial value, and late subscribers do not receive previously emitted values.
2. BehaviorSubject
A BehaviorSubject holds one current value and emits it immediately to new subscribers.
import { BehaviorSubject } from 'rxjs';
const behaviorSubject = new BehaviorSubject<number>(0); // initial value
behaviorSubject.subscribe(value => console.log('Observer A:', value)); // Receives 0
behaviorSubject.next(1);
behaviorSubject.next(2);
behaviorSubject.subscribe(value => console.log('Observer B:', value)); // Receives 2 (current value)
behaviorSubject.next(3); // both receive 3
Use Cases:
- State management
- Current user data
- Configuration settings
- Form values
Key differences from Subject:
- Requires an initial value
- Stores the latest value
- New subscribers get the current value immediately
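To make the contrast with a plain Subject concrete, here is a minimal side-by-side sketch. The names `events$` and `state$` are illustrative only. Both streams emit 1 before anyone subscribes, and only the BehaviorSubject hands that value to the late subscriber.

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

const events$ = new Subject<number>();
const state$ = new BehaviorSubject<number>(0);

// Emit before anyone is listening.
events$.next(1);
state$.next(1);

const fromSubject: number[] = [];
const fromBehavior: number[] = [];
events$.subscribe(v => fromSubject.push(v)); // late subscriber: the 1 is lost
state$.subscribe(v => fromBehavior.push(v)); // immediately receives the current value, 1

events$.next(2);
state$.next(2);

// getValue() reads the current value synchronously, without subscribing.
const snapshot = state$.getValue();

console.log(fromSubject, fromBehavior, snapshot);
```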
3. ReplaySubject
A ReplaySubject buffers the last N values and replays them to new subscribers.
import {ReplaySubject} from 'rxjs';
const replaySubject = new ReplaySubject<number>(2); // Buffer size 2
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
replaySubject.subscribe(value => console.log('Observer:', value)) // Receives 2, 3
4. AsyncSubject
An AsyncSubject emits only its last value, and only when it completes.
import { AsyncSubject } from 'rxjs';
const asyncSubject = new AsyncSubject<number>(); // takes no buffer size
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.complete(); // Only now is the last value (3) emitted
asyncSubject.subscribe(value => console.log('Observer:', value)); // Receives 3
Operators
1. pipe()
The pipe() method composes multiple operators together.
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0),
    map(x => x * 10)
  )
  .subscribe(console.log); // 20, 40
Why pipe()?
- Chains multiple operators
- Better tree-shaking
- Improved performance
- Cleaner syntax
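Because operators are plain functions, a chain can also be packaged and reused. The sketch below uses the standalone `pipe` function exported by `rxjs` to build a hypothetical custom operator, `evenTimesTen`, which is invented for this example and is not part of the library.

```typescript
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// A reusable operator built by composing existing ones.
const evenTimesTen = () =>
  pipe(
    filter((x: number) => x % 2 === 0),
    map((x: number) => x * 10)
  );

const values: number[] = [];
of(1, 2, 3, 4, 5)
  .pipe(evenTimesTen())
  .subscribe(v => values.push(v));

console.log(values); // [20, 40]
```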
2. map()
Transforms each value using a projection function.
import {of} from 'rxjs';
import {map} from 'rxjs/operators';
//Eg1: simple transformation
of(1,2,3)
.pipe(
map(x=> x* 2)
)
.subscribe(console.log); //2, 4, 6
//Eg2: Object transformation
interface User {id: number; name: string; }
of({id: 1, name: 'sudhanshu'})
.pipe(
map((user:User)=> user.name)
)
.subscribe(console.log); // sudhanshu
// Eg 3: API response mapping
this.http.get<ApiResponse>('/api/users')
.pipe(
map(response => response.data),
map(users => users.map(u=> ({...u, fullName: `${u.firstname} ${u.lastname}`})))
)
.subscribe(users => console.log(users));
3. filter()
Filters values based on a predicate function.
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
// Example 1: Filter even numbers
of(1, 2, 3, 4, 5, 6)
  .pipe(
    filter(x => x % 2 === 0)
  )
  .subscribe(console.log); // 2, 4, 6
// Example 2: Filter null/undefined
of(1, null, 3, undefined, 5)
  .pipe(
    filter(x => x != null)
  )
  .subscribe(console.log); // 1, 3, 5
// Example 3: Filter objects
interface Product { id: number; inStock: boolean; }
of<Product[]>(
  { id: 1, inStock: true },
  { id: 2, inStock: false },
  { id: 3, inStock: true }
).pipe(filter(product => product.inStock))
  .subscribe(console.log); // Only in-stock products (ids 1 and 3)
4. tap()
Performs side effects without modifying the stream.
import {of} from 'rxjs';
import {map, tap} from 'rxjs/operators';
//Eg 1: logging
of(1, 2, 3)
  .pipe(
    tap(x => console.log('Before:', x)),
    map(x => x * 2),
    tap(x => console.log('After:', x))
  ).subscribe();
//Eg 2: Debugging API calls
this.http.get<ApiResponse>('/api/users')
.pipe(
tap(response => console.log('API Response: ', response)),
tap(() => this.loading = false),
map(response => response.data)
)
.subscribe(users => console.log(users));
// Eg 3: Side effects without modification
of('data')
  .pipe(
    tap(data => localStorage.setItem('cache', data)),
    tap(data => this.analytics.track('data-fetched'))
  )
  .subscribe(console.log); // 'data' passes through unchanged
Use Cases
- Debugging
- Logging
- Caching
- Analytics
- Setting flags (loading, error states)
5. switchMap()
Cancels the previous inner observable when a new value arrives.
import { fromEvent } from 'rxjs';
import { switchMap, debounceTime } from 'rxjs/operators';
// Eg 1: Search autocomplete
const searchBox = document.getElementById('search') as HTMLInputElement;
fromEvent(searchBox, 'input')
  .pipe(
    debounceTime(300),
    switchMap(event => this.http.get(`/api/search?q=${(event.target as HTMLInputElement).value}`))
  )
  .subscribe(result => console.log(result));
// Eg 2: Route params to HTTP request
this.route.params.pipe(
  switchMap(params => this.http.get(`/api/users/${params.id}`))
)
  .subscribe(user => this.user = user);
// Eg 3: Dependent requests (the orders path is assumed for illustration)
this.getUserId().pipe(
  switchMap(userId => this.http.get(`/api/users/${userId}`)),
  switchMap(user => this.http.get(`/api/users/${user.id}/orders`))
)
  .subscribe(orders => console.log(orders));
Key Points
- Cancels the previous inner subscription
- Perfect for search/autocomplete
- Prevents race conditions
- Only latest request matters
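The cancellation behavior can be checked without a real HTTP call. In this sketch the "responses" are hand-controlled Subjects. The `responses` map and the query strings are assumptions made for the example, standing in for slow requests.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const query$ = new Subject<string>();

// One hypothetical pending "response" per query, resolved by hand below.
const responses: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  ab: new Subject<string>(),
};

const results: string[] = [];
query$
  .pipe(switchMap(q => responses[q]))
  .subscribe(r => results.push(r));

query$.next('a');  // subscribes to responses.a
query$.next('ab'); // unsubscribes from responses.a, subscribes to responses.ab

responses.a.next('result for a');   // ignored: that inner subscription was cancelled
responses.ab.next('result for ab'); // delivered

console.log(results); // ['result for ab']
```

The stale response for `a` arrives after the newer query was issued, so it never reaches the subscriber. This is the race condition that switchMap prevents.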
6. mergeMap() / flatMap()
Does NOT cancel previous inner observables. All run in parallel.
import { from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
// Eg 1: Parallel API calls
of(1, 2, 3)
  .pipe(
    mergeMap(id => this.http.get(`/api/users/${id}`))
  )
  .subscribe(user => console.log(user)); // All 3 requests run in parallel
// Eg 2: File upload
const files = [file1, file2, file3];
from(files)
  .pipe(
    mergeMap(file => this.uploadFile(file))
  )
  .subscribe(result => console.log(result));
// Eg 3: Concurrency limit
from([1, 2, 3, 4, 5]).pipe(
  mergeMap(
    id => this.http.get(`/api/users/${id}`),
    3 // Max 3 concurrent requests
  )
).subscribe(console.log);
Key Differences from switchMap:
- Does not cancel previous subscriptions
- All inner observables run concurrently
- Use when order doesn't matter and all requests are important
7. concatMap()
Waits for the previous inner observable to complete before starting the next one.
import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';
of(1, 2, 3).pipe(
  concatMap(id => this.http.get(`/api/log/${id}`).pipe(delay(1000)))
)
  .subscribe(console.log); // Executes sequentially: 1, then 2, then 3
Use Cases:
- Sequential operations
- Dependent API calls
- Maintaining order
8. debounceTime()
Delays an emission until the specified time has passed without another emission.
import {fromEvent} from 'rxjs';
import { debounceTime, distinctUntilChanged, map, switchMap } from 'rxjs/operators';
// Eg 1: Search input
const searchInput = document.getElementById('search') as HTMLInputElement;
fromEvent(searchInput, 'input').pipe(
  debounceTime(300), // Wait 300ms after user stops typing
  map(event => (event.target as HTMLInputElement).value), // compare text, not event objects
  distinctUntilChanged(),
  switchMap(query => this.searchService.search(query))
).subscribe(result => this.searchResult = result);
// Eg 2: Window resize
fromEvent(window, 'resize').pipe(
  debounceTime(500)
).subscribe(() => this.handleResize());
//Eg 3: Form value changes
this.myForm.valueChanges.pipe(
debounceTime(500)
).subscribe(values => this.autoSave(values));
Use Cases
- Search autocomplete
- Form autosave
- Window resize handlers
- Scroll events
Optimization: Prevents excessive API calls!
9. forkJoin()
Waits for all observables to complete, then emits the last value of each as a single array (or object, when given an object of observables).
import {forkJoin} from 'rxjs';
// Eg 1: Multiple API calls
forkJoin({
users: this.http.get('/api/users'),
posts: this.http.get('/api/posts'),
comments: this.http.get('/api/comments')
}).subscribe(res => {
  console.log(res.users);
  console.log(res.posts);
  console.log(res.comments);
});
// Eg 2: Array Format
forkJoin([
this.http.get('/api/users/1'),
this.http.get('/api/users/2'),
this.http.get('/api/users/3')
]).subscribe(([user1, user2, user3]) => {
  console.log(user1, user2, user3);
});
// Eg 3:Dashboard data loading
forkJoin({
profile: this.userService.getProfile(),
stats: this.analyticsService.getStats(),
notification: this.notificationService.getNotifications(),
settings: this.settingService.getSettings()
}).pipe(
tap(() => this.loading = false)
).subscribe(data => {
this.initializeDashboard(data);
});
Key Points
- Similar to Promise.all()
- All observables must complete
- If any observable errors, the whole thing errors
- Runs in parallel
Important: If any observable never completes, forkJoin will never emit!
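Both behaviors can be sketched with synchronous sources. The example below is illustrative rather than taken from a real app: the first forkJoin emits once with the last value of each source, while the second never emits because `NEVER` (an RxJS constant that never emits or completes) stands in for a stream that does not finish.

```typescript
import { forkJoin, NEVER, of } from 'rxjs';

// Emits exactly once, after both sources complete, with each source's LAST value.
const emissions: Array<[number, string]> = [];
forkJoin([of(1, 2, 3), of('a', 'b')]).subscribe(pair => emissions.push(pair));

// One source never completes, so forkJoin never emits.
const stuckEmissions: unknown[] = [];
const stuckSubscription = forkJoin([of(1), NEVER]).subscribe(v => stuckEmissions.push(v));
stuckSubscription.unsubscribe(); // clean up the subscription that would otherwise wait forever

console.log(emissions, stuckEmissions.length);
```

For long-lived sources such as Subjects or form streams, adding `take(1)` to each input is one common way to make them complete before passing them to forkJoin.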
10. combineLatest()
Emits whenever any observable emits, combining the latest values from all.
import { combineLatest } from 'rxjs';
import { map, startWith, switchMap } from 'rxjs/operators';
// Eg 1: Form filters
combineLatest([
this.categoryFilters$,
this.priceFilters$,
this.searchFilters$
]).pipe(
switchMap(([category, price, search]) =>
this.productService.getProducts({category, price, search })
)
).subscribe(products => this.products =products);
// Eg 2: User permissions (the object form needs RxJS 7 or later)
combineLatest({
  user: this.authService.user$,
  permissions: this.permissionsService.permission$,
  settings: this.settingService.settings$
}).subscribe(({ user, permissions, settings }) => {
  this.canEdit = this.checkPermission(user, permissions);
  this.applySettings(settings);
});
// Eg 3: Reactive form combination
combineLatest([
  this.firstNameControl.valueChanges.pipe(startWith('')),
  this.lastNameControl.valueChanges.pipe(startWith(''))
]).pipe(
  map(([firstName, lastName]) => `${firstName} ${lastName}`.trim())
).subscribe(fullName => this.fullName = fullName);
Key Differences from forkJoin
- Emits multiple times (whenever any source emits)
- Doesn't wait for completion
- Always uses the latest value from each observable
- Needs all observables to emit at least once before first emission
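The "at least once" requirement is easy to miss. The sketch below uses two hand-driven Subjects, with names invented for this example, to show that nothing is emitted until every source has produced a value.

```typescript
import { combineLatest, Subject } from 'rxjs';

const category$ = new Subject<string>();
const maxPrice$ = new Subject<number>();

const seen: Array<[string, number]> = [];
combineLatest([category$, maxPrice$]).subscribe(pair => seen.push(pair));

category$.next('books'); // nothing emitted yet: maxPrice$ has no value
maxPrice$.next(20);      // first emission: ['books', 20]
category$.next('games'); // ['games', 20] - reuses the latest price

console.log(seen);
```

This is why the reactive-form example seeds each control stream with `startWith('')`: without a seed, the combined stream stays silent until the user has touched every field.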
Other Important operators
//distinctUntilChanged - Only emit when value changes
of(1,1,2,2,3,3).pipe(
distinctUntilChanged()
).subscribe(console.log); // 1,2,3
// take - first N emissions
of(1, 2, 3, 4, 5).pipe(
  take(3)
).subscribe(console.log); // 1, 2, 3
// takeUntil - take until another observable emits
const destroy$ = new Subject<void>();
this.dataService.getData().pipe(
  takeUntil(destroy$)
).subscribe(data => this.data = data);
// catchError - Handle errors
this.http.get('/api/data').pipe(
catchError(error =>{
console.error(error);
return of([]); // Return default value
})
).subscribe(data => this.data = data);
//retry - Retry on error
this.http.get('/api/data').pipe(
retry(3), // retry 3 times
catchError(error => of([]))
).subscribe(data => this.data = data);
// delay - delay emissions
of('data').pipe(delay(1000))
  .subscribe(console.log); // after 1 second
// share - share a single subscription among multiple subscribers
const shared$ = this.http.get('/api/data').pipe(share());
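The retry and catchError snippets above combine in a way that is worth spelling out: `retry(n)` resubscribes up to n times after the first failure, so the source may run n + 1 times before catchError sees the error. A small sketch, where the flaky sources are simulated with `defer` and are not real HTTP calls:

```typescript
import { defer, of, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

// Simulated source that fails twice and then succeeds.
let attempts = 0;
const flaky$ = defer(() => {
  attempts++;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of('ok');
});

// Simulated source that always fails.
let failedAttempts = 0;
const broken$ = defer(() => {
  failedAttempts++;
  return throwError(() => new Error('service down'));
});

const results: string[] = [];
flaky$.pipe(retry(2), catchError(() => of('fallback'))).subscribe(v => results.push(v));
broken$.pipe(retry(2), catchError(() => of('fallback'))).subscribe(v => results.push(v));

console.log(results, attempts, failedAttempts);
```

Placing catchError after retry means the fallback is used only once all retries are exhausted.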
Common Patterns
1. Search with debounce
searchControl = new FormControl('');
ngOnInit() {
  this.searchControl.valueChanges.pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(query => this.searchService.search(query))
  ).subscribe(res => this.res = res);
}
2. Unsubscribe pattern
export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();
  ngOnInit() {
    this.dataService.getData().pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => this.data = data);
  }
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
3. Loading state Pattern
getData(){
this.loading = true;
this.http.get('/api/data').pipe(
tap(()=> this.loading =false),
catchError(error =>{
this.loading =false;
this.error = error;
return of([]);
})
).subscribe(data => this.data = data);
}
4. Polling Pattern
import { interval } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';
// Poll every 5 seconds
interval(5000).pipe(
  switchMap(() => this.http.get('/api/status/')),
  takeUntil(this.destroy$)
).subscribe(status => this.status = status);
Interview Questions
Q1: What is RxJS and why is it used in Angular?
Answer: RxJS is a library for reactive programming using Observables. It's used in Angular because:
- Angular's HttpClient returns Observables
- Reactive forms use Observables
- Router and ActivatedRoute use Observables
- It makes event handling easier
- It enables powerful data transformation with operators
Q2: What is the difference between an Observable and a Promise?
Answer:
| Observable | Promise |
|---|---|
| Lazy (doesn't execute until subscribed) | Eager (executes immediately) |
| Can emit multiple values over time | Emits a single value |
| Cancellable (via unsubscribe) | Not cancellable |
| Provides operators (map, filter, etc.) | Limited transformation (then, catch) |
| Can be synchronous or asynchronous | Always asynchronous |
// Promise - executes immediately
const promise = new Promise(resolve => {
  console.log('Promise executed'); // Logs immediately
  resolve('data');
});
// Observable - lazy
const observable = new Observable(subscriber => {
  console.log('Observable executed'); // Only logs when subscribed
  subscriber.next('data');
});
observable.subscribe(); // Now it executes
Q3: What is the difference between Subject and BehaviorSubject?
Answer:
| Subject | BehaviorSubject |
|---|---|
| No initial value required | Requires an initial value |
| New subscribers don't get previous values | New subscribers get the current value immediately |
| No concept of "current value" | Has a .getValue() method |
Intermediate level (1-3 years)
Q4: What is the difference between switchMap, mergeMap, and concatMap?
| Operator | Behavior | Use Case |
|---|---|---|
| switchMap | Cancels the previous inner Observable | Search, route params |
| mergeMap | Runs all in parallel, doesn't cancel | Parallel API calls, file uploads |
| concatMap | Sequential, waits for the previous one to complete | Sequential operations |
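The table can be turned into a small experiment. The sketch below is an illustration under stated assumptions: `trace`, `request`, and `finish` are helper names invented here, and each "request" is a cold Observable that starts when subscribed and responds only when its `finish` callback is invoked. The same event sequence is fed through each operator and the resulting log is compared.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Flattener = (
  project: (id: number) => Observable<string>
) => OperatorFunction<number, string>;

function trace(flatten: Flattener): string[] {
  const log: string[] = [];
  const finish: Array<() => void> = [];

  // Hypothetical cold "request": logs when it starts, responds when finish[id]() is called.
  const request = (id: number) =>
    new Observable<string>(subscriber => {
      log.push(`start ${id}`);
      finish[id] = () => {
        subscriber.next(`response ${id}`);
        subscriber.complete();
      };
    });

  const ids$ = new Subject<number>();
  ids$.pipe(flatten(request)).subscribe(value => log.push(value));

  ids$.next(0);
  ids$.next(1);
  finish[0](); // request 0 responds after request 1 was issued
  finish[1]();
  return log;
}

const switchLog = trace(project => switchMap(project));
const mergeLog = trace(project => mergeMap(project));
const concatLog = trace(project => concatMap(project));

console.log(switchLog); // start 0, start 1, response 1
console.log(mergeLog);  // start 0, start 1, response 0, response 1
console.log(concatLog); // start 0, response 0, start 1, response 1
```

With switchMap the response for request 0 is dropped, with mergeMap both requests overlap, and with concatMap request 1 does not even start until request 0 has completed.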
Q5: Explain debounceTime with real-world example.
Answer: debounceTime delays emission until a specified time has passed without another emission. Perfect for search inputs to avoid excessive api calls.
// Without debounceTime - API call on every keystroke
// User types "angular" (7 keystrokes) = 7 API calls!
// With debounceTime - API call only after the user stops typing
searchInput.valueChanges.pipe(
  debounceTime(300), // wait 300ms after the user stops typing
  distinctUntilChanged(), // only if the value changed
  switchMap(query => this.searchService.search(query))
).subscribe(res => this.res = res);
// User types "angular" (7 keystrokes) = 1 API call!
Real world impact:
- Reduces server load
- Improves performance
- Better user experience
- Saves bandwidth
Q6: What is the purpose of the tap() operator?
Answer: tap() is used for side effects without modifying the stream. It's transparent - values pass through unchanged.
this.http.get('/api/users').pipe(
  tap(response => console.log('API response:', response)), // Debugging
  tap(() => this.loading = false), // Update loading state
  tap(data => localStorage.setItem('cache', JSON.stringify(data))), // Caching
  map(response => response.data) // Transform data
).subscribe(users => this.users = users);
Common use cases:
- Logging/debugging
- Setting loading/error flags
- Caching
- Analytics tracking
- Side effects that don't modify data
Q7: When would you use forkJoin vs combineLatest?
Answer:
forkJoin: Use when you need to wait for all observables to complete, then get all final values at once (like Promise.all()).
// Loading dashboard - need all data before showing
forkJoin({
  users: this.http.get('/api/users'),
  posts: this.http.get('/api/posts'),
  comments: this.http.get('/api/comments')
}).subscribe(({ users, posts, comments }) => {
  this.initDashboard(users, posts, comments);
  this.loading = false;
});
combineLatest: Use when you need to react to changes in any observable, combining their latest values.
// Filtering products - update whenever any filter changes
combineLatest([
  this.categoryFilter$,
  this.priceFilter$,
  this.searchTerm$
]).pipe(
  switchMap(([category, price, search]) =>
    this.getProducts(category, price, search)
  )
).subscribe(products => this.products = products);
Key Differences:
- forkJoin: Emits once when all complete
- combineLatest: Emits every time any source emits
- forkJoin: HTTP requests
- combineLatest: Reactive filters, form controls
Q8: How do you unsubscribe from an Observable in Angular?
Answer: There are several patterns:
// 1. Manual unsubscribe
export class MyComponent implements OnInit, OnDestroy {
  private subscription: Subscription;
  ngOnInit() {
    this.subscription = this.dataService.getData()
      .subscribe(data => this.data = data);
  }
  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}
// 2. takeUntil pattern (recommended)
export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();
  ngOnInit() {
    this.dataService.getData().pipe(
      takeUntil(this.destroy$)
    )
      .subscribe(data => this.data = data);
  }
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
// 3. Async pipe (best for templates)
// Component
data$ = this.dataService.getData();
// Template
<div *ngFor="let item of data$ | async">{{ item }}</div>
<!-- Automatically unsubscribes -->
// 4. take(1) for a single emission
this.http.get('/api/data').pipe(take(1))
  .subscribe(data => this.data = data);
Q9: What are hot and cold Observables?
Answer:
Cold Observable:
- Creates a new data producer for each subscriber
- Each subscriber gets an independent execution
- Examples: HTTP requests, intervals
const cold$ = this.http.get('/api/data');
cold$.subscribe(data => console.log('sub 1:', data)); // New request
cold$.subscribe(data => console.log('sub 2:', data)); // Another new request
// 2 HTTP requests made!
Hot Observable:
- Shares a single data producer among all subscribers
- All subscribers share the same execution
- Examples: Subjects, DOM events, shared streams
const hot$ = this.http.get('/api/data/').pipe(share());
hot$.subscribe(data => console.log('sub 1:', data)); // Makes the request
hot$.subscribe(data => console.log('sub 2:', data)); // Shares the same request
// Only 1 HTTP request made!
Making a cold Observable hot:
const shared$ = this.http.get('/api/data/').pipe(
  share() // or shareReplay(1)
);
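The request counts claimed above can be verified with a stand-in for the HTTP call. In this sketch, `response$` is a hand-driven Subject playing the role of a pending response and `requests` counts how often the source is subscribed. Both names are assumptions made for the example.

```typescript
import { defer, Subject } from 'rxjs';
import { share } from 'rxjs/operators';

let requests = 0;
const response$ = new Subject<string>(); // stands in for a pending HTTP response

// Cold: every subscription runs the factory, i.e. "sends a request".
const cold$ = defer(() => {
  requests++;
  return response$;
});

cold$.subscribe();
cold$.subscribe();
const coldRequests = requests; // one request per subscriber

requests = 0;
const hot$ = cold$.pipe(share());
const subscriberA: string[] = [];
const subscriberB: string[] = [];
hot$.subscribe(v => subscriberA.push(v));
hot$.subscribe(v => subscriberB.push(v));
const sharedRequests = requests; // a single request shared by both

response$.next('payload');

console.log(coldRequests, sharedRequests, subscriberA, subscriberB);
```

Note that `share()` only helps subscribers that arrive while the request is still in flight. A subscriber that arrives after completion triggers a new request, which is the gap `shareReplay(1)` fills.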
Q10: What is the difference between map() and switchMap()?
Answer:
map(): Transforms emitted values synchronously.
- Input: value -> Output: transformed value
- Doesn't flatten Observables
switchMap(): Projects each value to an Observable and cancels the previous inner Observable.
- Input: value -> Output: Observable
- Handles nested Observables (flattening)
// map - simple transformation
of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log); // 2, 4, 6
// switchMap - Observable transformation
this.route.params.pipe(
  switchMap(params => this.http.get(`/api/user/${params.id}`))
).subscribe(user => this.user = user);
// Wrong - map with an Observable (doesn't flatten)
this.route.params.pipe(
  map(params => this.http.get(`/api/user/${params.id}`)) // returns Observable<Observable<User>>
).subscribe(observable => {
  observable.subscribe(user => this.user = user); // Nested subscribe - BAD!
});
Rule of thumb
- Use map() for synchronous transformations
- Use switchMap() when the function returns an Observable
Advanced level (3+ years)
Q11: Explain memory leaks in RxJS and how to prevent them.
Answer: Memory leaks occur when subscriptions are not properly cleaned up, keeping references alive.
Common causes:
// Memory leak - never unsubscribed
ngOnInit() {
  this.dataService.getData().subscribe(data => this.data = data);
  // Component destroyed but subscription lives on!
}
// Memory leak - interval keeps running
ngOnInit() {
  interval(1000).subscribe(val => console.log(val));
  // Runs forever, even after the component is destroyed
}
Prevention strategies:
// 1. takeUntil pattern
private destroy$ = new Subject<void>();
ngOnInit() {
  this.dataService.getData()
    .pipe(takeUntil(this.destroy$))
    .subscribe(data => this.data = data);
}
ngOnDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}
// 2. Async pipe (automatically unsubscribes)
data$ = this.dataService.getData();
// Template: {{ data$ | async }}
//3. take(1) for single emission
this.http.get('/api/data')
.pipe(take(1))
.subscribe(data => this.data = data);
// 4. Manual unsubscribe
private subscription: Subscription;
ngOnInit(){
this.subscription = this.dataService.getData()
.subscribe(data => this.data = data);
}
ngOnDestroy(){
this.subscription.unsubscribe();
}
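Outside Angular, the effect of the takeUntil pattern can be checked in a few lines. This is a minimal sketch in which `source$` stands in for any long-lived stream and `destroy$` plays the role of the component's teardown signal.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source$ = new Subject<number>(); // stands in for a long-lived data stream
const destroy$ = new Subject<void>();  // teardown signal, as in ngOnDestroy

const seen: number[] = [];
const subscription = source$
  .pipe(takeUntil(destroy$))
  .subscribe(value => seen.push(value));

source$.next(1);

// Simulate the component being destroyed.
destroy$.next();
destroy$.complete();

source$.next(2); // no longer delivered: the subscription was closed by takeUntil

console.log(seen, subscription.closed);
```

Once the subscription is closed, the callback and anything it references are no longer reachable through the source and can be garbage-collected.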
Q12: How would you implement a search feature with debouncing and cancellation?
Answer:
export class SearchComponent implements OnInit, OnDestroy {
  searchControl = new FormControl('');
  searchResult$!: Observable<SearchResult[]>;
  loading = false;
  private destroy$ = new Subject<void>();
  constructor(private searchService: SearchService) {}
  ngOnInit() {
    this.searchResult$ = this.searchControl.valueChanges.pipe(
      debounceTime(300), // Wait 300ms after user stops typing
      distinctUntilChanged(), // Only if value actually changed
      tap(() => this.loading = true), // Show loading indicator
      switchMap(query => {
        if (!query || query.trim().length === 0) {
          return of([]); // Return empty for empty query
        }
        return this.searchService.search(query).pipe(
          catchError(error => {
            console.error('Search error:', error);
            return of([]); // Return empty on error
          })
        );
      }),
      tap(() => this.loading = false), // Hide loading indicator
      takeUntil(this.destroy$) // Cleanup on destroy
    );
  }
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Template:
<input [formControl]="searchControl" placeholder="Search...">
<div *ngIf="loading">Loading...</div>
<div *ngFor="let result of searchResult$ | async">
  {{ result.name }}
</div>
Key Features:
- Debounced input (300ms delay)
- Cancels previous searches (switchMap)
- Handles empty queries
- Error handling
- Loading state
- Proper cleanup
Q13: Explain shareReplay and when to use it.
Answer: shareReplay() makes a cold Observable hot and replays the last N emissions to new subscribers.
// Without shareReplay - multiple HTTP calls
const data$ = this.http.get('/api/config');
data$.subscribe(config => console.log('sub 1:', config)); // HTTP call 1
data$.subscribe(config => console.log('sub 2:', config)); // HTTP call 2
// 2 HTTP requests!
// With shareReplay - single HTTP call, cached
const cached$ = this.http.get('/api/config').pipe(
  shareReplay(1) // Cache the last emission
);
cached$.subscribe(config => console.log('sub 1:', config)); // HTTP call
cached$.subscribe(config => console.log('sub 2:', config)); // Uses cached value
// Only 1 HTTP request
Use Cases:
// 1. Shared configuration
@Injectable({ providedIn: 'root' })
export class ConfigService {
  private config$: Observable<Config>;
  constructor(private http: HttpClient) {
    this.config$ = this.http.get<Config>('/api/config').pipe(
      shareReplay(1)
    );
  }
  getConfig(): Observable<Config> {
    return this.config$; // All subscribers share the same request
  }
}
// 2. User profile (loaded once)
@Injectable({ providedIn: 'root' })
export class AuthService {
  currentUser$: Observable<User>;
  constructor(private http: HttpClient) {
    this.currentUser$ = this.http.get<User>('/api/me').pipe(
      shareReplay(1)
    );
  }
}
// 3. Reference data
categories$ = this.http.get<Category[]>('/api/categories').pipe(
  shareReplay(1)
);
shareReplay(1) vs BehaviorSubject:
- shareReplay: Caches an Observable's result; it can't be updated manually
- BehaviorSubject: Full control; values can be emitted manually
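The two caching styles can be compared side by side. In the sketch below the "request" is simulated with `defer` and a counter, and the `theme` field is a made-up example payload. The cached stream runs its source once, while the BehaviorSubject can be pushed a new value at any time.

```typescript
import { BehaviorSubject, defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// shareReplay: the source runs once and later subscribers get the cached value.
let calls = 0;
const config$ = defer(() => {
  calls++; // counts simulated requests
  return of({ theme: 'dark' });
}).pipe(shareReplay(1));

const cachedThemes: string[] = [];
config$.subscribe(c => cachedThemes.push(c.theme));
config$.subscribe(c => cachedThemes.push(c.theme)); // served from the replay buffer

// BehaviorSubject: the owner decides when the value changes.
const settings$ = new BehaviorSubject({ theme: 'dark' });
const liveThemes: string[] = [];
settings$.subscribe(s => liveThemes.push(s.theme));
settings$.next({ theme: 'light' }); // manual update, pushed to subscribers

console.log(calls, cachedThemes, liveThemes);
```

A cached stream built with shareReplay has no equivalent of `next()`, so refreshing it typically means building a new stream or driving it from a separate trigger.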
Q14: How would you handle multiple dependent API calls?
// 1. Sequential - each depends on the previous (concatMap)
getUserWithDetails(userId: number) {
  return this.http.get(`/api/users/${userId}`).pipe(
    concatMap(user =>
      this.http.get(`/api/users/${userId}/profile`).pipe(
        map(profile => ({ ...user, profile }))
      )
    )
  );
}
// 2. Parallel - independent calls (forkJoin)
getDashboardData(userId: number) {
  return forkJoin({
    profile: this.userService.getProfile(),
    stats: this.analyticsService.getStats(),
    notifications: this.notificationService.getNotifications(),
    settings: this.settingService.getSettings()
  }).pipe(
    map(({ profile, stats, notifications, settings }) => ({
      user: profile,
      stats,
      notifications,
      settings
    }))
  );
}
// 3. Mixed - some sequential, some parallel
getUserData(userId: number) {
return this.http.get(`/api/users/${userId}`).pipe(
switchMap(user =>
forkJoin({
user: of(user), // Pass through
profile: this.http.get(`/api/profiles/${user.profileId}`),
orders: this.http.get(`/api/users/${user.id}/orders`),
preferences: this.http.get(`/api/users/${user.id}/preferences`)
})
),
map(data => ({
...data.user,
profile: data.profile,
orders: data.orders,
preferences: data.preferences
}))
);
}
// 4. With error handling
getDashboardData(userId: number) {
  return forkJoin({
    user: this.http.get(`/api/users/${userId}`).pipe(
      catchError(error => {
        console.log('user fetch failed:', error);
        return of(null);
      })
    ),
    // The stats path is assumed for illustration
    stats: this.http.get(`/api/users/${userId}/stats`).pipe(
      catchError(error => {
        console.log('stats fetch failed:', error);
        return of({ views: 0, clicks: 0 }); // Default stats
      })
    )
  });
}
Q15: What is the difference between Subject, BehaviorSubject, ReplaySubject, and AsyncSubject?
Answer:
| Type | Initial Value | Replay | Emission Timing |
|---|---|---|---|
| Subject | No | No | Immediate to all subscribers |
| BehaviorSubject | Yes (required) | Last value | Immediately to new subscribers |
| ReplaySubject | No | Last N values | Immediately to new subscribers |
| AsyncSubject | No | Only last value | Only on Complete |
// Subject - no replay
const subject = new Subject();
subject.next(1);
subject.subscribe(val => console.log('A:', val));
subject.next(2); // A: 2
subject.next(3); // A: 3
// BehaviorSubject - replays the current value
const behavior = new BehaviorSubject(0); // Requires initial value
behavior.next(1);
behavior.subscribe(val => console.log('B:', val)); // B: 1 (immediately)
behavior.next(2); // B: 2
// ReplaySubject - replays the last N values
const replay = new ReplaySubject(2); // Buffer size 2
replay.next(1);
replay.next(2);
replay.next(3);
replay.subscribe(val => console.log('R:', val)); // R: 2, 3
replay.next(4); // R: 4
// AsyncSubject - only the last value, on complete
const asyncSubject = new AsyncSubject();
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.subscribe(val => console.log('AS:', val)); // Nothing yet
asyncSubject.complete(); // AS: 3 (only now)
Use Cases:
- Subject: Event bus, multicasting
- BehaviorSubject: State management, current user
- ReplaySubject: Cache recent values, chat history
- AsyncSubject: Last result of computation
Practical scenarios
Scenario 1: Implement Auto-save functionality
export class AutoSaveComponent implements OnInit, OnDestroy {
  form: FormGroup;
  saveStatus$ = new BehaviorSubject<string>('');
  private destroy$ = new Subject<void>();
  constructor(
    private fb: FormBuilder,
    private dataService: DataService
  ) {
    this.form = this.fb.group({
      title: [''],
      content: ['']
    });
  }
  ngOnInit() {
    this.form.valueChanges.pipe(
      debounceTime(1000), // Wait 1s after user stops typing
      distinctUntilChanged((prev, curr) =>
        JSON.stringify(prev) === JSON.stringify(curr)
      ),
      tap(() => this.saveStatus$.next('Saving...')),
      switchMap(formValue =>
        this.dataService.save(formValue).pipe(
          catchError(error => {
            this.saveStatus$.next('Error saving');
            return EMPTY; // Skip the 'Saved' status below when the save fails
          })
        )
      ),
      takeUntil(this.destroy$)
    ).subscribe(() => {
      this.saveStatus$.next('Saved');
      setTimeout(() => this.saveStatus$.next(''), 2000);
    });
  }
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Scenario 2: Implement Polling with start/stop
export class PollingComponent implements OnInit, OnDestroy {
  private polling$ = new Subject<boolean>();
  private destroy$ = new Subject<void>();
  data: any;
  constructor(private dataService: DataService) {}
  ngOnInit() {
    this.polling$.pipe(
      switchMap(shouldPoll =>
        shouldPoll
          ? interval(5000).pipe(
              startWith(0), // Fetch immediately, then every 5 seconds
              switchMap(() => this.dataService.getData())
            )
          : EMPTY
      ),
      takeUntil(this.destroy$)
    ).subscribe(data => this.data = data);
  }
  startPolling() {
    this.polling$.next(true);
  }
  stopPolling() {
    this.polling$.next(false);
  }
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Scenario 3: Type-ahead with Minimum Characters
export class TypeAheadComponent {
  searchControl = new FormControl('');
  suggestions$: Observable<string[]>;
  constructor(private searchService: SearchService) {
    this.suggestions$ = this.searchControl.valueChanges.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      filter(query => query.length >= 3), // Min 3 characters
      switchMap(query =>
        this.searchService.getSuggestions(query).pipe(
          catchError(() => of([]))
        )
      )
    );
  }
}
BEST PRACTICES
- Use the async pipe in templates when possible
- Always unsubscribe (or use a pattern that auto-unsubscribes)
- Handle errors gracefully
- Use switchMap for search and navigation
- Use forkJoin for parallel independent calls
- Use shareReplay for caching
- Keep Observables pure (use tap for side effects)
- Use TypeScript typing for type safety