
Usage

JPipe is built with several principles in mind:

  • Type safety: All pipeline operators must be type safe. Go generics allow us to do this, so there must be no instances of interface{} in the API.
  • Safe cancellation: JPipe must guarantee safe cancellation of pipelines, and ensure no goroutines are leaked.
  • Asynchronous operation: Pipeline operators must execute asynchronously. Reading output data from the pipeline also has to be possible asynchronously.
  • Concurrency: It must be easy to set the concurrency of pipeline operators.
  • Lazy execution: Pipeline operators must start working only when the pipeline itself starts.
  • Fluent API: The API must be fluent, so that a pipeline can be built from chained operators in a single statement. This is done on a best-effort basis, because some operators just can’t be made chainable with the current Go generics implementation.
  • Non-intrusive operator options: Operator options must be optional (variadic), and passing no options must yield the operator’s logical default behavior. This makes the library easier for users who don’t care about tuning operator options.
  • Channel-centric: The main abstraction of the library is the Channel, which is just a wrapper for a Go channel. We don’t want to introduce additional abstractions such as observers/observables, because that would force Go developers to learn a new concurrency paradigm.
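
The "non-intrusive operator options" principle is commonly implemented in Go with variadic functional options. The following is a minimal sketch of that pattern, not JPipe's actual code: operatorConfig, option, withConcurrency, withBuffer and resolve are hypothetical names chosen for illustration, and the defaults are assumptions.

```go
package main

import "fmt"

// operatorConfig holds the tunables of a hypothetical operator.
type operatorConfig struct {
	concurrency int // number of worker goroutines
	bufferSize  int // capacity of the output channel
}

// option mutates an operatorConfig; operators accept any number of them.
type option func(*operatorConfig)

// withConcurrency and withBuffer are illustrative names, not JPipe's API.
func withConcurrency(n int) option {
	return func(c *operatorConfig) { c.concurrency = n }
}

func withBuffer(n int) option {
	return func(c *operatorConfig) { c.bufferSize = n }
}

// resolve starts from the (assumed) defaults and applies the options in
// order, so calling it with no arguments yields the default behavior.
func resolve(opts ...option) operatorConfig {
	cfg := operatorConfig{concurrency: 1, bufferSize: 0}
	for _, apply := range opts {
		apply(&cfg)
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", resolve())
	fmt.Printf("%+v\n", resolve(withConcurrency(4), withBuffer(16)))
}
```

Because the options are variadic, a caller who does not care about tuning simply omits them.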

Basic concepts

JPipe revolves around three basic concepts: Pipeline, Channel and operator.

  • Pipelines: A Pipeline is a container for the classic Pipelines and Cancellation pattern. It works as a coordinator or context for multiple operators and Channels.
  • Channels: A Channel is a wrapper for a Go channel. It is the means of passing data between two operators. It provides chainable methods to construct pipelines, but conceptually it must be seen as nothing but an enhanced Go channel.
  • Operators: An operator or node is a step in the pipeline. It receives data from input Channels, processes it and sends data to output Channels.

A Pipeline is more easily thought of as a directed acyclic graph (DAG), where operators are nodes and Channels are edges:

graph LR;
    A["FromSlice\n(operator)"]--Channel-->B["Filter\n(operator)"];
    B--Channel-->C["Map\n(operator)"];
    C--Channel-->D["ForEach\n(operator)"];

Operator types

There are two special types of operators:

  • Source: Source operators generate data into the pipeline. They have no inputs and one output Channel. In DAG terms, they are root nodes.
  • Sink: Sink operators consume data from the pipeline. They have one input and no output Channels. Processed data is yielded by the operator on a Go channel, but this is not part of the Pipeline anymore. In DAG terms, sink operators are leaf nodes.

The rest of the operators are intermediate operators. They have at least one input and at least one output. Intermediate operators can be further classified into combination, fan-out, filtering, transformation and utility operators, but that’s not relevant at this point.
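
To make the operator roles concrete, here is a rough sketch of the FromSlice, Filter, Map and ForEach graph written with plain goroutines and Go channels from the standard library only. It is not how JPipe is implemented; the function names fromSlice, filter, mapValues, forEach and runGraph are assumptions made for this example.

```go
package main

import (
	"context"
	"fmt"
)

// fromSlice is a source: no input, one output channel.
func fromSlice[T any](ctx context.Context, values []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			select {
			case out <- v:
			case <-ctx.Done():
				return // stop early so the goroutine is not leaked
			}
		}
	}()
	return out
}

// filter is an intermediate operator: one input, one output.
func filter[T any](ctx context.Context, in <-chan T, keep func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			if !keep(v) {
				continue
			}
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// mapValues is an intermediate operator that may change the element type.
func mapValues[T, R any](ctx context.Context, in <-chan T, fn func(T) R) <-chan R {
	out := make(chan R)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- fn(v):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// forEach is a sink: one input, no output edge. The returned channel only
// signals completion.
func forEach[T any](in <-chan T, fn func(T)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range in {
			fn(v)
		}
	}()
	return done
}

// runGraph wires fromSlice -> filter -> mapValues -> forEach and collects
// the values seen by the sink.
func runGraph(values []int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var got []int
	odd := filter(ctx, fromSlice(ctx, values), func(x int) bool { return x%2 == 1 })
	tens := mapValues(ctx, odd, func(x int) int { return x * 10 })
	<-forEach(tens, func(x int) { got = append(got, x) })
	return got
}

func main() {
	fmt.Println(runGraph([]int{1, 2, 3, 4, 5}))
}
```

Each stage runs in a single goroutine here, so values reach the sink in input order.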

Creating a pipeline

Creating a pipeline is as simple as:

pipeline := jpipe.New(ctx)

The above creates a pipeline that gets automatically canceled when the context is canceled. The pipeline will automatically start the moment a sink operator is created on it.

If you need more control over the pipeline behaviour, you can use:

pipeline := jpipe.NewPipeline(jpipe.Config{
    Context:       ctx,
    StartManually: true,
})

The passed context can be nil, and in that case, no automatic cancellation will happen. Cancellation can be triggered manually though, by calling pipeline.Cancel().

If StartManually is true, the pipeline won’t start until pipeline.Start() is called. This can be useful in complex pipelines with several sink operators, if you want to ensure that all pipeline operators will start simultaneously.

pipeline.Done() can be used to check whether the pipeline has stopped. It’s similar to the Done() method of a context.Context. Once done, pipeline.Error() can be used to check if there was an error in the pipeline.
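
The relationship between cancellation, Done() and Error() can be pictured with a small coordinator type. This is only a sketch under stated assumptions: miniPipeline, newMiniPipeline and cancelViaContext are hypothetical, and the Cancel(err) signature is chosen for the example rather than taken from JPipe.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// miniPipeline is a hypothetical coordinator used only for illustration.
type miniPipeline struct {
	done chan struct{}
	once sync.Once
	err  error
}

// newMiniPipeline ties the pipeline to ctx; a nil ctx disables automatic
// cancellation.
func newMiniPipeline(ctx context.Context) *miniPipeline {
	p := &miniPipeline{done: make(chan struct{})}
	if ctx != nil {
		go func() {
			select {
			case <-ctx.Done():
				p.Cancel(ctx.Err())
			case <-p.done: // already stopped; let the watcher exit
			}
		}()
	}
	return p
}

// Cancel stops the pipeline once; later calls are ignored.
func (p *miniPipeline) Cancel(err error) {
	p.once.Do(func() {
		p.err = err
		close(p.done)
	})
}

// Done is closed when the pipeline has stopped.
func (p *miniPipeline) Done() <-chan struct{} { return p.done }

// Error reports the recorded error once the pipeline is done, nil before that.
func (p *miniPipeline) Error() error {
	select {
	case <-p.done:
		return p.err
	default:
		return nil
	}
}

// cancelViaContext cancels the context and returns the error the
// pipeline reports afterwards.
func cancelViaContext() error {
	ctx, cancel := context.WithCancel(context.Background())
	p := newMiniPipeline(ctx)
	cancel()
	<-p.Done()
	return p.Error()
}

func main() {
	fmt.Println(cancelViaContext())
}
```

In this sketch the error is written before the done channel is closed, so reading it after Done() fires is safe without extra locking.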

Generating data

Data is injected into the pipeline with source operators. A common way to generate data is by starting from a slice:

channel := jpipe.FromSlice(pipeline, []int{1, 2, 3, 4, 5})

The above snippet creates a Channel from a slice. No work is done yet; data will only flow through that Channel once the Pipeline starts.
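
One way to picture lazy execution is a source whose goroutine waits on a start signal before producing anything. The sketch below uses only the standard library; lazyFromSlice, demo and the produced counter are hypothetical names, and cancellation is left out to keep the example short.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// lazyFromSlice returns its output channel immediately, but the goroutine
// behind it does nothing until start is closed.
func lazyFromSlice(start <-chan struct{}, values []int, produced *int32) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		<-start // block until the pipeline starts
		for _, v := range values {
			atomic.AddInt32(produced, 1)
			out <- v
		}
	}()
	return out
}

// demo reports how many values were produced before and after the start signal.
func demo() (before, after int32) {
	start := make(chan struct{})
	var produced int32
	out := lazyFromSlice(start, []int{1, 2, 3, 4, 5}, &produced)

	time.Sleep(10 * time.Millisecond) // give the goroutine a chance to run
	before = atomic.LoadInt32(&produced)

	close(start) // "start the pipeline"
	for range out {
		// drain the channel until the source closes it
	}
	after = atomic.LoadInt32(&produced)
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after)
}
```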

Processing data

Intermediate operators process data from one (or more) input Channels to one (or more) output Channels. Take, for example, the simple Filter operator, which discards data that does not match a predicate function. The following only passes through odd numbers:

channel = channel.Filter(func(x int) bool { return x % 2 == 1 })

Filter is actually a method on the Channel type, so it’s a chainable operator. Unfortunately, not all operators are chainable, because of limitations in Go generics. The classic Map isn’t:

channel = jpipe.Map(channel, func(x int) int { return x * 10 })
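
The limitation comes from the Go language itself: a method may use the type parameters of its receiver, but it cannot declare new ones. A Filter keeps the element type, so it fits as a method; a Map from T to R needs an extra type parameter and has to be a package-level function. The sketch below illustrates this with a hypothetical Chan wrapper (Chan, fromSlice and oddLabels are illustrative names, not JPipe's types), and it omits cancellation for brevity.

```go
package main

import (
	"fmt"
	"strconv"
)

// Chan is a hypothetical wrapper around a receive-only Go channel.
type Chan[T any] struct{ ch <-chan T }

// fromSlice feeds values into a new Chan and closes it when done.
func fromSlice[T any](values []T) Chan[T] {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return Chan[T]{ch: out}
}

// Filter can be a method: it only uses the receiver's type parameter T.
func (c Chan[T]) Filter(keep func(T) bool) Chan[T] {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range c.ch {
			if keep(v) {
				out <- v
			}
		}
	}()
	return Chan[T]{ch: out}
}

// Map needs a second type parameter R. Go does not allow methods to declare
// their own type parameters, so this would not compile:
//
//	func (c Chan[T]) Map[R any](fn func(T) R) Chan[R]
//
// A package-level function works instead.
func Map[T, R any](c Chan[T], fn func(T) R) Chan[R] {
	out := make(chan R)
	go func() {
		defer close(out)
		for v := range c.ch {
			out <- fn(v)
		}
	}()
	return Chan[R]{ch: out}
}

// oddLabels chains Filter as a method, then has to wrap the chain in Map.
func oddLabels(values []int) []string {
	odd := fromSlice(values).Filter(func(x int) bool { return x%2 == 1 })
	labels := Map(odd, func(x int) string { return "n" + strconv.Itoa(x) })

	var got []string
	for s := range labels.ch {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(oddLabels([]int{1, 2, 3, 4, 5}))
}
```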

Consuming data

Data is consumed from the pipeline with sink operators. The simplest sink operator is ForEach, which just runs a function for each input value:

result := channel.ForEach(func(x int) { fmt.Printf("Processing %d\n", x) })

The returned result is a Go channel that will close when either all values have been processed, or the pipeline is canceled. Having it be a Go channel makes the ForEach method asynchronous, so we don’t have to wait for it to complete, and we can do other things. Many times though, we actually want to wait for it to complete, so we just do this:

<-channel.ForEach(func(x int) { fmt.Printf("Processing %d\n", x) })

And finally let’s add some concurrency to it just to showcase the feature:

<-channel.ForEach(func(x int) { fmt.Printf("Processing %d\n", x) }, jpipe.Concurrent(2))
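
A concurrency setting of this kind can be pictured as a small worker pool: several goroutines read from the same input channel and each runs the function on whatever value it receives. The following sketch shows that idea with the standard library only; forEachConcurrent and processAll are hypothetical names and this is not necessarily how JPipe implements jpipe.Concurrent.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// forEachConcurrent runs fn on every value of in using `workers` goroutines.
// The returned channel closes once all workers have finished.
func forEachConcurrent[T any](in <-chan T, workers int, fn func(T)) <-chan struct{} {
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for v := range in { // workers compete for values
				fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

// processAll returns the processed values sorted, because the order in
// which workers finish depends on goroutine scheduling.
func processAll(values []int, workers int) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()

	var mu sync.Mutex // fn is called from several goroutines
	var got []int
	<-forEachConcurrent(in, workers, func(x int) {
		mu.Lock()
		defer mu.Unlock()
		got = append(got, x*10)
	})
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(processAll([]int{1, 3, 5}, 2))
}
```

Since the function runs on several goroutines at once, any state it shares (here the result slice) needs its own synchronization.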

Putting it all together

Now let’s run all the above steps in a single pipeline:

ctx := context.Background()
pipeline := jpipe.New(ctx)
channel := jpipe.FromSlice(pipeline, []int{1, 2, 3, 4, 5}).
    Filter(func(x int) bool { return x % 2 == 1 })
<-jpipe.Map(channel, func(x int) int { return x * 10 }).
    ForEach(func(x int) { fmt.Printf("Processing %d\n", x) }, jpipe.Concurrent(2))

if err := pipeline.Error(); err != nil {
    fmt.Printf("Error in pipeline %v\n", err)
}

Console output:

Processing 10
Processing 50
Processing 30

Notice how we couldn’t have the pipeline be a single chained statement because the Map operator is not chainable. That’s a bit of an annoyance, but hopefully something you can live with.

Also notice how the output is not ordered. That’s because the jpipe.Concurrent option does not guarantee order; it is a tradeoff that comes with concurrency.

Finally, notice how we check the pipeline for errors. ForEach may have returned, but we don’t know whether the pipeline completed successfully or failed. Always check your pipelines for errors, as you would with any function that returns an error.

If you want to get deeper into how to use JPipe, check out the following pages that go into more detail:

