Improve RxJS performance by 992%!

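I have a pipeline that fetches the categories, then the products in each category, then the images for each product, and emits the images in order: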

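// RxJS 7.2+; on older versions, import the operators from 'rxjs/operators'
import { concatMap, mergeMap, take } from 'rxjs';

function orderedImages(count: number) {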
    return getCategories().pipe(
        concatMap((category) => getProducts(category.url)),
        mergeMap((res) => res.products),
        
        concatMap((product) => getImages(product.id)),
        mergeMap((images) => images),
      
        take(count),
    );
}

However, this pipeline takes almost a full minute (52s, to be precise) to complete.

We can improve the performance by replacing concatMap with mergeMap, so that the requests run concurrently instead of one after another:

function orderedImages(count: number) {
    return getCategories().pipe(
        mergeMap((category) => getProducts(category.url)), // was concatMap
        mergeMap((res) => res.products),
        
        mergeMap((product) => getImages(product.id)), // was concatMap
        mergeMap((images) => images),
        
        take(count),
    );
}

However, the images no longer come down the pipeline in order. Instead, they arrive in whatever order they finish loading.
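The trade-off between the two operators can be modeled without RxJS. The sketch below uses plain Promises and made-up latencies; `FakeRequest`, `requests`, `delay`, `sequential`, and `concurrent` are illustrative names, not part of the pipeline above. The sequential version behaves roughly like concatMap, the concurrent one roughly like mergeMap.

```typescript
// Hypothetical requests with simulated latencies (not real endpoints).
type FakeRequest = { id: string; ms: number };
const requests: FakeRequest[] = [
  { id: "a", ms: 30 },
  { id: "b", ms: 10 },
  { id: "c", ms: 20 },
];

// Resolves with `id` after `ms` milliseconds.
const delay = ({ id, ms }: FakeRequest) =>
  new Promise<string>((resolve) => setTimeout(() => resolve(id), ms));

// concatMap-like: each request starts only after the previous one finished.
async function sequential(reqs: FakeRequest[]): Promise<string[]> {
  const arrived: string[] = [];
  for (const req of reqs) {
    arrived.push(await delay(req));
  }
  return arrived; // original order; total time is about the SUM of latencies
}

// mergeMap-like: all requests start at once, results are recorded on arrival.
async function concurrent(reqs: FakeRequest[]): Promise<string[]> {
  const arrived: string[] = [];
  await Promise.all(
    reqs.map((req) => delay(req).then((id) => { arrived.push(id); })),
  );
  return arrived; // completion order; total time is about the LONGEST latency
}
```

With these latencies, `sequential(requests)` resolves to a, b, c, while `concurrent(requests)` resolves to b, c, a: faster, but ordered by whichever request finishes first.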

We need some way to fetch the images concurrently, and yet still emit them in order.

Solution #1

We are spending most of our time in getImages, since this API is called once for each product.

Note: Ideally, you should reconsider your endpoints, but that's not always an option.

Therefore, we can map each product to a Promise<Image[]>.

This will start fetching the images (because Promises are eager), but not wait for them.

Then, we can wait for each Promise, in order, with concatMap.

return getCategories().pipe(
    concatMap((category) => getProducts(category.url)),
    mergeMap((res) => res.products),

    map(product => firstValueFrom(getImages(product.id))),
    concatMap((promise) => from(promise)),
    mergeMap((images) => images),
    
    take(count),
);

This drops the time from 52636ms to 5303ms. That's about 9.9 times faster: the new pipeline runs at 992% of the original speed!
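Why does this keep the order? Here is a minimal sketch of the same trick with plain Promises, outside of RxJS. `fakeGetImages` is a hypothetical stand-in for getImages with a simulated latency, and `started`, `finished`, and `collectInOrder` are names made up for this illustration.

```typescript
const started: number[] = [];
const finished: number[] = [];

// Hypothetical stand-in for getImages: records when the request starts
// and when it completes, and resolves after a simulated latency.
function fakeGetImages(productId: number, latencyMs: number): Promise<string[]> {
  started.push(productId); // runs synchronously, at call time (eager)
  return new Promise((resolve) =>
    setTimeout(() => {
      finished.push(productId);
      resolve([`img-${productId}`]);
    }, latencyMs),
  );
}

// The "map" step: every request starts right away, nothing is awaited yet.
const pending = [
  fakeGetImages(1, 30),
  fakeGetImages(2, 10),
  fakeGetImages(3, 20),
];

// The "concatMap" step: await the promises one by one, in their original order.
async function collectInOrder(promises: Promise<string[]>[]): Promise<string[]> {
  const images: string[] = [];
  for (const promise of promises) {
    images.push(...(await promise));
  }
  return images;
}

const ordered = collectInOrder(pending);
```

All three requests are already in `started` before anything is awaited. They finish in the order 2, 3, 1, yet `ordered` resolves to img-1, img-2, img-3, because the results are consumed in the order the promises were created, not the order they completed.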

If you need help with performance in your RxJS-driven applications, feel free to reach out at consult@gdnetwork.co, or leave a comment on any Hacker News post about this article.

However, there's one glaring issue with this solution: the images for all products are requested at once. That could mean thousands or even millions of requests in flight at a time, which takes a lot of memory and can crash the server.

Solution #2

We need to concatMap concurrently, but only up to a certain limit. As it turns out, there's no clean/intuitive way of doing this with just the operators that RxJS gives us.

Thankfully, we can use the concatMapEager operator from the rxjs-etc library.

It allows us to pass a concurrency limit, while still maintaining the output order of concatMap.

return getCategories().pipe(
    concatMap((category) => getProducts(category.url)),
    mergeMap((res) => res.products),
    
    // map((product) => firstValueFrom(getImages(product.id))),
    // concatMap((promise) => from(promise)),

    // only 10 items at a time
    concatMapEager((product) => getImages(product.id), 10),
    mergeMap((images) => images),

    take(count),
);

This brings the response time up to 9903ms (from 5303ms), which is still roughly 5.3 times faster than the original 52636ms, and it keeps our server from crashing.

You can tweak the concurrency limit (currently 10) to fit your needs. The higher the limit, the higher the throughput, but that comes with more CPU and memory usage.
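To make the idea of an ordered map with a concurrency limit concrete, here is a rough sketch using plain Promises. This is not how rxjs-etc implements concatMapEager; `mapOrderedWithLimit` and its parameters are hypothetical names, and error handling is left out for brevity.

```typescript
// Runs `task` for each item with at most `limit` tasks in flight,
// and hands results to `onResult` in the original item order.
async function mapOrderedWithLimit<T, R>(
  items: T[],
  limit: number,
  task: (item: T) => Promise<R>,
  onResult: (result: R) => void,
): Promise<void> {
  const inFlight: Promise<R>[] = []; // FIFO queue of started tasks
  for (const item of items) {
    if (inFlight.length >= limit) {
      // The window is full: wait for the OLDEST task, not the fastest one,
      // so that results keep their original order.
      onResult(await inFlight.shift()!);
    }
    inFlight.push(task(item)); // starts immediately (Promises are eager)
  }
  // Drain whatever is still running, again in order.
  for (const promise of inFlight) {
    onResult(await promise);
  }
}

// Usage: the "requests" here are just simulated latencies in milliseconds.
const emitted: number[] = [];
const done = mapOrderedWithLimit(
  [30, 5, 20, 1, 10],
  2,
  (ms) => new Promise<number>((resolve) => setTimeout(() => resolve(ms), ms)),
  (ms) => { emitted.push(ms); },
);
// Once `done` resolves, `emitted` holds 30, 5, 20, 1, 10 (the input order).
```

In this sketch a new request only starts once the oldest one in the window has been handed downstream, so a single slow request can hold back the rest. That is the price of keeping the order, and it is one reason a limited version is slower than starting everything at once.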