Skip to main content Link Search Menu Expand Document (external link)

Stateful operators

Some operators are inherently stateful. Distinct e.g. keeps an internal map with all distinct values seen. Reduce keeps the state internally and updates it on every value with the reducer function.

Many other operators, like Filter and Map are inherently stateless. They execute a function to filter/map values and pass them to the output channel, and they keep no internal state at all. That doesn’t mean they can’t be made stateful though. One can simply provide an external state in the function closure. Let’s see a couple of examples.

Distinct can easily be implemented by using Filter in a stateful manner:

allSeen := map[any]bool{}
jpipe.FromGenerator(pipeline, func(i uint64) int { return rand.Intn(100) }).
    Filter(func(x int) bool{
        if _, seen := allSeen[x]; seen {
			return false
		}
        seen[x] = true
        return true
    })

The full distinct logic could be encapsulated in a helper function:

func distinctFilter() func(x int) bool {
    allSeen := map[any]bool{}
    return func(x int) bool {
        if _, seen := allSeen[x]; seen {
			return false
		}
        seen[x] = true
        return true
    }
}

distinctFilter could be easily reused then:

jpipe.FromGenerator(pipeline, func(i uint64) int { return rand.Intn(100) }).
    Filter(distinctFilter())

Let’s see an example of stateful Map now. Imagine we want to transform a stream of integers into a stream of their moving average. We create a movingAvg function like this:

func movingAvg(windowSize int32) func(int) float64 {
    window := make([]int, windowSize)
    sum := int64(0)
    index := 0
    currentWindowSize := 0
    return func(x int) float64 {
        sum -= window[index]
        sum += int64(x)
        window[index] = x

        index = (index + 1) % windowSize
        if currentWindowSize < windowSize {
            currentWindowSize++
        }

        return float64(sum) / currentWindowSize
    }
}

Usage is then simply:

channel := jpipe.FromGenerator(pipeline, func(i uint64) int { return rand.Intn(100) })
movingAverageChannel := jpipe.Map(channel, movingAvg(5))