Reactive Programming — From JS to ReactJS

Programming paradigms are styles of conceptualizing a problem and designing a solution, each with its own methodology and philosophy for writing clean, efficient, and well-organized code. There is no fixed number of paradigms; well-known ones include procedural, functional, event-driven, and object-oriented programming.

Reactive programming is another such paradigm, one that focuses on the propagation of changes and on asynchronous data streams. This post tries to keep things as simple as possible. So take a moment to grab your favorite caffeinated treat, because every great read deserves a great drink.

Change propagation and Asynchronous data stream.

First of all, let's understand what asynchronous data streams and change propagation mean.

Updating the dependent parts of a system when the source data changes is referred to as "change propagation". Asynchronous means that tasks or processes run independently of each other, without one having to finish before the other starts. The term data stream refers to a sequence of data items being emitted. Taken together, an "asynchronous data stream" is a sequence of data items emitted over time, independently of the main program's flow of execution.
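To make change propagation concrete, here is a minimal sketch in plain JavaScript. The createCell helper and the price/quantity names are hypothetical and exist only for this illustration; the point is that a dependent value is recomputed whenever one of its sources changes.

```javascript
// A hypothetical "cell" that holds a value and notifies dependents on change.
function createCell(initialValue) {
  let value = initialValue;
  const listeners = new Set();
  return {
    get: () => value,
    set(next) {
      if (next === value) return; // nothing changed, nothing to propagate
      value = next;
      listeners.forEach((listener) => listener(value));
    },
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener); // call this to stop listening
    },
  };
}

const price = createCell(10);
const quantity = createCell(2);
let total = price.get() * quantity.get();

// The dependent value is recomputed whenever either source changes.
const recompute = () => {
  total = price.get() * quantity.get();
};
price.subscribe(recompute);
quantity.subscribe(recompute);

price.set(15);
console.log(total); // 30
quantity.set(3);
console.log(total); // 45
```

Nothing here is asynchronous yet; the same idea applies when the set() calls arrive later, for example from a timer or a network response.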

In modern development, we will hardly find a system that does not need to manage change propagation and asynchronous data streams. This is a difficult task because of the following problems.

The Problems

Following are some of the problems that make it difficult to manage async data and changes.

Complexity in Managing State: Asynchronous data streams can make it difficult to manage state because data can arrive at any time, potentially leading to inconsistent or unpredictable state if not handled correctly.
Callback Hell: Traditional methods of handling asynchronous data, such as callbacks, can lead to deeply nested and hard-to-read code structures.
Concurrency Issues: Handling multiple asynchronous streams concurrently can introduce issues such as race conditions, where the timing of events causes unexpected behavior.
Error Handling: Managing errors in asynchronous flows can be complex, especially when errors can occur at multiple points in the data stream.
Backpressure: When the rate of incoming data exceeds the system's ability to process it, it can lead to resource exhaustion and degraded performance.
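The race condition mentioned above can be sketched in a few lines of plain JavaScript. The helper name createLatestOnly and the search strings are assumptions made up for this illustration: two requests are started, their responses arrive out of order, and only the most recently started request is allowed to deliver its result.

```javascript
// Hypothetical helper: only the most recently started request may deliver.
function createLatestOnly(onResult) {
  let latestId = 0;
  return function startRequest() {
    const id = ++latestId;
    // The returned function is what the async operation calls when it finishes.
    return function deliver(result) {
      if (id === latestId) onResult(result);
    };
  };
}

const shown = [];
const startRequest = createLatestOnly((result) => shown.push(result));

const deliverFirst = startRequest(); // e.g. a search for "re"
const deliverSecond = startRequest(); // e.g. a search for "react"

// Responses arrive out of order: the newer one first, the older one last.
deliverSecond('results for "react"');
deliverFirst('results for "re"'); // stale, ignored

console.log(shown); // [ 'results for "react"' ]
```

Without the id check, the stale response would overwrite the newer one simply because it arrived later.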

Reactive Programming — The Solution

Centered on async data streams and change propagation, reactive programming is a paradigm that not only helps manage these two aspects but also addresses the problems they cause.

Core Concepts — The overview

At the core of this paradigm are four basic concepts that help us manage the two aspects under discussion.

Observables: Represent data streams or events that can be observed. An observable emits data, which can be subscribed to by observers.
Observers: Objects that subscribe to observables. They react to the data emitted by them.
Operators: Functions that allow us to transform, filter, combine, and manipulate data streams.
Schedulers: Control the execution context of the observables, managing threading, concurrency, and time-based operations.
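How the first three concepts fit together can be shown with a toy implementation in plain JavaScript. This is only a sketch for intuition, not how RxJS is implemented: createObservable and the simplified map and filter below are hypothetical stand-ins, error handling is omitted, and schedulers are left out entirely.

```javascript
// Toy stand-in for an observable: an object that feeds values to an observer.
function createObservable(producer) {
  return {
    subscribe(observer) {
      producer(observer);
    },
    // Each operator takes an observable and returns a new one.
    pipe(...operators) {
      return operators.reduce((source, operator) => operator(source), this);
    },
  };
}

// Toy operator: transform every value.
const map = (project) => (source) =>
  createObservable((observer) =>
    source.subscribe({
      next: (value) => observer.next(project(value)),
      complete: () => observer.complete(),
    })
  );

// Toy operator: let through only the values that pass the predicate.
const filter = (predicate) => (source) =>
  createObservable((observer) =>
    source.subscribe({
      next: (value) => {
        if (predicate(value)) observer.next(value);
      },
      complete: () => observer.complete(),
    })
  );

const numbers = createObservable((observer) => {
  [1, 2, 3, 4].forEach((n) => observer.next(n));
  observer.complete();
});

const received = [];
numbers
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => n * 10)
  )
  .subscribe({
    next: (value) => received.push(value),
    complete: () => received.push('done'),
  });

console.log(received); // [ 20, 40, 'done' ]
```

Nothing happens until subscribe() is called; the pipeline is only a description of what to do with the values once they flow.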

Importance of Reactive Programming

We have already discussed the problems caused by async data streams and change propagation. Let's see what this paradigm offers in return.

Asynchronous Data Handling: Traditional imperative programming models can struggle with asynchronous data, leading to callback hell or complex state management. Reactive programming simplifies this by treating asynchronous events as streams that can be composed and manipulated declaratively.
Composition of Data Streams: Reactive programming allows the composition of complex data processing pipelines from simple, reusable operators.
Automatic Propagation of Changes: When source data changes, all dependent parts of the system are automatically updated, reducing the need for manual state management.
Complex Event Handling: In applications with complex event handling, such as user interfaces with multiple interactions or real-time data streams, reactive programming provides a structured and consistent approach to handle these events.
Backpressure Management: In systems where the production of data can outpace its consumption (e.g., data streams, server requests), reactive programming can manage backpressure, ensuring that the system remains responsive and stable.
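One simple way to picture backpressure handling is a bounded buffer that drops the oldest item when a fast producer outpaces a slow consumer. The sketch below is plain JavaScript with a hypothetical createBoundedBuffer helper; it shows just one lossy strategy among several and is not taken from any library.

```javascript
// Hypothetical bounded buffer: keeps at most `capacity` items, dropping the oldest.
function createBoundedBuffer(capacity) {
  const items = [];
  let dropped = 0;
  return {
    push(item) {
      if (items.length === capacity) {
        items.shift(); // make room by discarding the oldest item
        dropped++;
      }
      items.push(item);
    },
    take: () => items.shift(), // undefined when the buffer is empty
    size: () => items.length,
    droppedCount: () => dropped,
  };
}

const buffer = createBoundedBuffer(3);

// The producer emits five items before the consumer takes any.
for (let i = 1; i <= 5; i++) buffer.push(i);

console.log(buffer.take()); // 3
console.log(buffer.droppedCount()); // 2
```

Memory use stays bounded at the cost of losing the items 1 and 2; whether dropping, sampling, or batching is acceptable depends on the use case.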

Reactive Programming in JS

With the problem statement and the solution understood, let's go through the library that makes reactive programming easier in JS: RxJS. It is built on the same core concepts and principles of reactive programming discussed above. In this post, we will cover its basic and most popular functions.

Observables

Observables represent data streams that can emit multiple values over time. These values can be numbers, strings, objects, or even other observables. They are versatile and powerful, allowing us to work with asynchronous data flows. Keep the following points about these values in mind.

An observable can emit any number of values (zero or more).
Values can be emitted immediately (synchronously) or over time (asynchronously).
An observable communicates with its observers through three types of notifications: next, error, and complete.

Creating Observables:
Observables can be created with creation functions such as of, from, interval, fromEvent, timer, range, and ajax, or with the EMPTY constant. RxJS 7 has roughly 20 commonly used creation functions, but the ones listed here are those you will most likely see. We can also write our own creator function instead of using a built-in one.

1. of
Creates an observable that emits the arguments you provide, then completes.

const { of } = require('rxjs');
const observable = of(1, 2, 3); // Emits 1, 2, 3 sequentially

2. from
Converts an array, promise, or iterable into an observable.

const { from } = require('rxjs');
const observable = from([10, 20, 30]); // Emits 10, 20, 30 sequentially

3. interval
Creates an observable that emits sequential numbers every specified interval of time.

import { interval } from 'rxjs';
const observable = interval(1000); // Emits 0, 1, 2, … every second

4. fromEvent
Creates an observable from an event target, such as a DOM element.

import { fromEvent } from 'rxjs';
const observable = fromEvent(document, 'click');
// Pass any event target as the first argument, e.g. a button element.

5. timer
Creates an observable that waits for a specified time, then emits a single value and completes.

const { timer } = require('rxjs');
const observable = timer(2000); // Emits 0 after 2 seconds, then completes

6. range
Creates an observable that emits a sequence of numbers within a specified range.

import { range } from 'rxjs';
const observable = range(1, 5); // Emits 1, 2, 3, 4, 5

7. ajax
Creates an observable for making HTTP requests.

import { ajax } from 'rxjs/ajax';
const observable = ajax('https://api.example.com/data');

8. EMPTY
A ready-made observable that emits no items and immediately completes.

import { EMPTY } from 'rxjs';
const observable = EMPTY;

9. Providing Custom Creator Function

We can also use our own creator function to create observables, as the following example shows.

const { Observable } = require('rxjs');

// Creating an observable
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
});

// Emits 1, 2, 3 to each subscriber (and never completes, since complete() is not called)

Observers

Observers, also known as subscribers, do exactly what their name says: they subscribe to observables or, to put it another way, observe them. The observer is central to the observer pattern, which is the foundation of reactive programming in RxJS. It is an object that "listens" to the data emitted by an observable and defines how to handle the notifications the observable delivers. Since there are three types of notifications, an observer can have up to three methods, one for each type.

next(value)
This method is called whenever the observable emits a value. It handles each value in the sequence emitted by the observable.

error(err)
This method is called if the observable encounters an error. It handles any errors that occur during the execution of the observable. Once an error is emitted, the observable stops emitting further values.

complete()
This method is called when the observable has finished emitting all values. It handles the completion notification. Once the observable completes, it stops emitting any more values.

Hence, typically an observer looks like the following.

const observer = {
  next(value) {
    console.log('Next value: ', value);
  },
  error(err) {
    console.error('Error: ', err);
  },
  complete() {
    console.log('Completed');
  }
};

This object is then passed to subscribe() when subscribing to an observable, and each notification is routed to the matching method. We can either pass the handler functions directly as comma-separated arguments or define the observer object beforehand and pass that. The latter is cleaner and easier to read, and recent RxJS versions deprecate the separate-arguments form. How we react to each notification depends on our use case and requirements.

Subscribing to an Observable

This is just as simple as it sounds. We create the observable, craft an observer object, then call the subscribe() method on the observable and pass it the observer object (or provide the handlers on the go while calling the method). Voila, it's done! Let's look at an example.

import { of } from 'rxjs';

// Approach #1: Defining the Object.
const observable = of(1, 2, 3);

const observer = {
  next(value) {
    console.log('Next value: ', value);
  },
  error(err) {
    console.error('Error: ', err);
  },
  complete() {
    console.log('Completed');
  }
};

observable.subscribe(observer);

// Approach #2: Providing the implementations on the go.
// (This separate-arguments signature is deprecated in recent RxJS versions.)
observable.subscribe(
  value => console.log('Next value: ', value),
  err => console.error('Error: ', err),
  () => console.log('Completed')
);

// Output (same for both approaches)
// Next value: 1
// Next value: 2
// Next value: 3
// Completed

In the above example, the observable emits all of its values one after another, immediately, because it does not have to wait for anything. This behavior is synchronous. Let's look at an observable that emits both synchronous and asynchronous values to get a better understanding of how these things work.

const { Observable } = require('rxjs');

// Creating an observable by using our own creator function
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
  subscriber.next(5);
});

// Subscribing to the observable
observable.subscribe({
  next(x) { console.log('Got value ' + x); },
  error(err) { console.error('Something went wrong: ' + err); },
  complete() { console.log('Done'); }
});

The provided code creates an observable that emits values synchronously at first and then asynchronously after a delay using setTimeout.

Synchronous Emissions:
The observable immediately emits the values 1, 2, and 3, and then 5, because setTimeout only schedules its callback and does not block the code that follows it. Each emitted value is logged to the console by the next handler.

Asynchronous Emissions:
After a delay of 1 second (1000 milliseconds), the observable emits the value 4. It then calls the complete method, indicating that no more values will be emitted. Both the emitted value and the completion notification are logged to the console.

Got value 1
Got value 2
Got value 3
Got value 5
** 1 second Gap **
Got value 4
Done

Observers, async operations and complete()

Enough about observers already; it's time for a quick and easy riddle. In the example above there is just one async operation, the one that emits the value 4, and because we knew that, we placed the complete() call right after that emission, inside the callback. If we place complete() at the end of the constructor function instead, the output will be a bit different.

Example Problem:

const { Observable } = require('rxjs');

// Creating an observable by using our own creator function
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
  }, 1000);
  subscriber.next(5);
  subscriber.complete();
});

// Subscribing to the observable
observable.subscribe({
  next(x) { console.log('Got value ' + x); },
  error(err) { console.error('Something went wrong: ' + err); },
  complete() { console.log('Done'); }
});

The output would look like this:

Got value 1
Got value 2
Got value 3
Got value 5
Done

Noticed something? We didn't get the value 4 that was emitted by the async operation, setTimeout in our case. Let's take a closer look at what is happening.

The observable immediately emits the values 1, 2 and 3.
setTimeout schedules the emission of the value 4 after a delay of 1000 milliseconds; its callback cannot run before the current call stack is empty.
The observable emits the value 5.
complete() concludes the emissions and tells the observer that no more values will be emitted.
After one second the scheduled setTimeout callback runs, but since the observable has already completed, the value 4 is never delivered to the observer.
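The rule at work here is that nothing is delivered after completion. A toy wrapper in plain JavaScript can illustrate it; createSafeSubscriber is a hypothetical name and this is a sketch of the idea, not the actual RxJS implementation.

```javascript
// Toy wrapper (not the RxJS implementation): ignores everything after complete().
function createSafeSubscriber(observer) {
  let closed = false;
  return {
    next(value) {
      if (!closed) observer.next(value);
    },
    complete() {
      if (closed) return;
      closed = true;
      observer.complete();
    },
  };
}

const log = [];
const subscriber = createSafeSubscriber({
  next: (value) => log.push('Got value ' + value),
  complete: () => log.push('Done'),
});

subscriber.next(1);
subscriber.complete();
subscriber.next(4); // arrives too late, silently dropped

console.log(log); // [ 'Got value 1', 'Done' ]
```

The late next(4) call does not throw; it is simply ignored, which is why the missing value is easy to overlook.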

But that is not what we want to see, right? What if our custom constructor function mixes several sync and async operations, and we don't know how long each one takes to emit its value? We still want a function that completes only once all the async operations are done. To achieve this, we just need to adjust our function accordingly. Let's see how to get this done.

Proposed Solution:

One way is to keep track of ongoing async operations with a local counter. We increment the counter every time we start an async operation and decrement it when that operation emits its value. This way we know exactly how many async operations we still have to wait for before completing. The sync operations finish first anyway, so we don't need to track them; only the async operations matter. Here is a simple way to do that.

const { Observable } = require('rxjs');

// Creating an observable by using our own creator function
const observable = new Observable(subscriber => {
  let asyncOperations = 0;

  function asyncStart() {
    asyncOperations++;
  }

  function asyncComplete() {
    asyncOperations--;
    if (asyncOperations === 0) {
      subscriber.complete();
    }
  }

  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);

  asyncStart();
  setTimeout(() => {
    subscriber.next(4);
    asyncComplete();
  }, 10);

  subscriber.next(5);
  subscriber.next(6);

  asyncStart();
  setTimeout(() => {
    subscriber.next(7);
    asyncComplete();
  }, 0);
});

// Subscribing to the observable
observable.subscribe({
  next(x) { console.log('Got value ' + x); },
  error(err) { console.error('Something went wrong: ' + err); },
  complete() { console.log('Done'); }
});

The output of this code snippet is as follows:

Got value 1
Got value 2
Got value 3
Got value 5
Got value 6
Got value 7
Got value 4
Done

The logs show that 1, 2 and 3 are emitted first, synchronously. The value 4 is scheduled with a delay of 10 milliseconds. Next, 5 and 6 are emitted synchronously. Then 7 is scheduled with a delay of 0 milliseconds, so it runs as soon as the current call stack is empty. Since 4 has to wait 10 ms, 7 comes right after 6 and 4 arrives last, followed by Done.

Output Logic Explanation:

asyncOperations is used for reference counting of async operations. Its value tells us the number of async operations that we still need to wait for.
asyncStart() and asyncComplete() are two functions that increase or decrease our reference counter by 1 every time an async operation starts or completes, respectively.
Our completion logic lies in asyncComplete(), where we check whether the reference count has dropped to 0 after the latest decrement. To put it another way, we check whether the number of ongoing async operations has dropped to 0 so that we can call complete() to conclude the emissions.
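One thing to watch with a bare counter is that it completes the moment the count reaches zero, which could happen too early if one operation finishes before the next one is registered, and it never completes if no async operation is started at all. A possible refinement, sketched below in plain JavaScript, adds an explicit "no more operations" signal. The names createPendingTracker, start, and seal are hypothetical and chosen only for this illustration.

```javascript
// Hypothetical tracker: calls onIdle once, after seal() has been called
// and every started operation has reported that it is done.
function createPendingTracker(onIdle) {
  let pending = 0;
  let sealed = false;
  let finished = false;

  function check() {
    if (sealed && pending === 0 && !finished) {
      finished = true;
      onIdle();
    }
  }

  return {
    // Register an operation; returns the function to call when it is done.
    start() {
      pending++;
      let done = false;
      return function markDone() {
        if (done) return; // ignore duplicate calls
        done = true;
        pending--;
        check();
      };
    },
    // Signal that no further operations will be started.
    seal() {
      sealed = true;
      check();
    },
  };
}

const events = [];
const tracker = createPendingTracker(() => events.push('complete'));

const doneA = tracker.start();
doneA(); // count is back to 0, but we are not sealed yet
events.push('A done');

const doneB = tracker.start();
tracker.seal();
events.push('sealed');

doneB();
doneB(); // duplicate, ignored

console.log(events); // [ 'A done', 'sealed', 'complete' ]
```

In an observable's constructor function, onIdle would be the call to subscriber.complete(), and seal() would be the last statement of the function.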

RxJS Operators

RxJS operators are functions that enable the transformation, filtering, and combination of observables. They allow us to handle complex async operations in a declarative and concise way. In RxJS, the pipe method is used to chain operators and apply them to an observable stream. Out of the many operators available, we will discuss a few basic ones with self-explanatory code examples to get started.

1. map
It's similar to the Array map method but for observables. It transforms each value emitted by an observable using a given function.

const { of } = require('rxjs');
const { map } = require('rxjs/operators');

of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(value => console.log(value));
// Output: 2, 4, 6

2. filter
Emits only those values from an observable that pass a specified predicate function.

const { of } = require('rxjs');
const { filter } = require('rxjs/operators');

of(1, 2, 3, 4, 5).pipe(
  filter(x => x % 2 === 0)
).subscribe(value => console.log(value));
// Output: 2, 4

3. mergeMap
Projects each source value to an observable which is merged in the output observable.

const { of } = require('rxjs');
const { mergeMap } = require('rxjs/operators');

of('a', 'b').pipe(
  mergeMap(x => of(x + '1', x + '2'))
).subscribe(value => console.log(value));
// Output: 'a1', 'a2', 'b1', 'b2'

4. catchError
Catches errors on the observable to handle them gracefully.

const { of, throwError } = require('rxjs');
const { catchError } = require('rxjs/operators');

// RxJS 7 expects a factory function that creates the error
throwError(() => new Error('error')).pipe(
  catchError(err => of('Error handled'))
).subscribe(value => console.log(value));
// Output: 'Error handled'

5. debounceTime
Useful for rate-limiting events. Emits a value from the source observable only after a given time span has passed without another source emission.

const { fromEvent } = require('rxjs');
const { debounceTime } = require('rxjs/operators');

fromEvent(document, 'click').pipe(
  debounceTime(1000)
).subscribe(() => console.log('Clicked!'));
// Output: 'Clicked!' (only if there are no further clicks within 1 second)
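To see what debouncing does under the hood, here is a plain JavaScript sketch that does not use RxJS. The debounce function and the fake clock are hypothetical helpers written for this example; the timer functions are injectable only so that the example runs instantly and deterministically.

```javascript
// Plain-JS debounce sketch: a newer call cancels the pending one.
function debounce(fn, waitMs, setTimer = setTimeout, clearTimer = clearTimeout) {
  let timerId = null;
  return function debounced(...args) {
    if (timerId !== null) clearTimer(timerId);
    timerId = setTimer(() => {
      timerId = null;
      fn(...args);
    }, waitMs);
  };
}

// A tiny fake clock so that no real waiting is needed.
function createFakeClock() {
  let now = 0;
  let nextId = 1;
  const timers = new Map();
  return {
    setTimer(callback, delay) {
      const id = nextId++;
      timers.set(id, { callback, due: now + delay });
      return id;
    },
    clearTimer: (id) => timers.delete(id),
    advance(ms) {
      now += ms;
      for (const [id, timer] of [...timers]) {
        if (timer.due <= now) {
          timers.delete(id);
          timer.callback();
        }
      }
    },
  };
}

const clock = createFakeClock();
const calls = [];
const search = debounce((term) => calls.push(term), 1000, clock.setTimer, clock.clearTimer);

search('r');
clock.advance(400);
search('re');
clock.advance(400);
search('rea'); // only this call survives
clock.advance(1000);

console.log(calls); // [ 'rea' ]
```

With the default arguments the same debounce function works against the real setTimeout and clearTimeout.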

RxJS and ReactJS

It is quite rare to see RxJS used with ReactJS. In fact, I haven't come across it yet. That does not mean it can't be done, and doing so has its pros and cons. The main reason we rarely see this combination is that other tools in the React ecosystem already serve the purpose while helping us maintain separation of concerns. Most often these are Redux, for managing state globally, and Thunk, for doing our tasks asynchronously.

When a change is generated or we fetch some data using Thunk, the result is typically stored with Redux, which makes the changes or the data available globally. The only thing left is to reflect those changes throughout the application, and React takes care of that itself: when data in the Redux store changes, the components that read that data are re-rendered automatically. With such a setup, we generally don't feel the need for RxJS. Also, in performance-critical applications where every extra dependency has a cost, adding RxJS might not sound like a good idea, especially when we can get things done without it.

Example Use case Scenario

But let's see how we can use RxJS in a ReactJS application that also uses Redux and Thunk. For this purpose we can assume a common scenario where the user searches for data using a search input. To get the filtered data, we make an API call to the backend through a thunk, and the response is stored in the Redux store. We also want to ensure that rapid typing does not overwhelm the backend with search requests, while still keeping the search responsive.

The React Way

To achieve this in reactJs we can do something similar to the following.

import React, { useState, useEffect, useCallback } from 'react';
import { useDispatch } from 'react-redux';
// Assume the following is our async action that will get the response
// and save it in the Redux store.
import { searchItemsAsync } from './redux/slices/itemsSlice';

const SearchComponent = () => {
  const dispatch = useDispatch();
  const [searchTerm, setSearchTerm] = useState('');
  const [lastSearchTerm, setLastSearchTerm] = useState('');
  const [lastRequestTime, setLastRequestTime] = useState(0);

  // Always mirror the input value so the controlled input never gets stuck.
  const handleInputChange = useCallback((event) => {
    setSearchTerm(event.target.value);
  }, []);

  useEffect(() => {
    // Nothing new to search for. This also stops the effect from dispatching
    // again after it has updated its own state.
    if (searchTerm === lastSearchTerm) return;

    const runSearch = () => {
      dispatch(searchItemsAsync(searchTerm));
      setLastSearchTerm(searchTerm);
      setLastRequestTime(Date.now());
    };

    const timeSinceLastRequest = Date.now() - lastRequestTime;
    if (timeSinceLastRequest >= 500) {
      runSearch();
      return;
    }

    // Too soon after the last request: wait for the remaining time.
    const timeoutId = setTimeout(runSearch, 500 - timeSinceLastRequest);
    return () => clearTimeout(timeoutId);
  }, [searchTerm, dispatch, lastSearchTerm, lastRequestTime]);

  return (
    <div>
      <input
        id="searchInput"
        type="text"
        value={searchTerm}
        onChange={handleInputChange}
      />
    </div>
  );
};

export default SearchComponent;

Controlled Input: The input field is controlled by the searchTerm state, ensuring that the component re-renders with the latest input value.
Avoid Redundant Updates: By comparing searchTerm with lastSearchTerm, the component avoids dispatching the same search term multiple times.
Rate Limiting: By combining immediate and delayed dispatching, the component ensures that the search action is not dispatched more than once per 500 milliseconds. Strictly speaking this is closer to throttling than to debouncing, because the first change after a quiet period is dispatched immediately.

handleInputChange:
This function is called whenever the input field changes. It copies the new value into searchTerm; the comparison with lastSearchTerm happens in the effect.

useEffect():
If searchTerm equals lastSearchTerm, the effect does nothing. Otherwise it calculates the time elapsed since the last search request. If at least 500 milliseconds have passed, it immediately dispatches the searchItemsAsync action with the current searchTerm. If less than 500 milliseconds have passed, it sets a timeout to dispatch the search action once the remaining time has elapsed. The effect cleans up any pending timeout when the component unmounts or before the next effect execution, so a newer term replaces a pending older one and no timer is left behind.

The RxJS Way

Honestly speaking, the above code seems a bit messy and hard to read. The code we are going to discuss below is shorter and more concise, and it reads more easily. Without further delay, let's check it out.

import React, { useEffect } from 'react';
import { useDispatch } from 'react-redux';
import { searchItemsAsync } from './redux/slices/itemsSlice';
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, switchMap } from 'rxjs/operators';

const SearchComponent = () => {
  const dispatch = useDispatch();

  useEffect(() => {
    const subscription = fromEvent(document.getElementById('searchInput'), 'input')
      .pipe(
        debounceTime(500), // Wait for 500ms of silence after the last input event
        map((event) => event.target.value), // Work with the text, not the event object
        distinctUntilChanged(), // Only continue if the term differs from the previous one
        // Assumes that dispatching the thunk returns a promise
        switchMap((term) => dispatch(searchItemsAsync(term)))
      )
      .subscribe();

    return () => subscription.unsubscribe();
  }, [dispatch]);

  return (
    <div>
      <input id="searchInput" type="text" />
    </div>
  );
};

export default SearchComponent;

useEffect():
Sets up an RxJS observable on the input event of the #searchInput DOM element when the component mounts.
– Uses debounceTime to hold back the event until 500 milliseconds have passed since the last input event.
– Uses map to extract the search term from the event. Without this step, distinctUntilChanged would compare event objects, which are always different from each other.
– Uses distinctUntilChanged to continue only if the current term is different from the previous one.
– Uses switchMap to switch to a new inner source (the promise returned by dispatching the Redux action) every time a new term arrives. Because the inner source is a promise, the request itself is not cancelled; switchMap only stops listening to the outdated one.
– Subscribes to the observable to start listening to events and handle side effects, and unsubscribes when the component unmounts.
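By default, distinctUntilChanged compares consecutive emissions with strict equality (===), so what exactly flows through the pipeline matters. The plain JavaScript sketch below mimics that comparison with a hypothetical createDistinctFilter helper and two made-up input events that carry the same text.

```javascript
// Toy version of the idea behind distinctUntilChanged: drop a value when it
// is strictly equal (===) to the previous one.
function createDistinctFilter() {
  let hasPrevious = false;
  let previous;
  return function isNew(value) {
    if (hasPrevious && value === previous) return false;
    hasPrevious = true;
    previous = value;
    return true;
  };
}

// Two hypothetical input events carrying the same text.
const events = [
  { target: { value: 'react' } },
  { target: { value: 'react' } },
];

// Comparing the event objects: both pass, because they are different objects.
const byEvent = events.filter(createDistinctFilter());

// Comparing the extracted text: the duplicate is dropped.
const byText = events.map((e) => e.target.value).filter(createDistinctFilter());

console.log(byEvent.length); // 2
console.log(byText); // [ 'react' ]
```

This is why it helps to reduce each event to its text value before the distinctness check.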

With and Without RxJS — Final Comparison

Now that we have seen two pieces of code that serve the same purpose, let's compare the two approaches and go through their differences, pros, and cons.

Plain React Approach

Performance

Manual debouncing and time-check logic are efficient and straightforward.
Uses setTimeout to handle debouncing, which is performant for this use case.

Simplicity and Readability

Uses familiar React hooks (useState, useEffect, useCallback).
Logic for debouncing and checking the time since the last request is handled manually.
Slightly more verbose due to manual implementation but may be easier for those familiar with React but not RxJS.

React + RxJS Approach

Performance

RxJS is optimized for handling streams and asynchronous events.
Operators like debounceTime, distinctUntilChanged, and switchMap are highly optimized and can handle a large number of events efficiently.
Potentially more performant in complex scenarios involving multiple streams or more sophisticated event handling.

Simplicity and Readability

Uses RxJS operators which are concise and powerful.
Requires understanding of RxJS operators like debounceTime, distinctUntilChanged, and switchMap.
May be less readable to those not familiar with RxJS, but more elegant and concise once understood.

Which One is Better and Faster? — Conclusion

For Simplicity and Maintainability:

If you are more comfortable with React hooks and prefer not to introduce an additional library, the plain React approach may be better. It is straightforward and does not require learning RxJS.

For Performance and Elegance:

If you are familiar with RxJS or willing to learn, the RxJS approach can be more performant and elegant. It leverages powerful operators to handle streams of events efficiently.

Reactive Programming — From JS to ReactJs was originally published in Level Up Coding on Medium, where people are continuing the conversation by highlighting and responding to this story.

