Go Pipelines vs RxJS

How do they scale?

Prerequisites: basic familiarity with Go (goroutines, channels) and RxJS (observables, pipeable operators).

Basic Pipeline

Go starts off with more boilerplate. Even the identity pipeline needs its own goroutine and explicit channel management:

func Pipeline(in <-chan models.Product) <-chan models.Product {
    out := make(chan models.Product)
    go func() {
        for product := range in {
            out <- product // forward each product unchanged
        }
        close(out) // tell downstream consumers we're done
    }()
    return out
}
export function pipeline(in$: Observable<Product>): Observable<Product> {
    return in$;
}

mergeMap

Strictly speaking, this example is closer to concatMap, although there is no functional difference here: from(product.Images) emits synchronously, so mergeMap preserves order anyway. (A concatMap sketch follows the RxJS example below.)

func Pipeline(in <-chan models.Product) <-chan string {
	out := make(chan string)
	go func() {
		for product := range in {
			for _, img := range product.Images {
				out <- img
			}
		}
		close(out)
	}()
	return out
}
export function pipeline(in$: Observable<Product>): Observable<string> {
    return in$.pipe(
        mergeMap(product => from(product.Images)),
    );
}
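
For comparison, a minimal sketch of the concatMap variant (same assumed Product type as above). It behaves identically in this case, but it would keep input order even if the inner sources were asynchronous:

import { Observable, from, concatMap } from "rxjs";

export function pipeline(in$: Observable<Product>): Observable<string> {
    return in$.pipe(
        // concatMap subscribes to each inner observable only after the
        // previous one completes, so output order always follows input order
        concatMap(product => from(product.Images)),
    );
}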

Intertwined map & filter

With RxJS, there are several ways to interleave the map and the filter. None are great, but they work.

type Img struct {
	Dec *Decoder
	URL string
}
func Pipeline(in <-chan models.Product) <-chan Img {
	out := make(chan Img)
	go func() {
		for product := range in {
			for _, img := range product.Images {
				decoder, exists := getDecoder(img)
				if !exists {
					continue // filter out non image extensions
				}
				out <- Img{Dec: decoder, URL: img}
			}
		}
		close(out)
	}()
	return out
}
type Img = {
    url: string;
    decoder: Decoder;
};
export function pipeline(in$: Observable<Product>): Observable<Img> {
    return in$.pipe(
        mergeMap(product => from(product.Images)),
        // -- map valid images to Img, non-valid to null, then filter nulls
        map(img => {
            const decoder = getDecoder(img);
            if (decoder == null) {
                return null;
            }
            return { url: img, decoder }; // decoder is already narrowed to non-null here
        }),
        filter((img): img is Img => img != null),
    );
}
 
// --- OR
export function pipeline(in$: Observable<Product>): Observable<Img> {
    return in$.pipe(
        mergeMap(product => from(product.Images)),
        mergeMap(img => {
            const decoder = getDecoder(img);
            if (decoder == null) {
                return EMPTY; // filtering and mapping in a single operator
            }
            return of({ url: img, decoder });
        })
    );
}

scan, then concatMap

Reuse decoders that share the same settings instead of building a new one every time. Although the decoder caching is consolidated into a single scan operator in RxJS, the Go implementation was admittedly easier and more intuitive to build onto.

type Img struct {
	Dec *Decoder
	URL string
}
func Pipeline(in <-chan models.Product) <-chan ProcessedImage {
	out := make(chan ProcessedImage)
	go func() {
		decoders := []*Decoder{}
		for product := range in {
			for _, img := range product.Images {
				decoder, exists := getDecoder(img)
				if !exists {
					continue // filter out non image extensions
				}
				reusableDecoderIndex := slices.IndexFunc(decoders, func(dv *Decoder) bool {
					return dv.MatchesSettings(decoder)
				})
				if reusableDecoderIndex == -1 {
					decoders = append(decoders, decoder) // cache the new decoder so it can be reused
					out <- decoder.buildOrReuse().decodeFromUrl(img)
				} else {
					out <- decoders[reusableDecoderIndex].buildOrReuse().decodeFromUrl(img)
				}
			}
		}
		close(out)
	}()
	return out
}
type Img = {
    url: string;
    decoder: Decoder;
};
export function pipeline(in$: Observable<Product>): Observable<ProcessedImage> {
    type scanObj = { decoders: Decoder[], out: { decoder: Decoder, url: string } }
    return in$.pipe(
        mergeMap(product => from(product.Images)),
        mergeMap(img => {
            const decoder = getDecoder(img);
            if (decoder == null) {
                return EMPTY;
            }
            return of({ url: img, decoder } as Img);
        }),
        // reuse decoders with same settings
        scan((acc, img) => {
            const reusableDecoder = acc.decoders.find(dec => dec.matchesSettings(img.decoder));
            if (reusableDecoder === undefined) {
                return { decoders: [...acc.decoders, img.decoder], out: { decoder: img.decoder, url: img.url } };
            } else {
                return { decoders: acc.decoders, out: { decoder: reusableDecoder, url: img.url } };
            }
        }, { decoders: [] as Decoder[], out: null as never } as scanObj), // the seed's `out` is never read
        concatMap(v => v.out.decoder.buildOrReuse().decodeFromUrl(v.out.url)),
    );
}

PERFORMANCE!!!

Both pipelines handle a single image at a time. Assuming the order of images coming out of this pipeline doesn't matter, we can process them concurrently.

type Img struct {
	Dec *Decoder
	URL string
}
func Pipeline(in <-chan models.Product) <-chan ProcessedImage {
	out := make(chan ProcessedImage)
	go func() {
		decoders := []*Decoder{}
		for product := range in {
			for _, img := range product.Images {
				decoder, exists := getDecoder(img)
				if !exists {
					continue // filter out non image extensions
				}
				reusableDecoderIndex := slices.IndexFunc(decoders, func(dv *Decoder) bool {
					return dv.MatchesSettings(decoder)
				})
				if reusableDecoderIndex == -1 {
					decoders = append(decoders, decoder) // cache the new decoder so it can be reused
				} else {
					decoder = decoders[reusableDecoderIndex]
				}
				go func() {
					// decode each image in its own goroutine
					out <- decoder.buildOrReuse().decodeFromUrl(img)
				}()
			}
		}
		close(out)
	}()
	return out
}
type Img = {
    url: string;
    decoder: Decoder;
};
export function pipeline(in$: Observable<Product>): Observable<ProcessedImage> {
    type scanObj = { decoders: Decoder[], out: { decoder: Decoder, url: string } }
    return in$.pipe(
        mergeMap(product => from(product.Images)),
        mergeMap(img => {
            const decoder = getDecoder(img);
            if (decoder == null) {
                return EMPTY;
            }
            return of({ url: img, decoder } as Img);
        }),
        // reuse decoders with same settings
        scan((acc, img) => {
            const reusableDecoder = acc.decoders.find(dec => dec.matchesSettings(img.decoder));
            if (reusableDecoder === undefined) {
                return { decoders: [...acc.decoders, img.decoder], out: { decoder: img.decoder, url: img.url } };
            } else {
                return { decoders: acc.decoders, out: { decoder: reusableDecoder, url: img.url } };
            }
        }, { decoders: [] as Decoder[], out: null as never } as scanObj),

        // mergeMap doesn't care about the order
        mergeMap(v => v.out.decoder.buildOrReuse().decodeFromUrl(v.out.url)),
    );
}

MEASURE RXJS PERFORMANCE!!!

concatMap waits for each image to complete before starting the next; mergeMap processes all images concurrently with no respect for order. Warning: timings are randomly generated and real-ish, but not real-world.

const now = Date.now();
pipeline(in$).subscribe({
    next: (img) => console.log(img, "after " + (Date.now() - now).toString() + "ms"),
    complete: () => console.log("Completed in " + (Date.now() - now) + "ms"),
});
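The numbers themselves come from a stub rather than a real decoder. A minimal sketch of the idea, assuming decodeFromUrl is swapped out for a randomly delayed fake:

import { Observable, timer, map } from "rxjs";

// Hypothetical stub: "decodes" an image after a random 100-1000ms delay,
// standing in for the real decoder during the measurements.
function decodeFromUrl(url: string): Observable<string> {
    const ms = 100 + Math.floor(Math.random() * 900);
    return timer(ms).pipe(map(() => `${url} (~${ms}ms)`));
}

Under that stub, the concatMap pipeline completes in the sum of the delays, while the mergeMap pipeline completes in roughly their maximum.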
concatMap
Completes in the sum of every image's decode time.

mergeMap (notice the lack of order)
Completes in roughly the time of the slowest image.

Go PIPELINE FAILING!

panic: send on closed channel

goroutine 13 [running]:
example/main/processor.Pipeline.func1.2()
        /home/wawa/Desktop/rxtest/processor/processor.go:31 +0x91
created by example/main/processor.Pipeline.func1 in goroutine 7
        /home/wawa/Desktop/rxtest/processor/processor.go:29 +0x125
exit status 2
-------------------------------------------------------------
// processor.go
go func() {
    // this is where it fails: the send can happen after close(out)
    out <- decoder.buildOrReuse().decodeFromUrl(img)
}()
The situations where this bug occurs are limited, however. An HTTP-based backend service will likely never close `in`, because it can receive an unbounded number of items to process.

The process can, however, exit before the images finish processing.
// main.go
func main(){
    in := make(chan models.Product)
    go func() {
        imgPipeline := processor.Pipeline(in)
        for img := range imgPipeline {
            go uploadToS3(img)
        }
    }()
    http.HandleFunc("/admin/create-product", func(w http.ResponseWriter, r *http.Request) {
        var product models.Product
        /// ...
        go func() {
            in <- product // feed the new product into the pipeline
        }()
    })
    // This blocks until it fails. If it fails while images are still being processed,
    // those image goroutines are abandoned and the images never uploaded to S3.
    http.ListenAndServe(":8080", http.DefaultServeMux)
}
This is the mock test I used to reproduce the error:
func main() {
	products := []models.Product{
		{
			/// ...stuff
			Images: []string{
				"https://picsum.photos/100",
				"https://picsum.photos/200",
				"https://picsum.photos/300",
			},
		},
        // ...repeat
	}
	in := make(chan models.Product)
	go func() {
		for _, p := range products {
			in <- p
		}
		close(in)
	}()
	go func() {
		out := processor.Pipeline(in)
		for v := range out {
			litter.Dump(v)
		}
	}()
    // doing some other quick work
}
However, this is not an issue with RxJS, as mergeMap waits for all inner observables to complete.
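
A quick sketch of that completion guarantee, with made-up delays:

import { from, timer, map, mergeMap } from "rxjs";

from([300, 100, 200]).pipe(
    // each inner observable emits (and completes) after its own delay
    mergeMap(ms => timer(ms).pipe(map(() => ms))),
).subscribe({
    next: ms => console.log("inner done after", ms, "ms"),
    // fires only once the source AND every inner observable have completed
    complete: () => console.log("outer complete"),
});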

In addition, Node.js exits NOT when the last statement of the script has run, but when the event loop is empty. Learn about it here
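
For example, this two-line script stays alive well past its last statement:

// The script "ends" here almost immediately, but Node.js keeps the process
// alive until the pending timer fires and the event loop is empty.
setTimeout(() => console.log("still alive, two seconds later"), 2000);
console.log("last statement reached");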

Go, on the other hand, abandons all running goroutines the moment `main` returns.

This issue is easy to fix in Go, however, with a WaitGroup.

Go's mergeMap

Learn about WaitGroups
// processor.go
func Pipeline(in <-chan models.Product) <-chan ProcessedImage {
	out := make(chan ProcessedImage)
	go func() {
		var wg sync.WaitGroup
		decoders := []*Decoder{}
		for product := range in {
			for _, img := range product.Images {
				decoder, exists := getDecoder(img)
				if !exists {
					continue // filter out non image extensions
				}
				reusableDecoderIndex := slices.IndexFunc(decoders, func(dv *Decoder) bool {
					return dv.MatchesSettings(decoder)
				})
				if reusableDecoderIndex == -1 {
					decoders = append(decoders, decoder) // cache the new decoder so it can be reused
				} else {
					decoder = decoders[reusableDecoderIndex]
				}
				// Add only for images that actually spawn a goroutine; an
				// Add before the !exists check would make wg.Wait deadlock
				// on every filtered-out image.
				wg.Add(1)
				go func() {
					defer wg.Done()
					out <- decoder.buildOrReuse().decodeFromUrl(img)
				}()
			}
		}
		wg.Wait() // wait for all inner goroutines to complete
		close(out)
	}()
	return out
}
How we use it:
// main.go
func main(){
    products := []models.Product{
		{
			/// ...stuff
			Images: []string{
				"https://picsum.photos/100",
				"https://picsum.photos/200",
				"https://picsum.photos/300",
			},
		},
        // ...repeat
	}
	in := make(chan models.Product)
	go func() {
		for _, p := range products {
			in <- p
		}
		close(in)
	}()
	completed := make(chan struct{})
	go func() {
		out := processor.Pipeline(in)
		for v := range out {
			litter.Dump(v)
		}
		completed <- struct{}{}
	}()
	<-completed
}

Broadcast?

Don't. Call every consumer from the single place that already receives each image:

completed := make(chan struct{})
go func() {
    out := processor.Pipeline(in)
    for processedImg := range out {
        go uploadToS3(processedImg)
        go backupToLocalStorage(processedImg)
        go sendStatistics(processedImg)
    }
    completed <- struct{}{}
}()
<-completed
pipeline(in$).subscribe(img => {
    // all of these are asynchronous
    uploadToS3(img);
    backupToLocalStorage(img);
    sendStatistics(img);
});

I need broadcast

No you don't
"Yes I do"
Will you use Dart?
"No?"
Okay
No
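
share() is the reason this works: observables are cold by default, so without it each subscribe would re-run the entire pipeline from the top. A minimal sketch of the difference, using a hypothetical logging source:

import { defer, interval, share, take } from "rxjs";

// a cold source: the factory runs once per subscriber
const cold$ = defer(() => {
    console.log("pipeline started");
    return interval(100).pipe(take(3));
});

cold$.subscribe(); // logs "pipeline started"
cold$.subscribe(); // logs "pipeline started" again: two independent runs

// share() multicasts one underlying run to all current subscribers
const hot$ = cold$.pipe(share());
hot$.subscribe(); // logs "pipeline started" once...
hot$.subscribe(); // ...and this subscriber joins the same run

Applied to the pipeline: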
// RxJS
const broadcast = pipeline(in$).pipe(share());
const subs = [
    broadcast.subscribe(uploadToS3),
    broadcast.subscribe(backupToLocalStorage),
    broadcast.subscribe(sendStatistics),
];
onStatisticsServerCrash(() => subs.push(
    broadcast.subscribe(sendStatisticsToGoogleDrive)
));
onCleanup(() => {
    subs.forEach(sub => sub.unsubscribe());
})

Uncovered points