Go vs RxJS
How do they scale comparatively?
Prerequisites:
- You know RxJS
- You know Go
- You've used both
- This is not a tutorial; don't copy code from here
Basic Pipeline
Go starts off with more boilerplate
// Golang
func Pipeline(in <-chan models.Product) <-chan models.Product {
    out := make(chan models.Product)
    go func() {
        for product := range in {
            out <- product
        }
        close(out)
    }()
    return out
}
// JavaScript
export function pipeline(in$: Observable<Product>): Observable<Product> {
    return in$;
}
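For reference, a hypothetical way to drive the Go version; a sketch, where the empty models.Product value and the fmt.Println consumer are placeholders, not code from any real repo:

// Golang — hypothetical usage sketch
in := make(chan models.Product)
go func() {
    defer close(in)
    in <- models.Product{ /* ... */ }
}()
for p := range Pipeline(in) {
    fmt.Println(p) // placeholder consumer
}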
mergeMap
Strictly speaking, the example below is closer to concatMap, although there is no functional difference in this case: from(product.Images) emits its items synchronously, so mergeMap and concatMap produce identical output here.
func Pipeline(in <-chan models.Product) <-chan string {
    out := make(chan string)
    go func() {
        for product := range in {
            for _, img := range product.Images {
                out <- img
            }
        }
        close(out)
    }()
    return out
}
// JavaScript
export function pipeline(in$: Observable<Product>): Observable<string> {
    return in$.pipe(
        mergeMap(product => from(product.Images)),
    );
}
Intertwined map & filter
With RxJS, there are several ways to do this. None are great, but they work.
type Img struct {
    Dec *Decoder
    URL string
}

func Pipeline(in <-chan models.Product) <-chan Img {
    out := make(chan Img)
    go func() {
        for product := range in {
            for _, img := range product.Images {
                decoder, exists := getDecoder(img)
                if !exists {
                    continue // filter out non-image extensions
                }
                out <- Img{Dec: decoder, URL: img}
            }
        }
        close(out)
    }()
    return out
}
type Img = { url: string; decoder: Decoder };

export function pipeline(in$: Observable<Product>): Observable<Img> {
    return in$.pipe(
        mergeMap(product => from(product.Images)),
        // -- map valid images to Img, non-valid to null, then filter nulls
        map(img => {
            const decoder = getDecoder(img);
            if (decoder == null) {
                return null;
            }
            return { url: img, decoder };
        }),
        filter((img): img is Img => img != null),
    );
}

// --- OR

export function pipeline(in$: Observable<Product>): Observable<Img> {
    return in$.pipe(
        mergeMap(product => from(product.Images)),
        mergeMap(img => {
            const decoder = getDecoder(img);
            if (decoder == null) {
                return EMPTY;
            }
            return of({ url: img, decoder });
        }),
    );
}
Scan then concatMap
Reuse decoders that share the same settings instead of building a new one each time. Although the decoder caching is consolidated in RxJS, the Go implementation was admittedly easier and more intuitive to add onto.
type Img struct {
    Dec *Decoder
    URL string
}

func Pipeline(in <-chan models.Product) <-chan ProcessedImage {
    out := make(chan ProcessedImage)
    go func() {
        decoders := []*Decoder{}
        for product := range in {
            for _, img := range product.Images {
                decoder, exists := getDecoder(img)
                if !exists {
                    continue // filter out non-image extensions
                }
                // reuse decoders with the same settings
                reusableDecoderIndex := slices.IndexFunc(decoders, func(dv *Decoder) bool {
                    return dv.MatchesSettings(decoder)
                })
                if reusableDecoderIndex == -1 {
                    decoders = append(decoders, decoder) // cache the new decoder for later reuse
                    out <- decoder.buildOrReuse().decodeFromUrl(img)
                } else {
                    out <- decoders[reusableDecoderIndex].buildOrReuse().decodeFromUrl(img)
                }
            }
        }
        close(out)
    }()
    return out
}
type Img = {
url: string;
decoder: Decoder;
};
export function pipeline(in$: Observable<Product>): Observable<ProcessedImage> {
type scanObj = { decoders: Decoder[], out: { decoder: Decoder, url: string } }
return in$.pipe(
mergeMap(product => from(product.Images)),
mergeMap(img => {
const decoder = getDecoder(img);
if (decoder == null) {
return EMPTY;
}
return of({ url: img, decoder: decoder! } as Img);
}),
// reuse decoders with same settings
scan((acc, img) => {
const reuseableDecoder = acc.decoders.find(dec => dec.matchesSettings(img.decoder));
if (reuseableDecoder == undefined) {
return { decoders: [...acc.decoders, img.decoder], out: { decoder: img.decoder, url: img.url } };
} else {
return { decoders: acc.decoders, out: { decoder: reuseableDecoder, url: img.url } };
}
}, { decoders: [] as Decoder[], out: null as never } as scanObj), // seed: out is overwritten before it is ever read
concatMap(v => v.out.decoder.buildOrReuse().decodeFromUrl(v.out.url)),
);
}
Performance with Concurrency
Both pipelines handle a single image at a time. Assuming the order of images coming out of this pipeline doesn't matter, we can process them concurrently.
type Img struct {
    Dec *Decoder
    URL string
}

func Pipeline(in <-chan models.Product) <-chan ProcessedImage {
    out := make(chan ProcessedImage)
    go func() {
        decoders := []*Decoder{}
        for product := range in {
            for _, img := range product.Images {
                decoder, exists := getDecoder(img)
                if !exists {
                    continue // filter out non-image extensions
                }
                reusableDecoderIndex := slices.IndexFunc(decoders, func(dv *Decoder) bool {
                    return dv.MatchesSettings(decoder)
                })
                // decode each image in its own goroutine
                go func() {
                    if reusableDecoderIndex == -1 {
                        out <- decoder.buildOrReuse().decodeFromUrl(img)
                    } else {
                        out <- decoders[reusableDecoderIndex].buildOrReuse().decodeFromUrl(img)
                    }
                }()
            }
        }
        close(out)
    }()
    return out
}
type Img = {
url: string;
decoder: Decoder;
};
export function pipeline(in$: Observable<Product>): Observable<ProcessedImage> {
type scanObj = { decoders: Decoder[], out: { decoder: Decoder, url: string } }
return in$.pipe(
mergeMap(product => from(product.Images)),
mergeMap(img => {
const decoder = getDecoder(img);
if (decoder == null) {
return EMPTY;
}
return of({ url: img, decoder: decoder! } as Img);
}),
// reuse decoders with same settings
scan((acc, img) => {
const reuseableDecoder = acc.decoders.find(dec => dec.matchesSettings(img.decoder));
if (reuseableDecoder == undefined) {
return { decoders: [...acc.decoders, img.decoder], out: { decoder: img.decoder, url: img.url } };
} else {
return { decoders: acc.decoders, out: { decoder: reuseableDecoder, url: img.url } };
}
}, { decoders: [] as Decoder[], out: null as never } as scanObj),
// mergeMap doesn't care about the order
mergeMap(v => v.out.decoder.buildOrReuse().decodeFromUrl(v.out.url)),
);
}
Measure RxJS Performance
concatMap waits for each image to complete; mergeMap processes all images concurrently, with no respect for order. Warning: timings are randomly generated and real-ish, but not real-world.
const now = Date.now();
pipeline(in$).subscribe({
    next: (img) => console.log(img, "after " + (Date.now() - now).toString() + "ms"),
    complete: () => console.log("Completed in " + (Date.now() - now) + "ms"),
});
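For symmetry, a rough Go counterpart to this measurement; a sketch assuming the concurrent Pipeline above and an already-fed in channel:

// Golang — timing sketch, assuming the concurrent Pipeline above
start := time.Now()
for img := range processor.Pipeline(in) {
    fmt.Printf("%v after %s\n", img, time.Since(start))
}
fmt.Printf("Completed in %s\n", time.Since(start))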
concatMap
The total time is the sum of each image's processing time.
mergeMap
No ordering guarantees; the total time is roughly that of the slowest image.
Go PIPELINE FAILING!
panic: send on closed channel

goroutine 13 [running]:
example/main/processor.Pipeline.func1.2()
    /home/wawa/Desktop/rxtest/processor/processor.go:31 +0x91
created by example/main/processor.Pipeline.func1 in goroutine 7
    /home/wawa/Desktop/rxtest/processor/processor.go:29 +0x125
exit status 2

// processor.go
go func() {
    if reusableDecoderIndex == -1 {
        // this is where it fails
        out <- decoder.buildOrReuse().decodeFromUrl(img)
    } else {
        // it could fail here too
        out <- decoders[reusableDecoderIndex].buildOrReuse().decodeFromUrl(img)
    }
}()
Where this bug occurs is limited, however. The outer goroutine calls close(out) as soon as `in` is closed and drained, while the per-image goroutines may still be waiting to send on `out`. HTTP-based backend services will likely never close `in`, since they can receive an unlimited number of items to process. They can, however, exit before the images finish processing.
// main.go
func main() {
    in := make(chan models.Product)
    go func() {
        imgPipeline := processor.Pipeline(in)
        for img := range imgPipeline {
            go uploadToS3(img)
        }
    }()
    http.HandleFunc("/admin/create-product", func(w http.ResponseWriter, r *http.Request) {
        var product models.Product
        /// ...
        go func() { in <- product }()
    })
    // This blocks until it fails. If it fails while images are still being
    // processed, those image goroutines are cancelled and nothing is uploaded to S3.
    http.ListenAndServe(":8080", http.DefaultServeMux)
}
This is the mock test I used to reproduce the error.
func main() {
    products := []models.Product{
        {
            /// ...stuff
            Images: []string{
                "https://picsum.photos/100",
                "https://picsum.photos/200",
                "https://picsum.photos/300",
            },
        },
        // ...repeat
    }
    in := make(chan models.Product)
    go func() {
        for _, p := range products {
            in <- p
        }
        close(in)
    }()
    go func() {
        out := processor.Pipeline(in)
        for v := range out {
            litter.Dump(v)
        }
    }()
    // doing some other quick work
}
However, this is not an issue in RxJS: mergeMap waits for all inner observables to complete before the resulting observable completes.
In addition, a Node.js program exits NOT when the last statement of the entry script has run, but when the event loop is empty. Learn about it here
Go, on the other hand, cancels all goroutines when the `main` function returns.
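A minimal sketch of that difference on the Go side:

// Golang — the process exits when main returns; the goroutine is torn down
func main() {
    go func() {
        time.Sleep(time.Second)
        fmt.Println("never printed")
    }()
    // no wait here, so the program exits before the goroutine finishes
}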
This issue is easy to fix in Go, however, with WaitGroups.
Go's mergeMap
// processor.go
func Pipeline(in <-chan models.Product) <-chan ProcessedImage {
    out := make(chan ProcessedImage)
    go func() {
        var wg sync.WaitGroup
        decoders := []*Decoder{}
        for product := range in {
            for _, img := range product.Images {
                decoder, exists := getDecoder(img)
                if !exists {
                    continue // filter out non-image extensions
                }
                reusableDecoderIndex := slices.IndexFunc(decoders, func(dv *Decoder) bool {
                    return dv.MatchesSettings(decoder)
                })
                // Add must come after the filter above; a skipped image
                // would otherwise leave the WaitGroup counter unbalanced
                wg.Add(1)
                go func() {
                    defer wg.Done()
                    if reusableDecoderIndex == -1 {
                        out <- decoder.buildOrReuse().decodeFromUrl(img)
                    } else {
                        out <- decoders[reusableDecoderIndex].buildOrReuse().decodeFromUrl(img)
                    }
                }()
            }
        }
        wg.Wait() // wait for all inner goroutines to complete
        close(out)
    }()
    return out
}
How we use it.
// main.go
func main() {
    products := []models.Product{
        {
            /// ...stuff
            Images: []string{
                "https://picsum.photos/100",
                "https://picsum.photos/200",
                "https://picsum.photos/300",
            },
        },
        // ...repeat
    }
    in := make(chan models.Product)
    go func() {
        for _, p := range products {
            in <- p
        }
        close(in)
    }()
    completed := make(chan struct{})
    go func() {
        out := processor.Pipeline(in)
        for v := range out {
            litter.Dump(v)
        }
        completed <- struct{}{}
    }()
    <-completed
}
Broadcast?
Don't
completed := make(chan struct{})
go func() {
    out := processor.Pipeline(in)
    for processedImg := range out {
        go uploadToS3(processedImg)
        go backupToLocalStorage(processedImg)
        go sendStatistics(processedImg)
    }
    completed <- struct{}{}
}()
<-completed
pipeline(in$).subscribe(img => {
    // all of these are asynchronous
    uploadToS3(img);
    backupToLocalStorage(img);
    sendStatistics(img);
});
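Note that the Go version carries the same early-exit hazard as before: the three goroutines per image are fire-and-forget, so `completed` can fire while uploads are still in flight. A sketch of a WaitGroup variant, assuming the same consumers:

// Golang — also waiting for the per-image work (a sketch)
completed := make(chan struct{})
go func() {
    var wg sync.WaitGroup
    out := processor.Pipeline(in)
    for processedImg := range out {
        wg.Add(3)
        go func(img ProcessedImage) { defer wg.Done(); uploadToS3(img) }(processedImg)
        go func(img ProcessedImage) { defer wg.Done(); backupToLocalStorage(img) }(processedImg)
        go func(img ProcessedImage) { defer wg.Done(); sendStatistics(img) }(processedImg)
    }
    wg.Wait()
    completed <- struct{}{}
}()
<-completed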
I need broadcast
No you don't
"Yes I do"
Will you use Dart?
"No?"
Okay
// Golang
No
// RxJS
const broadcast = pipeline(in$).pipe(share());

const subs = [
    broadcast.subscribe(uploadToS3),
    broadcast.subscribe(backupToLocalStorage),
    broadcast.subscribe(sendStatistics),
];

onStatisticsServerCrash(() => subs.push(
    broadcast.subscribe(sendStatisticsToGoogleDrive)
));

onCleanup(() => {
    subs.forEach(sub => sub.unsubscribe());
});
Uncovered points
Go read Go Concurrency Patterns: Pipelines and cancellation.
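As a taste of what that article covers, a minimal sketch of its done-channel cancellation pattern, applied to the basic pipeline from the top:

// Golang — done-channel cancellation, per the linked article
func Pipeline(done <-chan struct{}, in <-chan models.Product) <-chan models.Product {
    out := make(chan models.Product)
    go func() {
        defer close(out)
        for product := range in {
            select {
            case out <- product:
            case <-done:
                return // the consumer gave up; stop sending and clean up
            }
        }
    }()
    return out
}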