zeroshade commented on PR #864:
URL: https://github.com/apache/arrow-go/pull/864#issuecomment-4801801399
Ultimately, there is a measurable problem to fix here: memory churn, and the
latency of returning the buffer to the pool (i.e. how promptly it goes back).
But we should improve that without breaking the public `BloomFilter` API,
bypassing the reader's allocator, or introducing new process-global pools in
place of the per-reader pool.
For reference, here's the test file I used:
```go
// Sizing knobs shared by the bloom filter pool reproducers.
const (
	// Number of concurrent goroutines issuing reads.
	reproGoroutines = 65
	// Reads performed by each goroutine.
	reproIters = 100
	// Total reads per scenario.
	reproOps = reproGoroutines * reproIters
	// Size of the bloom filter bitset (1 MiB).
	reproBitsetSize = 1 << 20
	// Size of the per-iteration file buffer allocated in scenario A (2 MiB).
	reproScratch = 2 << 20
	// BloomFilterLength recorded in the column metadata.
	reproReadSize int32 = 4096
)
func reproHeaderBytes(t testing.TB) []byte {
t.Helper()
hb, err := thrift.NewThriftSerializer().Write(context.Background(),
&format.BloomFilterHeader{
NumBytes: int32(reproBitsetSize),
Algorithm: &defaultAlgorithm,
Hash: &defaultHashStrategy,
Compression: &defaultCompression,
})
if err != nil {
t.Fatalf("serialize bloom filter header: %v", err)
}
return hb
}
// reproFileData lays out [8 byte prefix][thrift header][bitset] so a
non-zero
// BloomFilterOffset points at the header.
func reproFileData(t testing.TB) []byte {
t.Helper()
fd := append(make([]byte, 8), reproHeaderBytes(t)...)
return append(fd, make([]byte, reproBitsetSize)...)
}
// reproMeta builds row group metadata for a single REQUIRED BYTE_ARRAY column
// named "test_col" (100 rows, uncompressed, PLAIN encoding). The column chunk
// records bloomFilterOffset as its BloomFilterOffset and reproReadSize as its
// BloomFilterLength. Schema construction failures abort the calling test.
func reproMeta(t testing.TB, bloomFilterOffset int64) *RowGroupMetaData {
	t.Helper()

	leaf, err := schema.NewPrimitiveNode("test_col",
		parquet.Repetition(format.FieldRepetitionType_REQUIRED),
		parquet.Type(format.Type_BYTE_ARRAY), -1, -1)
	if err != nil {
		t.Fatalf("create primitive node: %v", err)
	}
	root, err := schema.NewGroupNode("schema",
		parquet.Repetition(format.FieldRepetitionType_REPEATED),
		schema.FieldList{leaf}, -1)
	if err != nil {
		t.Fatalf("create group node: %v", err)
	}

	bloomLen := reproReadSize
	chunk := &format.ColumnChunk{MetaData: &format.ColumnMetaData{
		Type:              format.Type_BYTE_ARRAY,
		Encodings:         []format.Encoding{format.Encoding_PLAIN},
		PathInSchema:      []string{"test_col"},
		Codec:             format.CompressionCodec_UNCOMPRESSED,
		BloomFilterOffset: &bloomFilterOffset,
		BloomFilterLength: &bloomLen,
	}}
	rowGroup := &format.RowGroup{
		Columns:       []*format.ColumnChunk{chunk},
		TotalByteSize: 100,
		NumRows:       100,
	}
	return NewRowGroupMetaData(rowGroup, schema.NewSchema(root), nil, nil)
}
func reproReader(fileData []byte, meta *RowGroupMetaData, pool *sync.Pool)
*RowGroupBloomFilterReader {
return &RowGroupBloomFilterReader{
input: bytes.NewReader(fileData),
sourceFileSize: int64(len(fileData)),
rgMeta: meta,
bufferPool: pool,
}
}
// reproPool returns a new sync.Pool whose New function allocates a resizable
// buffer from a Go allocator. Each buffer carries a finalizer that calls
// Release if the buffer becomes unreachable, for example after the pool drops
// it.
func reproPool() *sync.Pool {
	newBuffer := func() any {
		b := memory.NewResizableBuffer(memory.NewGoAllocator())
		runtime.SetFinalizer(b, func(o *memory.Buffer) { o.Release() })
		return b
	}
	return &sync.Pool{New: newBuffer}
}
// reproRecycle returns the bloom filter's buffer to the pool immediately,
doing
// exactly what the reader's runtime.AddCleanup callback does
(ResizeNoShrink(0)
// then Put), but synchronously. The registered cleanup is cancelled first
so the
// same buffer is not returned twice.
func reproRecycle(bf BloomFilter, pool *sync.Pool) {
b, ok := bf.(*blockSplitBloomFilter)
if !ok || b.data == nil {
return
}
if b.cancelCleanup != nil {
b.cancelCleanup()
}
b.data.ResizeNoShrink(0)
pool.Put(b.data)
}
func reproMeasure(t *testing.T, name string, run func() (read, nilRet
int64)) (int64, float64) {
t.Helper()
runtime.GC()
var before runtime.MemStats
runtime.ReadMemStats(&before)
start := time.Now()
read, nilRet := run()
elapsed := time.Since(start)
var after runtime.MemStats
runtime.ReadMemStats(&after)
churnMB := float64(after.TotalAlloc-before.TotalAlloc) / 1024 / 1024
t.Logf("%s\n ops=%d filters-read=%d nil-returns=%d
cumulative-alloc(churn)=%.1f MB mallocs=%d elapsed=%v",
name, reproOps, read, nilRet, churnMB,
after.Mallocs-before.Mallocs, elapsed)
return read, churnMB
}
// reproRunParallel starts reproGoroutines goroutines, releases them together
// through a gate channel, and has each call work reproIters times. It counts
// non-nil results as read and nil results as nilRet, and returns once every
// goroutine has finished.
func reproRunParallel(work func() BloomFilter) (read, nilRet int64) {
	var hits, misses atomic.Int64
	var wg sync.WaitGroup
	start := make(chan struct{})

	wg.Add(reproGoroutines)
	for range reproGoroutines {
		go func() {
			defer wg.Done()
			<-start
			for range reproIters {
				if work() == nil {
					misses.Add(1)
					continue
				}
				hits.Add(1)
			}
		}()
	}

	close(start)
	wg.Wait()
	return hits.Load(), misses.Load()
}
// Scenario A: the original reproducer. BloomFilterOffset=0 and a fresh 2
MiB file
// buffer allocated per iteration. Allocates ~13 GB while never reading a
filter.
func TestBloomFilterPoolRepro_A_OriginalHarness(t *testing.T) {
header := reproHeaderBytes(t)
meta := reproMeta(t, 0)
read, _ := reproMeasure(t, "A original harness: offset=0, fresh 2MiB
file per iteration", func() (int64, int64) {
var read, nilRet int64
var wg sync.WaitGroup
gate := make(chan struct{})
for range reproGoroutines {
wg.Add(1)
go func() {
defer wg.Done()
pool := reproPool()
<-gate
for range reproIters {
scratch := make([]byte, reproScratch)
copy(scratch, header)
bf, err := reproReader(scratch, meta,
pool).GetColumnBloomFilter(0)
if err != nil || bf == nil {
atomic.AddInt64(&nilRet, 1)
continue
}
atomic.AddInt64(&read, 1)
reproRecycle(bf, pool)
}
}()
}
close(gate)
wg.Wait()
return read, nilRet
})
if read != 0 {
t.Fatalf("expected 0 filters read with offset=0, got %d", read)
}
}
// Scenario B: corrected harness. Non-zero offset (read path runs), shared
reader,
// buffer returned to the pool promptly. The 1 MiB bitset buffer is reused.
func TestBloomFilterPoolRepro_B_SharedReaderPromptReturn(t *testing.T) {
fileData := reproFileData(t)
meta := reproMeta(t, 8)
pool := reproPool()
rdr := reproReader(fileData, meta, pool)
bf, err := rdr.GetColumnBloomFilter(0)
if err != nil || bf == nil {
t.Fatalf("sanity read failed: bf=%v err=%v", bf, err)
}
if bf.Size() != int64(reproBitsetSize) {
t.Fatalf("unexpected filter size: got %d want %d", bf.Size(),
reproBitsetSize)
}
reproRecycle(bf, pool)
read, churnMB := reproMeasure(t, "B shared reader + prompt return:
offset>0", func() (int64, int64) {
return reproRunParallel(func() BloomFilter {
bf, err := rdr.GetColumnBloomFilter(0)
if err != nil || bf == nil {
return nil
}
reproRecycle(bf, pool)
return bf
})
})
if read != reproOps {
t.Fatalf("expected %d filters read, got %d", reproOps, read)
}
if churnMB > 500 {
t.Fatalf("pool not reusing buffers: churn %.1f MB (1 MiB x %d
ops would be ~%d MB without reuse)",
churnMB, reproOps, reproOps)
}
}
// Scenario C: corrected harness using the reader's real return path
// (runtime.AddCleanup). GC-timing dependent; reported only.
func TestBloomFilterPoolRepro_C_SharedReaderCleanupOnly(t *testing.T) {
fileData := reproFileData(t)
meta := reproMeta(t, 8)
pool := reproPool()
rdr := reproReader(fileData, meta, pool)
reproMeasure(t, "C shared reader + cleanup-only return: offset>0
(reader's real path)", func() (int64, int64) {
return reproRunParallel(func() BloomFilter {
bf, err := rdr.GetColumnBloomFilter(0)
if err != nil {
return nil
}
return bf
})
})
}
// Scenario D1: a new reader is built every iteration (as in the original
harness)
// but the file bytes are shared and the offset is non-zero. A shared pool
still
// recycles, so recreating the reader is not what defeated reuse.
func TestBloomFilterPoolRepro_D1_FreshReaderSharedPool(t *testing.T) {
fileData := reproFileData(t)
meta := reproMeta(t, 8)
pool := reproPool()
read, churnMB := reproMeasure(t, "D1 fresh reader + shared pool:
offset>0, file shared, prompt return", func() (int64, int64) {
return reproRunParallel(func() BloomFilter {
bf, err := reproReader(fileData, meta,
pool).GetColumnBloomFilter(0)
if err != nil || bf == nil {
return nil
}
reproRecycle(bf, pool)
return bf
})
})
if read != reproOps {
t.Fatalf("expected %d filters read, got %d", reproOps, read)
}
if churnMB > 500 {
t.Fatalf("fresh reader defeated shared pool reuse: churn %.1f
MB", churnMB)
}
}
// Scenario D2: a new pool is built per read (i.e. a new file.Reader per
single
// read). This is the only configuration where per-reader pooling cannot
help.
func TestBloomFilterPoolRepro_D2_FreshReaderFreshPool(t *testing.T) {
fileData := reproFileData(t)
meta := reproMeta(t, 8)
reproMeasure(t, "D2 fresh reader + fresh pool per read: offset>0, file
shared", func() (int64, int64) {
return reproRunParallel(func() BloomFilter {
bf, err := reproReader(fileData, meta,
reproPool()).GetColumnBloomFilter(0)
if err != nil {
return nil
}
return bf
})
})
}
// BenchmarkBloomFilterPoolReuse reports allocs/op for the read path with the
// buffer returned to the pool. A reused 1 MiB bitset does not show up per
op.
func BenchmarkBloomFilterPoolReuse(b *testing.B) {
fileData := reproFileData(b)
meta := reproMeta(b, 8)
pool := reproPool()
rdr := reproReader(fileData, meta, pool)
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
bf, err := rdr.GetColumnBloomFilter(0)
if err != nil || bf == nil {
continue
}
reproRecycle(bf, pool)
}
})
}
```
Running it locally with `go test -bench`, I get the following results:
| Test Case | filters-read | churn |
| -------------- | ---------------- | --------- |
| A - original test (offset = 0) | 0 | 13002.7 MB |
| B - shared reader + prompt return | 6500 | 77.1 MB |
| C - cleanup-only (the reader's real return path on `main`) | 6500 | 3419.5 MB |
| D1 - fresh reader + shared pool | 6500 | 39.0 MB |
| D2 - fresh reader + fresh pool | 6500 | 6617.0 MB |
Let's design a solution that returns buffers to the pool more promptly
without breaking the API or defeating the pool. Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]