Reactive Programming – Working with RxJS

RxJS is a library for composing asynchronous and event-based programs by using observable sequences. It provides one core type, the Observable, satellite types (Observer, Schedulers, Subjects) and operators inspired by Array methods (map, filter, reduce, every, etc.) so that asynchronous events can be handled as collections.

RxJS and Angular

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables that makes it easier to compose asynchronous or callback-based code. Angular relies heavily on RxJS and the concepts around it. An Angular app is composed of components, and data streams are an important way of moving data between them. Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change.

In general, we can say that any change in a data stream propagates to the values and to the other data streams that depend on it.

Observables and Observer

Observables are lazy Push collections of multiple values.

Example. The following is an Observable that pushes the values 1, 2, 3 immediately (synchronously) when subscribed, and the value 4 after one second has passed since the subscribe call, then completes:

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

Observer

An interface for a consumer of push-based notifications delivered by an Observable.

interface Observer<T> {
  closed?: boolean;
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

Observables are data streams whose values may change over time, and observers are the consumers that use the values emitted by observables.
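
To make the Observer interface more concrete, here is a minimal sketch that passes one observer object to two observables. The names log, completing and failing are made up for this illustration, and the sketch assumes RxJS 6 or later. It shows that next can run many times, while complete or error ends the stream.

```typescript
import { Observable, Observer } from 'rxjs';

// Collects every notification so the order is easy to inspect.
const log: string[] = [];

// An observer is a plain object with next, error and complete callbacks.
const observer: Observer<number> = {
  next: (value) => log.push(`next: ${value}`),
  error: (err) => log.push(`error: ${err}`),
  complete: () => log.push('complete'),
};

// Emits two values, then completes.
const completing = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

// Emits one value, then fails. Nothing is delivered after an error.
const failing = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error('boom');
  subscriber.next(2); // ignored: the stream has already ended
});

completing.subscribe(observer);
failing.subscribe(observer);

console.log(log);
// ['next: 1', 'next: 2', 'complete', 'next: 1', 'error: boom']
```

After error or complete has been called, the observer receives nothing further from that subscription.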

What is a Subscription?

To execute the observable you have created and begin receiving notifications, you call its subscribe() method. Subscribing “kicks off” the observable stream. Without a subscribe (or an async pipe) the stream won’t start emitting values. It’s similar to subscribing to a newspaper or magazine: you won’t start getting issues until you subscribe.

observable.subscribe( 
 (data: any) => console.log(data), // for handling data
)
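
The point that nothing happens before subscribe() can be checked with a small counter. This is only a sketch, assuming RxJS 6 or later; executions, lazy and received are names invented for the example. It also shows that an Observable created with new Observable runs its function once per subscriber.

```typescript
import { Observable } from 'rxjs';

let executions = 0;

const lazy = new Observable<number>((subscriber) => {
  executions += 1; // runs once for every subscribe() call
  subscriber.next(executions);
  subscriber.complete();
});

// Creating the observable did not run the function above.
const beforeSubscribe = executions;

const received: number[] = [];
lazy.subscribe((value) => received.push(value));
lazy.subscribe((value) => received.push(value));

console.log(beforeSubscribe, executions, received);
// 0 2 [1, 2]
```

Because each subscriber triggers its own execution, two subscriptions to an observable that wraps an expensive operation would run that operation twice.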

When to use BehaviorSubject?

First, what is a BehaviorSubject? A BehaviorSubject is basically a standard observable that can also be pushed new values with next(), except that it always holds a current value, so a subscriber always receives a value.

The idea behind BehaviorSubject is to avoid getting undefined values when subscribing. For example, suppose we fetch some value with an HTTP request, which gives us an observable. If we copy the result into a plain variable, there isn’t really any way for us to know whether the data has finished loading. If we read the variable too early, we get undefined, and if the data changes later, the change won’t be reflected in the app.

Why BehaviorSubject
  • It will always return a value, even if no data has been emitted from its stream yet
  • When you subscribe to it, it will immediately return the last value that was emitted (or the initial value if no data has been emitted yet)
  • It instantly notifies anything that is subscribed to the BehaviorSubject when the data changes

import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';
import { Storage } from '@ionic/storage';
import { SomeType } from '../interfaces/movie-category';

@Injectable({ providedIn: 'root' })
export class DataService {
  // Starts with an empty array, so subscribers never receive undefined.
  public myData: BehaviorSubject<SomeType[]> = new BehaviorSubject<SomeType[]>([]);

  constructor(private storage: Storage) {}

  // Reads the saved data and pushes it to every subscriber.
  load(): void {
    this.storage.get('myData').then((data) => {
      this.myData.next(data);
    });
  }

  // Saves the new data and notifies every subscriber of the change.
  updateData(data: SomeType[]): void {
    this.storage.set('myData', data);
    this.myData.next(data);
  }
}
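
The three properties listed above can be checked in a small standalone script, without Angular or storage. This is a sketch assuming RxJS 6 or later; count$, early and late are names chosen only for the illustration.

```typescript
import { BehaviorSubject } from 'rxjs';

// 0 is the initial value, available before anything has been emitted.
const count$ = new BehaviorSubject<number>(0);

const early: number[] = [];
const late: number[] = [];

// Subscribes before any next() call: receives the initial value at once.
count$.subscribe((value) => early.push(value));

count$.next(1);
count$.next(2);

// Subscribes later: receives the latest value (2) at once, not 0 or 1.
count$.subscribe((value) => late.push(value));

// Both subscribers are notified of the change.
count$.next(3);

console.log(early, late, count$.getValue());
// [0, 1, 2, 3] [2, 3] 3
```

getValue() reads the current value synchronously, which is handy in code that cannot subscribe.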

Unsubscribe: Preventing Memory Leaks

Don’t forget to unsubscribe. Use the async pipe whenever possible.

When we subscribe to a stream, the subscription stays open and its callback is called whenever a value is emitted into the stream from anywhere in the app, until the stream completes or the subscription is closed by calling the unsubscribe method. If a subscription is never closed, its callback keeps being called even after the component that created it is gone, which causes memory leaks and performance issues.

There are several ways to unsubscribe. One of them is to use the takeUntil() operator in the pipe before your “subscribe”, i.e.,

ngOnInit() {
  this.service.Subject1.pipe(takeUntil(this.ngUnsubscribe)).subscribe(() => {});
  this.service.Subject2.pipe(takeUntil(this.ngUnsubscribe)).subscribe(() => {});
  this.service.Subject3.pipe(takeUntil(this.ngUnsubscribe)).subscribe(() => {});
}

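Here, ngUnsubscribe is a Subject. When the component is destroyed, the ngUnsubscribe.next() and ngUnsubscribe.complete() calls in the base class (typically made from ngOnDestroy()) make takeUntil complete each stream, which ends all three subscriptions in an RxJS way.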
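
Since the base class is not shown, here is a minimal sketch of the whole pattern in one place. WidgetComponent is a hypothetical stand-in for an Angular component: it has no decorator and its lifecycle methods are called by hand, so the script runs with RxJS alone (version 6 or later assumed).

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical component-like class; Angular would normally call
// ngOnInit() and ngOnDestroy() for us.
class WidgetComponent {
  readonly received: number[] = [];
  private readonly ngUnsubscribe = new Subject<void>();
  private readonly source$: Subject<number>;

  constructor(source$: Subject<number>) {
    this.source$ = source$;
  }

  ngOnInit(): void {
    this.source$
      .pipe(takeUntil(this.ngUnsubscribe)) // ends when ngUnsubscribe emits
      .subscribe((value) => this.received.push(value));
  }

  ngOnDestroy(): void {
    this.ngUnsubscribe.next();     // ends every stream piped through takeUntil
    this.ngUnsubscribe.complete(); // closes the notifier itself
  }
}

const source$ = new Subject<number>();
const widget = new WidgetComponent(source$);

widget.ngOnInit();
source$.next(1);
source$.next(2);
widget.ngOnDestroy();
source$.next(3); // not delivered: the subscription has ended

console.log(widget.received);
// [1, 2]
```

One notifier Subject can end any number of subscriptions, which is why the pattern scales well when a component subscribes to many streams.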

Use the Async Pipe

Another way is to use the async pipe in templates. The async pipe takes care of subscribing and unwrapping the data, as well as unsubscribing when the component is destroyed.

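import { Component } from '@angular/core';
import { Observable, Observer } from 'rxjs';

@Component({
  selector: 'async-observable-pipe',
  template: '<div><code>observable|async</code>: Time: {{ time | async }}</div>'
})
export class AsyncObservablePipeComponent {
  time = new Observable<string>((observer: Observer<string>) => {
    const id = setInterval(() => observer.next(new Date().toString()), 1000);
    // Teardown: stop the timer when the async pipe unsubscribes.
    return () => clearInterval(id);
  });
}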

unsubscribe()

We can also close a subscription manually: keep the Subscription returned by subscribe() and call its unsubscribe method, typically in ngOnDestroy().

ngOnDestroy() {
  this.subscription.unsubscribe()
}
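
When a component holds several subscriptions, they can be grouped so that one call closes them all. The following is a sketch assuming RxJS 6 or later; letters$, numbers$ and subscriptions are names invented for the example, and the unsubscribe() call stands in for what ngOnDestroy() would do.

```typescript
import { Subject, Subscription } from 'rxjs';

const letters$ = new Subject<string>();
const numbers$ = new Subject<number>();
const received: Array<string | number> = [];

// One parent Subscription collects every child subscription.
const subscriptions = new Subscription();
subscriptions.add(letters$.subscribe((value) => received.push(value)));
subscriptions.add(numbers$.subscribe((value) => received.push(value)));

letters$.next('a');
numbers$.next(1);

// What ngOnDestroy() would do: a single call closes both children.
subscriptions.unsubscribe();

letters$.next('b'); // ignored
numbers$.next(2);   // ignored

console.log(received, subscriptions.closed);
// ['a', 1] true
```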
