Implementing Polling With RxJS

In most front-end applications, fetching data from the backend is a routine task that keeps users supplied with up-to-date information. A single API call often suffices, but some scenarios need real-time or periodic updates. One technique for those scenarios is polling: calling an API repeatedly at regular intervals. In this post, we’ll explore how to implement polling using RxJS.

RxJS Refresher

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming that makes working with asynchronous data streams simple and efficient. It provides tools like Observables, Operators, and Subjects to handle complex asynchronous workflows.
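
As a quick illustration of Observables and Operators working together, here is a minimal sketch. The values and the `received` array are made up for the example; `of`, `filter`, and `map` are standard RxJS exports.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Collect everything the subscriber receives so the result can be inspected.
const received: number[] = [];

of(1, 2, 3, 4)
  .pipe(
    filter((n) => n % 2 === 0), // keep even numbers only
    map((n) => n * 10)          // transform each remaining value
  )
  .subscribe((value) => received.push(value));

// `of` emits synchronously, so `received` is already [20, 40] here.
console.log(received);
```

The source Observable stays untouched; each operator returns a new Observable, which is what makes pipelines like the polling function later in this post easy to compose.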

Why Use RxJS for Polling?

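RxJS is well-suited for polling because its operators let you express the polling interval, the stopping condition, and an attempt limit declaratively in a single pipeline.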

Setting Up Polling With RxJS

Here’s how we can create a robust polling mechanism using RxJS operators.

Core Operators in Polling
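
Two of the operators used by the polling function in the next section can be tried in isolation. The sketch below uses hard-coded arrays (an assumption for illustration, standing in for real API responses and timer ticks) so that it runs synchronously.

```typescript
import { from } from 'rxjs';
import { scan, takeWhile } from 'rxjs/operators';

// takeWhile with inclusive = true also emits the first value that fails
// the predicate, so the final 'complete' status is not lost.
const seen: string[] = [];
from(['pending', 'pending', 'complete', 'pending'])
  .pipe(takeWhile((status) => status !== 'complete', true))
  .subscribe((status) => seen.push(status));

// scan with a seed of 0 turns every incoming tick into a running count.
const attempts: number[] = [];
from(['tick', 'tick', 'tick'])
  .pipe(scan((count) => count + 1, 0))
  .subscribe((count) => attempts.push(count));

console.log(seen);     // ['pending', 'pending', 'complete']
console.log(attempts); // [1, 2, 3]
```

In the real pipeline, `timer` supplies the ticks and `switchMapTo` swaps each tick for a fresh subscription to the API call.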

Implementing Polling With pollWhile

Below is a function that provides a reusable polling mechanism. It allows customization of the polling interval, maximum attempts, and active polling condition.

Code: pollWhile

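import { MonoTypeOperatorFunction, timer } from 'rxjs';
import { last, scan, switchMapTo, takeWhile, tap } from 'rxjs/operators';

// Returns a guard that throws once the attempt count exceeds maxAttempts.
function attemptsGuardFactory(maxAttempts: number) {
  return (attemptsCount: number) => {
    if (attemptsCount > maxAttempts) {
      throw new Error('Exceeded maxAttempts');
    }
  };
}

export function pollWhile<T>(
  pollInterval: number,
  isPollingActive: (res: T) => boolean,
  maxAttempts: number = Infinity,
  emitOnlyLast: boolean = false
): MonoTypeOperatorFunction<T> {
  return (source$) => {
    // Tick immediately, then once every pollInterval milliseconds.
    const poll$ = timer(0, pollInterval).pipe(
      // Turn each tick into a running attempt count (1, 2, 3, ...).
      scan((attempts) => attempts + 1, 0),
      // Error out of the stream when the attempt limit is exceeded.
      tap(attemptsGuardFactory(maxAttempts)),
      // Resubscribe to the source on every tick, cancelling a request still in flight.
      switchMapTo(source$),
      // Keep polling while the predicate holds; also emit the first result that fails it.
      takeWhile(isPollingActive, true)
    );

    return emitOnlyLast ? poll$.pipe(last()) : poll$;
  };
}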
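
To see how the attempt limit behaves, the guard can be exercised on its own without RxJS. This is a small sketch: the guard is copied from the snippet above, while the loop, `allowed`, and `failure` are illustrative names that are not part of the original code.

```typescript
// Copy of the guard from the pollWhile snippet.
function attemptsGuardFactory(maxAttempts: number) {
  return (attemptsCount: number): void => {
    if (attemptsCount > maxAttempts) {
      throw new Error('Exceeded maxAttempts');
    }
  };
}

const guard = attemptsGuardFactory(3);
const allowed: number[] = [];
let failure = '';

try {
  // Simulate five polling ticks against a limit of three.
  for (let attempt = 1; attempt <= 5; attempt++) {
    guard(attempt);
    allowed.push(attempt);
  }
} catch (err) {
  failure = (err as Error).message;
}

console.log(allowed); // [1, 2, 3]
console.log(failure); // 'Exceeded maxAttempts'
```

Inside the pipeline, the same exception is thrown from `tap`, so RxJS delivers it to the subscriber's `error` callback instead of a `try`/`catch`.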

Key Features

Using the Polling Function

Here’s an example of how to use pollWhile in your application.

Example: Polling an API Until a Condition Is Met

import { of } from 'rxjs';
import { delay, map } from 'rxjs/operators';
import { pollWhile } from './pollWhile'; // Import the polling function

// Mock API call
function mockApiCall() {
  const statuses = ['pending', 'pending', 'complete'];
  let index = 0;

  return of(null).pipe(
    delay(1000), // Simulate network delay
    map(() => statuses[index++]) // Each new subscription returns the next status
  );
}

const isPollingActive = (status: string) => status !== 'complete';

mockApiCall().pipe(
  pollWhile<string>(
    2000, // Polling interval: 2 seconds
    isPollingActive, // Continue polling until status is 'complete'
    5, // Maximum attempts
    true // Emit only the final result
  )
).subscribe({
  next: (result) => console.log('Final result:', result),
  error: (err) => console.error('Error:', err),
  complete: () => console.log('Polling complete.')
});

Advanced Polling: Exponential Backoff

In some cases, such as when a server is overloaded, increasing the delay between requests (exponential backoff) is a better approach to reduce server load.
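
To make the growth concrete, the sketch below computes the waiting times an exponential backoff would produce. It assumes the first wait equals the initial interval and each later wait is multiplied by the growth factor; `backoffDelays` is a hypothetical helper written for this illustration, not part of RxJS.

```typescript
// Delay before retry i (0-based) = initialInterval * growthFactor^i.
function backoffDelays(
  initialInterval: number,
  growthFactor: number,
  count: number
): number[] {
  return Array.from(
    { length: count },
    (_, i) => initialInterval * Math.pow(growthFactor, i)
  );
}

const delays = backoffDelays(2000, 1.5, 4);
console.log(delays); // [2000, 3000, 4500, 6750]
```

With a 2-second initial interval and a growth factor of 1.5, the fourth wait is already more than three times as long as the first, which is where the reduction in request rate comes from.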

Code: Exponential Backoff With pollWhile

import { MonoTypeOperatorFunction, of, timer } from 'rxjs';
import { expand, last, map, switchMapTo, takeWhile, tap } from 'rxjs/operators';

// attemptsGuardFactory is the helper defined in the previous pollWhile snippet.

export function pollWhile<T>(
  pollInterval: number,
  growthFactor: number,
  isPollingActive: (res: T) => boolean,
  maxAttempts: number = Infinity,
  emitOnlyLast: boolean = false
): MonoTypeOperatorFunction<T> {
  return (source$) => {
    // Start with attempt 1 immediately.
    const poll$ = of(1).pipe(
      // After attempt n, wait pollInterval * growthFactor^(n - 1), then emit n + 1.
      expand((attempt) =>
        timer(pollInterval * Math.pow(growthFactor, attempt - 1)).pipe(
          map(() => attempt + 1)
        )
      ),
      // Error out of the stream when the attempt limit is exceeded.
      tap(attemptsGuardFactory(maxAttempts)),
      switchMapTo(source$),
      takeWhile(isPollingActive, true)
    );

    return emitOnlyLast ? poll$.pipe(last()) : poll$;
  };
}

Usage

mockApiCall().pipe(
  pollWhile<string>(
    2000, // Initial interval: 2 seconds
    1.5, // Growth factor: 1.5
    isPollingActive,
    5, // Maximum attempts
    true // Emit only the final result
  )
).subscribe({
  next: (result) => console.log('Final result:', result),
  error: (err) => console.error('Error:', err),
  complete: () => console.log('Polling complete.')
});

Benefits of Polling With RxJS

Conclusion

Polling with RxJS provides a flexible way to handle repeated API calls in modern applications. By combining a handful of operators, you can build customizable polling mechanisms with dynamic stopping conditions and attempt limits. Exponential backoff additionally spaces requests further apart over time, which reduces the load your client places on a server that is already struggling.

Try implementing polling with RxJS in your next project, and you’ll see how reactive programming can simplify complex asynchronous tasks!