Let’s start from the basics and gradually progress towards more advanced concepts in RxJS in Angular
Observables and Observers:
Observables are the foundation of RxJS, representing a stream of data that can be observed over time. Observers are the consumers of these streams, listening for emitted data and reacting accordingly.
Observables and Observers are similar to the concepts of events and event handlers in C#.
Observables:
In Angular, you can create an Observable using the Observable
class from the rxjs
library. An Observable can emit data asynchronously over time, and can be subscribed by Observers to receive the emitted data.
Example in Angular:
import { Observable } from 'rxjs'; // Create an Observable that emits a stream of numbers const numbers$ = new Observable(observer => { let count = 1; const intervalId = setInterval(() => { observer.next(count++); }, 1000); // Cleanup logic when the Observable is unsubscribed return () => { clearInterval(intervalId); }; }); // Subscribe to the Observable to receive the emitted numbers numbers$.subscribe(value => console.log(value));
Observers:
In Angular, an Observer is an object that defines how to handle the emitted data from an Observable.
It has three optional methods:
next
for handling the emitted data
error
for handling any errors
complete
for handling the completion of the Observable.
Example in Angular:
import { Observable } from 'rxjs'; // Create an Observable that emits a stream of numbers const numbers$ = new Observable(observer => { let count = 1; const intervalId = setInterval(() => { observer.next(count++); }, 1000); // Cleanup logic when the Observable is unsubscribed return () => { clearInterval(intervalId); }; }); // Define an Observer to handle the emitted numbers const observer = { next: value => console.log(value), error: err => console.error(err), complete: () => console.log('Observable completed') }; // Subscribe the Observer to the Observable numbers$.subscribe(observer);
- Operators: Operators are functions that allow you to transform, filter, and combine data streams emitted by Observables.They are similar to LINQ operators in C#.
- Transformation Operators: Transformation operators allow you to transform the data emitted by an Observable into a different format or structure.
- Example in Angular with
map
operator:
import { of } from 'rxjs'; import { map } from 'rxjs/operators'; // Create an observable that emits a stream of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use map operator to square each number in the stream const squaredNumbers$ = numbers$.pipe( map(num => num * num) ); // Subscribe to the transformed data stream squaredNumbers$.subscribe(value => console.log(value));
- Filtering Operators: Filtering operators allow you to filter the data emitted by an Observable based on a given condition.
Example in Angular with filter
operator:
import { of } from 'rxjs'; import { filter } from 'rxjs/operators'; // Create an observable that emits a stream of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use filter operator to get only even numbers from the stream const evenNumbers$ = numbers$.pipe( filter(num => num % 2 === 0) ); // Subscribe to the filtered even numbers stream evenNumbers$.subscribe(value => console.log(value));
Subjects:
Subjects are both Observables and Observers, which allows you to emit and subscribe to data streams directly.
Subjects are similar to the concept of event emitters in C#.
Example in Angular with Subject
:
import { Subject } from 'rxjs'; // Create a Subject to emit and subscribe to a stream of numbers const numbersSubject$ = new Subject<number>(); // Subscribe to the Subject to receive emitted numbers numbersSubject$.subscribe(value => console.log(value)); // Emit numbers to the Subject numbersSubject$.next(1); numbersSubject$.next(2); numbersSubject$.next(3);
Hot and Cold Observables:
Observables can be categorized into two types
- Hot
- Cold.
Hot Observables emit data regardless of whether there are any subscribers Cold Observables only emit data when there are active subscribers.
Example in Angular with Hot and Cold Observables:
import { interval, fromEvent } from 'rxjs'; // Hot Observable - emits data regardless of subscribers const hotObservable$ = interval(1000); // Cold Observable - emits data only when subscribed const button = document.querySelector('button'); const coldObservable$ = fromEvent(button, 'click'); // Subscribe to both Observables hotObservable$.subscribe(value => console.log(`Hot: ${value}`)); coldObservable$.subscribe(value => console.log(`Cold: ${value}`));
Error handling:
RxJS provides operators for handling errors in Observables, such as catchError
and retry
, which allow you to handle errors and retries in a stream of data.
Example in Angular with error handling:
import { of } from 'rxjs'; import { catchError, retry } from 'rxjs/operators'; // Create an observable that may emit an error const numbers$ = of(1, 2, 3, 4, 5, 'six'); // Use catchError operator to handle errors const numbersWithErrorHandled$ = numbers$.pipe( catchError(err => of('Error occurred:', err)) ); // Use retry operator to retry the observable in case of error const numbersWithRetry$ = numbers$.pipe( retry(2) // Retry 2 times in case of error ); // Subscribe to the error handled and retried observables numbersWithErrorHandled$.subscribe(value => console.log(value)); numbersWithRetry$.subscribe(value => console.log(value));
Custom operators:
RxJS allows you to create your own custom operators by composing existing operators or extending the Observable
class. This gives you flexibility to create reusable and specialized operators for your specific use cases.
Example in Angular with a custom operator:
import { Observable, OperatorFunction } from 'rxjs'; import { map } from 'rxjs/operators'; // Custom operator to multiply each emitted number by a given factor function multiplyBy(factor: number): OperatorFunction<number, number> { return (source: Observable<number>) => source.pipe(map(num => num * factor)); } // Create an observable that emits a stream of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use the custom multiplyBy operator to multiply each number by 10 const multipliedNumbers$ = numbers$.pipe(multiplyBy(10)); // Subscribe to the multiplied numbers multipliedNumbers$.subscribe(value => console.log(value));
Schedulers:
RxJS allows you to control the execution context or scheduler of an Observable. Schedulers provide options for managing concurrency, controlling timing, and executing code on specific threads or contexts.
Example in Angular with schedulers:
import { of, asyncScheduler } from 'rxjs'; import { observeOn } from 'rxjs/operators'; // Create an observable that emits a stream of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use observeOn operator to specify an asyncScheduler for subscription const asyncNumbers$ = numbers$.pipe(observeOn(asyncScheduler)); // Subscribe to the numbers with asyncScheduler asyncNumbers$.subscribe(value => console.log(value)); // Use observeOn operator with other schedulers like asapScheduler or queueScheduler const asapNumbers$ = numbers$.pipe(observeOn(asapScheduler)); const queueNumbers$ = numbers$.pipe(observeOn(queueScheduler));
Multicasting:
By default, Observables are unicast, meaning each subscription creates a separate execution of the Observable.
However, you can multicast Observables to share a single execution among multiple subscribers, which can improve performance and reduce duplicated work.
Example in Angular with multicasting:
import { interval, Subject } from 'rxjs'; import { multicast, refCount } from 'rxjs/operators'; // Create a hot Observable that emits numbers every second const numbers$ = interval(1000).pipe(multicast(() => new Subject()), refCount()); // Subscribe to the hot Observable from multiple subscribers numbers$.subscribe(value => console.log(`Subscriber 1: ${value}`)); numbers$.subscribe(value => console.log(`Subscriber 2: ${value}`)); // Start the execution of the hot Observable numbers$.connect();
Customizing the Observable:
You can also customize the behavior of an Observable by extending the Observable
class and implementing your own logic for emitting values, handling errors, and managing subscriptions.
Example in Angular with a custom Observable:
import { Observable } from 'rxjs'; // Custom Observable that emits a sequence of numbers class MyNumbersObservable extends Observable<number> { private currentNumber = 1; constructor(private maxNumber: number) { super(subscriber => { const intervalId = setInterval(() => { if (this.currentNumber <= maxNumber) { subscriber.next(this.currentNumber++); } else { subscriber.complete(); clearInterval(intervalId); } }, 1000); }); } } // Create an instance of the custom Observable const myNumbers$ = new MyNumbersObservable(5); // Subscribe to the custom Observable myNumbers$.subscribe(value => console.log(value));
Backpressure:
RxJS provides mechanisms for handling backpressure, which occurs when the rate of emission from an Observable is higher than the rate of consumption by subscribers. Backpressure strategies allow you to control how data is buffered, dropped, or managed when dealing with high-rate data streams.
Example in Angular with backpressure:
import { interval, bufferTime } from 'rxjs'; // Create a fast-emitting Observable that emits numbers every 100ms const fastNumbers$ = interval(100); // Use bufferTime operator to buffer emitted numbers for every 1 second const bufferedNumbers$ = fastNumbers$.pipe(bufferTime(1000)); // Subscribe to the buffered numbers bufferedNumbers$.subscribe(values => console.log(values));
Error Handling:
RxJS provides operators for handling errors that may occur in the Observable stream.
You can catch and handle errors, retry failed Observables, and take other actions to gracefully handle errors in your application.
Example in Angular with error handling:
import { of } from 'rxjs'; import { catchError, retry } from 'rxjs/operators'; // Create an Observable that may throw an error const numbers$ = of(1, 2, 3, 4, 5, 'invalid', 7, 8, 9); // Use catchError operator to catch and handle errors const safeNumbers$ = numbers$.pipe( catchError(error => { console.error(`Error: ${error}`); return of('Error occurred. Continuing with default value.'); }) ); // Use retry operator to retry failed Observables const retryNumbers$ = safeNumbers$.pipe( retry(2) // Retry failed Observables up to 2 times ); // Subscribe to the safe and retrying numbers retryNumbers$.subscribe(value => console.log(value));
Custom Operators:
RxJS allows you to create custom operators by combining existing operators or by implementing your own logic for transforming or filtering values in the Observable stream. Custom operators can provide reusable and specialized functionality for your specific use cases.
Example in Angular with a custom operator:
import { Observable, OperatorFunction } from 'rxjs'; import { filter } from 'rxjs/operators'; // Custom operator that filters out odd numbers function filterOutOddNumbers(): OperatorFunction<number, number> { return (source: Observable<number>) => new Observable<number>(subscriber => { return source.subscribe(value => { if (value % 2 === 0) { subscriber.next(value); } }); }); } // Create an Observable that emits a sequence of numbers const numbers$ = of(1, 2, 3, 4, 5); // Use the custom filterOutOddNumbers operator const filteredNumbers$ = numbers$.pipe(filterOutOddNumbers()); // Subscribe to the filtered numbers filteredNumbers$.subscribe(value => console.log(value));
This is just a brief overview of some of the basic to advanced concepts and features of RxJS. RxJS is a powerful and flexible library that can greatly simplify and enhance your asynchronous programming in Angular or any other JavaScript environment. I recommend referring to the official RxJS documentation for more in-depth explanations and examples.
I hope this detailed blog post on Observables and subject in Rxjs with examples. Remember to apply these concepts wisely in your code and make use of the code examples provided to enhance your understanding.
Happy coding!
Please let me know your thoughts on this story by clapping or leaving a comment with suggestions for future topics.
- 登录 发表评论