Self-stopping Operators
Some operators stop themselves at some point. Our example pipeline uses Take:
pipeline := jpipe.New(ctx)
<-jpipe.FromRange(pipeline, 1, 1000).
Filter(func(id int) bool { return !idExists(id) }).
Take(5).
ForEach(func(id int) { createEntityWithId(id) })
The Take(5) operator stops itself after it has processed 5 elements. Stopping itself involves two important steps:
- It closes its output channel: When
ForEachrealizes its input channel is closed, it will stop itself too. - It unsubscribes from the input channel: When
Filterrealizes it has no downstream operator subscribed, it will stop itself too. This eventually makesFromRangestop. At that point, all operators are stopped, so the pipeline itself will be stopped, andpipeline.Done()’s channel will be closed.
So the main lesson here is: self-stopping operators can potentially stop the whole pipeline. They are particularly useful when the source(FromRange here) may produce a lot of data of which only a subset is needed. Some source operators like FromGenerator produce a virtually unlimited number of items, and in such cases, only a self-stopping operator or a manual pipeline.Cancel(error) can stop the pipeline.