Batch A Channel By Size Or Time in Go
Sometimes you need to collect enough items from a channel before you begin processing them. However, you also want a timeout (TTL) to flush a smaller batch if no more items arrive for a while.
Here's an example of streaming a channel of `string` into an output channel of `[]string` that guarantees:
- Each batch contains at least one string.
- A batch never contains more than `maxItems` strings.
- If there is at least one string to be read at any given time, a new batch is always sent within `maxTimeout` (assuming the consumer keeps reading).
- A batch contains fewer than `maxItems` strings if `maxTimeout` has elapsed since the previous batch was sent (not since the first item in the batch arrived). This is important because if the stream is sparse, such as one item per `maxTimeout`, `BatchStrings` will prefer to send a consistently timed batch of one item rather than batches at more erratic intervals that might contain two items.
```go
package main

import (
	"fmt"
	"time"
)

// BatchStrings groups values into batches of at most maxItems. A partial
// batch is sent when maxTimeout elapses after the previous batch was sent,
// or when values is closed. Empty batches are never sent.
func BatchStrings(values <-chan string, maxItems int, maxTimeout time.Duration) chan []string {
	batches := make(chan []string)

	go func() {
		defer close(batches)

		for keepGoing := true; keepGoing; {
			var batch []string
			// The timer restarts at the beginning of every batch.
			expire := time.After(maxTimeout)
			for {
				select {
				case value, ok := <-values:
					if !ok {
						// Input closed: flush what we have and stop.
						keepGoing = false
						goto done
					}

					batch = append(batch, value)
					if len(batch) == maxItems {
						goto done
					}

				case <-expire:
					goto done
				}
			}

		done:
			if len(batch) > 0 {
				batches <- batch
			}
		}
	}()

	return batches
}

func main() {
	strings := make(chan string)

	go func() {
		strings <- "hello"
		strings <- "there" // hit limit of 2
		strings <- "how"
		time.Sleep(15 * time.Millisecond) // hit timeout
		strings <- "are"
		strings <- "you" // hit limit of 2
		// A really long time without any new values.
		time.Sleep(500 * time.Millisecond)
		strings <- "doing?" // the last incomplete batch
		close(strings)
	}()

	start := time.Now()
	batches := BatchStrings(strings, 2, 10*time.Millisecond)
	for batch := range batches {
		fmt.Println(time.Since(start), batch)
	}
}
```
Try it now: https://play.golang.org/p/5NqYwehuxr-
The example above stops when the input channel is closed. It can also be made context-aware, so that cancelling a `context.Context` stops it as well:
```go
package main

import (
	"context"
	"fmt"
	"time"
)

// BatchStringsCtx behaves like BatchStrings, but also stops when ctx is
// cancelled. A partial batch collected before cancellation is still sent.
func BatchStringsCtx(ctx context.Context, values <-chan string, maxItems int, maxTimeout time.Duration) chan []string {
	batches := make(chan []string)

	go func() {
		defer close(batches)

		for keepGoing := true; keepGoing; {
			var batch []string
			expire := time.After(maxTimeout)
			for {
				select {
				case <-ctx.Done():
					// Cancelled: flush any partial batch, then stop.
					keepGoing = false
					goto done

				case value, ok := <-values:
					if !ok {
						keepGoing = false
						goto done
					}

					batch = append(batch, value)
					if len(batch) == maxItems {
						goto done
					}

				case <-expire:
					goto done
				}
			}

		done:
			if len(batch) > 0 {
				batches <- batch
			}
		}
	}()

	return batches
}

func main() {
	strings := make(chan string)

	go func() {
		strings <- "hello"
		strings <- "there" // hit limit of 2
		strings <- "how"
		time.Sleep(15 * time.Millisecond) // hit timeout
		strings <- "are"
		strings <- "you" // hit limit of 2
		// A really long time without any new values.
		// The context was cancelled around 300ms,
		// before this sleep finished.
		time.Sleep(500 * time.Millisecond)
		strings <- "doing?" // never read
		close(strings)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	start := time.Now()
	batches := BatchStringsCtx(ctx, strings, 2, 10*time.Millisecond)
	for batch := range batches {
		fmt.Println(time.Since(start), batch)
	}
}
```
Now the stream can be stopped by either cancelling the context or closing the channel.
Try it now: https://play.golang.org/p/O7Wq1faP5qw