Batch A Channel By Size Or Time in Go

Elliot Chance
Jul 23, 2019


There are cases where you need to read enough items from a channel before you begin processing. However, you also want a timeout (TTL) to expire and flush a smaller batch if no more items arrive for a while.

Here’s an example of streaming a channel of string into an output channel of []string that guarantees:

  1. Each batch contains at least one string.
  2. A batch will never contain more than maxItems.
  3. If there is at least one string to be read at any given time, it will always send a new batch within maxTimeout.
  4. A batch will contain fewer than maxItems if maxTimeout has elapsed since the last batch finished (not since the first item in the batch). This is important because it means that if the stream is sparse and infrequent, such as one item per maxTimeout, BatchStrings will prefer to send a consistently timed batch of one item rather than more volatile intervals that might contain two items.
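
package main

import (
	"fmt"
	"time"
)

// BatchStrings reads from values and emits batches of at most maxItems
// strings, flushing a partial batch whenever maxTimeout elapses.
func BatchStrings(values <-chan string, maxItems int, maxTimeout time.Duration) chan []string {
	batches := make(chan []string)

	go func() {
		defer close(batches)

		for keepGoing := true; keepGoing; {
			var batch []string
			// The timer starts when the batch starts, not when its first item arrives.
			expire := time.After(maxTimeout)
			for {
				select {
				case value, ok := <-values:
					if !ok {
						// Input closed: flush what we have and stop.
						keepGoing = false
						goto done
					}

					batch = append(batch, value)
					if len(batch) == maxItems {
						goto done
					}

				case <-expire:
					goto done
				}
			}

		done:
			// Empty batches are never sent.
			if len(batch) > 0 {
				batches <- batch
			}
		}
	}()

	return batches
}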

func main() {
	strings := make(chan string)

	go func() {
		strings <- "hello"
		strings <- "there" // hit limit of 2

		strings <- "how"
		time.Sleep(15 * time.Millisecond) // hit timeout

		strings <- "are"
		strings <- "you" // hit limit of 2

		// A really long time without any new values.
		time.Sleep(500 * time.Millisecond)

		strings <- "doing?" // the last incomplete batch

		close(strings)
	}()

	start := time.Now()
	batches := BatchStrings(strings, 2, 10*time.Millisecond)
	for batch := range batches {
		fmt.Println(time.Since(start), batch)
	}
}

Try it now: https://play.golang.org/p/5NqYwehuxr-

BatchStrings stops only when the input channel is closed. It can also be written in a way that is context-aware:

package main

import (
	"context"
	"fmt"
	"time"
)

// BatchStringsCtx behaves like BatchStrings, but also stops when ctx is done.
func BatchStringsCtx(ctx context.Context, values <-chan string, maxItems int, maxTimeout time.Duration) chan []string {
	batches := make(chan []string)

	go func() {
		defer close(batches)

		for keepGoing := true; keepGoing; {
			var batch []string
			expire := time.After(maxTimeout)

			for {
				select {
				case <-ctx.Done():
					// Cancelled: flush any partial batch and stop.
					keepGoing = false
					goto done

				case value, ok := <-values:
					if !ok {
						keepGoing = false
						goto done
					}

					batch = append(batch, value)
					if len(batch) == maxItems {
						goto done
					}

				case <-expire:
					goto done
				}
			}

		done:
			if len(batch) > 0 {
				batches <- batch
			}
		}
	}()

	return batches
}

func main() {
	strings := make(chan string)

	go func() {
		strings <- "hello"
		strings <- "there" // hit limit of 2

		strings <- "how"
		time.Sleep(15 * time.Millisecond) // hit timeout

		strings <- "are"
		strings <- "you" // hit limit of 2

		// A really long time without any new values.
		// The context was cancelled around 300ms,
		// before this sleep finished.
		time.Sleep(500 * time.Millisecond)

		strings <- "doing?" // never read

		close(strings)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	start := time.Now()
	batches := BatchStringsCtx(ctx, strings, 2, 10*time.Millisecond)
	for batch := range batches {
		fmt.Println(time.Since(start), batch)
	}
}

Now the stream can be stopped by either cancelling the context or closing the channel.

Try it now: https://play.golang.org/p/O7Wq1faP5qw
