
Wrapping

We provide a convenience type item.Item to wrap pipeline items that can either have a Value or an Error:

type Item[T any] struct {
    Value T
    Error error
    Ctx   context.Context
}

Any Channel[T] can be easily converted to a Channel[Item[T]] by calling Wrap:

wrappedChannel := jpipe.Wrap(channel)

item.Item is useful for two things: error handling and context passing. Let’s look into those.

Error handling

Pipeline errors occur when the pipeline gets canceled, either automatically by context cancellation, or manually by calling pipeline.Cancel(error). You may decide to call pipeline.Cancel(error) when you hit a condition that you consider a critical error for the pipeline. Let’s say you need to make 20 calls to a rate-limited API, and you want to stop the pipeline if one call returns a rate-limit error:

channel := jpipe.FromRange(pipeline, 1, 20).
    ForEach(func(id int) {
        value, err := getValueFromAPI(id)
        if err != nil {
            fmt.Printf("Error calling API %v\n", err)
            if err.Error() == "rate limit exceeded" {
                pipeline.Cancel(err)
            }
            return
        }
        fmt.Printf("Got value %s from the API for id %d\n", value, id)
    })

But not all errors are critical, and non-critical errors should probably not cancel the pipeline. Instead, it can be useful to pass them along the pipeline and handle them in a later stage:

channel := jpipe.FromRange(pipeline, 1, 20)
wrappedChannel := jpipe.Wrap(channel)
<-jpipe.Map(wrappedChannel, func(it item.Item[int]) item.Item[string] {
        value, err := getValueFromAPI(it.Value)
        return item.Item[string]{Value: value, Error: err, Ctx: it.Ctx}
    }).
    Filter(func(item item.Item[string]) bool {
        if item.Error != nil {
            fmt.Printf("Error calling API %v\n", item.Error)
            return false
        }
        return true
    }).
    ForEach(func(item item.Item[string]) { saveValueToDB(item.Value) })

Context passing

We’ve already seen that item.Item contains a context. It is useful in case you want to enrich that context as items pass along the pipeline. Let’s take the same snippet as before, but enrich the context with some data:

channel := jpipe.FromRange(pipeline, 1, 20)
wrappedChannel := jpipe.Wrap(channel)
<-jpipe.Map(wrappedChannel, func(it item.Item[int]) item.Item[string] {
        value, err := getValueFromAPI(it.Value)
        ctx := context.WithValue(it.Ctx, "id", it.Value)
        return item.Item[string]{Value: value, Error: err, Ctx: ctx}
    }).
    Filter(func(item item.Item[string]) bool {
        if item.Error != nil {
            fmt.Printf("Error calling API %v\n", item.Error)
            return false
        }
        return true
    }).
    ForEach(func(item item.Item[string]) {
        fmt.Printf("Saving value to DB for id %v\n", item.Ctx.Value("id"))
        saveValueToDB(item.Value)
    })

We don’t encourage passing data through the context if it’s part of the logical flow of the pipeline, but it can be useful to keep some values in the context for cross-cutting concerns like logging and tracing.

Custom item types

You may need a more specialized Item type than item.Item. In that case, don’t hesitate to create your own struct, suitable for your specific needs. item.Item is not meant to get in your way.