Initial Implementation: Correct but Wasteful
Let's suppose you've got one function that produces values and another that reads or inspects them. Here's the function that produces the values:
func write(out chan *bytes.Buffer) {
	defer close(out)
	for range 10 {
		out <- bytes.NewBufferString(randomKey())
	}
}
In this case, the 'values' are instances of *bytes.Buffer with some random data:
func randomKey() string {
	for key := range valid {
		return key
	}
	panic("no values")
}
var valid = map[string]struct{}{
	"A":   {},
	"BB":  {},
	"CCC": {},
}
So, we produce 10 buffers, each of which contains either A, BB, or CCC. It's very important to remember that. Now, here's the function that reads/inspects the values:
func read(in chan *bytes.Buffer) {
	for b := range in {
		content := b.Bytes()
		runtime.Gosched() // simulate some minimal processing
		s := string(content)
		if _, ok := valid[s]; !ok {
			fmt.Println("invalid key:", s)
		}
	}
}
This code isn't that interesting, and you may be questioning why we ask for the content as .Bytes() only to convert back to string a few lines later (after some simulated 'processing'), but just go with it (this is a simplified example). The if statement verifies that the data received in the buffer matches one of the values in the valid map. Running this code several times produces no output, verifying that all values are valid.
For reference, here's the main function that kicks off the whole process:
func main() {
	ch := make(chan *bytes.Buffer)
	go write(ch)
	read(ch)
}
Second Draft: Efficient but Incorrect
As this code evolves to handle higher quantities of inputs, it will perform a lot of allocations (one per buffer). But we could make this code more memory-efficient if we reused buffers, perhaps via a sync.Pool. We'd create the pool in the main function and pass it to the write and read functions so they could call pool.Get() and pool.Put(...), respectively:
func main() {
	pool := &sync.Pool{
		New: func() any { return bytes.NewBuffer(nil) },
	}
	ch := make(chan *bytes.Buffer)
	go write(pool, ch)
	read(pool, ch)
}
The write function now calls pool.Get() and, like a responsible citizen, calls .Reset() on the buffer before doing any writing:
func write(pool *sync.Pool, out chan *bytes.Buffer) {
	defer close(out)
	for range 10 {
		b := pool.Get().(*bytes.Buffer)
		b.Reset()
		b.WriteString(randomKey())
		out <- b
	}
}
Now, it will be the job of the read function to call pool.Put(...) so that each buffer can be reused:
func read(pool *sync.Pool, in chan *bytes.Buffer) {
	for b := range in {
		content := b.Bytes()
		pool.Put(b)
		runtime.Gosched() // simulate some minimal processing
		s := string(content)
		if _, ok := valid[s]; !ok {
			fmt.Println("invalid key:", s)
		}
	}
}
All good, right? Wrong! Here's a sample of the output of executing this code:
invalid key: AB
invalid key: B
invalid key: CC
invalid key: ACC
invalid key: C
How on earth are five keys coming out invalid? That's half our workload, since we're loading the channel with 10 items! This simple example is based on a much more complicated scenario I recently encountered [1], and it took me a while to grasp the errors in my assumptions: that calling bytes.Buffer.Bytes() would produce a copy of the underlying bytes rather than a reference to them, and that bytes.Buffer.Reset() would zero out the underlying slice.
Here's the implementation of bytes.Buffer.Bytes(), which doesn't copy anything:
func (b *Buffer) Bytes() []byte { return b.buf[b.off:] }
And here's bytes.Buffer.Reset(), which doesn't clear anything:
func (b *Buffer) Reset() {
	b.buf = b.buf[:0]
	b.off = 0
	b.lastRead = opInvalid
}
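To see both behaviors side by side, here is a minimal sketch. The helper name resetKeepsBytes and the explicit 64-byte starting capacity are assumptions made for illustration; neither appears in the original program.

```go
package main

import (
	"bytes"
	"fmt"
)

// resetKeepsBytes is a hypothetical helper (not part of the original program).
// It writes "CCC" into a buffer, keeps the slice returned by Bytes(), calls
// Reset(), and reports what the buffer and the earlier slice look like.
func resetKeepsBytes() (lenAfter int, capUnchanged bool, view string) {
	// The explicit capacity is an assumption for the demo; it only makes
	// the buffer's starting state obvious.
	b := bytes.NewBuffer(make([]byte, 0, 64))
	b.WriteString("CCC")

	content := b.Bytes() // a view into b's backing array, not a copy
	capBefore := b.Cap()

	b.Reset() // truncates the buffer to length 0; the old bytes stay in place

	return b.Len(), b.Cap() == capBefore, string(content)
}

func main() {
	lenAfter, capUnchanged, view := resetKeepsBytes()
	fmt.Println(lenAfter, capUnchanged, view) // prints "0 true CCC"
}
```

The buffer reports a length of zero after Reset(), yet the slice obtained earlier still reads CCC, because nothing was copied and nothing was cleared.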
bytes.Buffer is a wrapper over a []byte. The important thing to realize is that a slice is a reference to its backing array. When we assign b.Bytes() (the underlying slice) to a local variable and then call pool.Put(...), we allow the write goroutine to call pool.Get(), potentially retrieving the very same buffer, and then to call Reset() and WriteString() on it before the read goroutine has converted the slice to a string. At that point, the damage has been done and the (now shared) data has been corrupted.
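The interleaving can be replayed deterministically on a single goroutine, without a pool or a channel. The sketch below is an illustration only: staleRead is a hypothetical helper, and the explicit 64-byte capacity stands in for a pooled buffer that already has room from an earlier use.

```go
package main

import (
	"bytes"
	"fmt"
)

// staleRead is a hypothetical helper that replays the problematic ordering:
// the reader takes a slice of the buffer, the writer then reuses the buffer,
// and only afterwards does the reader convert its slice to a string.
func staleRead(first, second string) string {
	// Assumption for the demo: the buffer already has spare capacity,
	// as a recycled buffer from the pool typically would.
	b := bytes.NewBuffer(make([]byte, 0, 64))
	b.WriteString(first)

	content := b.Bytes() // reader: keeps a view of the buffer's bytes
	// reader: pool.Put(b) would happen here

	b.Reset()             // writer: gets the same buffer back and resets it...
	b.WriteString(second) // ...then overwrites the bytes the reader still sees

	return string(content) // reader: converts too late
}

func main() {
	cases := [][2]string{
		{"BB", "A"},  // AB
		{"A", "BB"},  // B
		{"BB", "CCC"}, // CC
		{"CCC", "A"}, // ACC
		{"A", "CCC"}, // C
	}
	for _, c := range cases {
		fmt.Printf("%s overwritten by %s reads as %s\n", c[0], c[1], staleRead(c[0], c[1]))
	}
}
```

The reader's slice keeps its original length, so it sees the first len(first) bytes of whatever the writer left behind. That accounts for every invalid key in the sample output above.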
My whole goal was to reduce pressure on the garbage collector by reducing allocations via object reuse. So, how would you resolve this issue while retaining the usage of the sync.Pool in order to reuse the buffers? Take a moment to think about that before reading on.
Toward a Final Draft: Correct but No Longer Efficient
We must save our own local copy of the bytes from the bytes.Buffer, perhaps like this:
func read(pool *sync.Pool, in chan *bytes.Buffer) {
	for b := range in {
		working := make([]byte, b.Len())
		_ = copy(working, b.Bytes()) // <-- save our own copy!
		pool.Put(b)
		runtime.Gosched() // simulate some minimal processing
		s := string(working)
		if _, ok := valid[s]; !ok {
			fmt.Println("invalid key:", s)
		}
	}
}
This solution, while 'correct', goes back to allocating a new byte slice for every buffer we process, which completely defeats the purpose of the sync.Pool.
Final Draft: The Best of Both Worlds
The solution we settled on was to define a secondary buffer in the read goroutine [2] that is Reset() on each iteration and then used as a working copy, so that pool.Put(...) can truly hand ownership of each buffer back to the pool:
func read(pool *sync.Pool, in chan *bytes.Buffer) {
	working := bytes.NewBuffer(nil)
	for b := range in {
		working.Reset()
		_, _ = io.Copy(working, b) // <-- save our own copy!
		pool.Put(b)
		content := working.Bytes()
		runtime.Gosched() // simulate some minimal processing
		s := string(content)
		if _, ok := valid[s]; !ok {
			fmt.Println("invalid key:", s)
		}
	}
}
With that change in place, the program goes back to producing no output because the buffers are all managed appropriately. This program can now scale to arbitrarily large quantities of items processed without putting the originally observed linear pressure on the garbage collector.
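One way to check the allocation claim is to count allocations per item for the two copying strategies with testing.AllocsPerRun from the standard library. This is a rough sketch rather than a proper benchmark: the helper names (copyPerItem, copyIntoWorking, measureAllocs) and the package-level sink variable are assumptions introduced here, and the exact counts can vary with the Go version and compiler optimizations.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"testing"
)

// sink keeps each result reachable so the copies are not optimized away.
var sink []byte

// copyPerItem mirrors the make-and-copy version of read: a fresh slice per item.
func copyPerItem(src *bytes.Buffer) []byte {
	working := make([]byte, src.Len())
	copy(working, src.Bytes())
	return working
}

// copyIntoWorking mirrors the final version of read: one long-lived working
// buffer that is reset and refilled for each item.
func copyIntoWorking(working, src *bytes.Buffer) []byte {
	working.Reset()
	_, _ = io.Copy(working, src)
	return working.Bytes()
}

// measureAllocs is a hypothetical helper that reports the average number of
// heap allocations per item for each strategy.
func measureAllocs() (perItem, reused float64) {
	src := bytes.NewBuffer(nil)     // stands in for a buffer taken from the pool
	working := bytes.NewBuffer(nil) // the reader's private working buffer

	refill := func() {
		src.Reset()
		src.WriteString("CCC")
	}

	perItem = testing.AllocsPerRun(1000, func() {
		refill()
		sink = copyPerItem(src)
	})
	reused = testing.AllocsPerRun(1000, func() {
		refill()
		sink = copyIntoWorking(working, src)
	})
	return perItem, reused
}

func main() {
	perItem, reused := measureAllocs()
	fmt.Printf("allocations per item: make+copy=%.0f, reused working buffer=%.0f\n", perItem, reused)
}
```

The measurement covers only the copy step. The string conversion in read is common to every version of the function, so it is left out of the comparison.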
Conclusion (TIL)
So, if you are trying to reuse a bytes.Buffer (or a structure that contains one) via a sync.Pool, make sure to copy away the bytes before calling pool.Put(...).
If desired, you may review all three separate versions of the above code: