zeroshade commented on PR #864:
URL: https://github.com/apache/arrow-go/pull/864#issuecomment-4801801399

   Ultimately, there is a measurable problem to fix here regarding latency, 
memory churn, and how promptly we return the buffer to the pool. But we 
should fix it without breaking the public `BloomFilter` API, bypassing the 
reader's allocator, or introducing new process-global pools in place of the 
per-reader pool.
   
   For reference, here's the test file I used:
   
   ```go
   // Workload shape shared by every repro scenario in this file.
   const (
        // reproGoroutines is the number of concurrent workers per scenario.
        reproGoroutines = 65
        // reproIters is the number of reads each worker performs.
        reproIters = 100
        // reproOps is the total number of reads per scenario (65 * 100).
        reproOps = reproIters * reproGoroutines
        // reproBitsetSize is the bloom filter bitset size in bytes (1 MiB).
        reproBitsetSize = 1 << 20
        // reproScratch is the size in bytes of the per-iteration file buffer
        // that scenario A allocates (2 MiB).
        reproScratch = 2 << 20
        // reproReadSize is the value stored as BloomFilterLength in the
        // column metadata built by reproMeta.
        reproReadSize int32 = 4096
   )
   
   func reproHeaderBytes(t testing.TB) []byte {
        t.Helper()
        hb, err := thrift.NewThriftSerializer().Write(context.Background(), 
&format.BloomFilterHeader{
                NumBytes:    int32(reproBitsetSize),
                Algorithm:   &defaultAlgorithm,
                Hash:        &defaultHashStrategy,
                Compression: &defaultCompression,
        })
        if err != nil {
                t.Fatalf("serialize bloom filter header: %v", err)
        }
        return hb
   }
   
   // reproFileData lays out [8 byte prefix][thrift header][bitset] so a 
non-zero
   // BloomFilterOffset points at the header.
   func reproFileData(t testing.TB) []byte {
        t.Helper()
        fd := append(make([]byte, 8), reproHeaderBytes(t)...)
        return append(fd, make([]byte, reproBitsetSize)...)
   }
   
   // reproMeta builds row-group metadata for a single required BYTE_ARRAY
   // column named "test_col" whose bloom filter is recorded at
   // bloomFilterOffset with a BloomFilterLength of reproReadSize. Schema
   // construction failure aborts the calling test or benchmark.
   func reproMeta(t testing.TB, bloomFilterOffset int64) *RowGroupMetaData {
        t.Helper()

        const colName = "test_col"

        // Schema: a root group holding the one primitive column.
        col, err := schema.NewPrimitiveNode(
                colName,
                parquet.Repetition(format.FieldRepetitionType_REQUIRED),
                parquet.Type(format.Type_BYTE_ARRAY),
                -1, -1,
        )
        if err != nil {
                t.Fatalf("create primitive node: %v", err)
        }
        root, err := schema.NewGroupNode(
                "schema",
                parquet.Repetition(format.FieldRepetitionType_REPEATED),
                schema.FieldList{col},
                -1,
        )
        if err != nil {
                t.Fatalf("create group node: %v", err)
        }

        // Thrift metadata: one row group with one uncompressed column chunk.
        filterLen := reproReadSize
        colMeta := format.ColumnMetaData{
                Type:              format.Type_BYTE_ARRAY,
                Encodings:         []format.Encoding{format.Encoding_PLAIN},
                PathInSchema:      []string{colName},
                Codec:             format.CompressionCodec_UNCOMPRESSED,
                BloomFilterOffset: &bloomFilterOffset,
                BloomFilterLength: &filterLen,
        }
        rowGroup := format.RowGroup{
                Columns:       []*format.ColumnChunk{{MetaData: &colMeta}},
                TotalByteSize: 100,
                NumRows:       100,
        }

        return NewRowGroupMetaData(&rowGroup, schema.NewSchema(root), nil, nil)
   }
   
   func reproReader(fileData []byte, meta *RowGroupMetaData, pool *sync.Pool) 
*RowGroupBloomFilterReader {
        return &RowGroupBloomFilterReader{
                input:          bytes.NewReader(fileData),
                sourceFileSize: int64(len(fileData)),
                rgMeta:         meta,
                bufferPool:     pool,
        }
   }
   
   // reproPool returns a new sync.Pool whose New function creates a resizable
   // buffer backed by the Go allocator and attaches a finalizer that releases
   // the buffer when it is garbage collected.
   func reproPool() *sync.Pool {
        newBuffer := func() any {
                buf := memory.NewResizableBuffer(memory.NewGoAllocator())
                runtime.SetFinalizer(buf, func(o *memory.Buffer) {
                        o.Release()
                })
                return buf
        }
        return &sync.Pool{New: newBuffer}
   }
   
   // reproRecycle returns the bloom filter's buffer to the pool immediately, 
doing
   // exactly what the reader's runtime.AddCleanup callback does 
(ResizeNoShrink(0)
   // then Put), but synchronously. The registered cleanup is cancelled first 
so the
   // same buffer is not returned twice.
   func reproRecycle(bf BloomFilter, pool *sync.Pool) {
        b, ok := bf.(*blockSplitBloomFilter)
        if !ok || b.data == nil {
                return
        }
        if b.cancelCleanup != nil {
                b.cancelCleanup()
        }
        b.data.ResizeNoShrink(0)
        pool.Put(b.data)
   }
   
   func reproMeasure(t *testing.T, name string, run func() (read, nilRet 
int64)) (int64, float64) {
        t.Helper()
        runtime.GC()
        var before runtime.MemStats
        runtime.ReadMemStats(&before)
   
        start := time.Now()
        read, nilRet := run()
        elapsed := time.Since(start)
   
        var after runtime.MemStats
        runtime.ReadMemStats(&after)
        churnMB := float64(after.TotalAlloc-before.TotalAlloc) / 1024 / 1024
        t.Logf("%s\n    ops=%d filters-read=%d nil-returns=%d 
cumulative-alloc(churn)=%.1f MB mallocs=%d elapsed=%v",
                name, reproOps, read, nilRet, churnMB, 
after.Mallocs-before.Mallocs, elapsed)
        return read, churnMB
   }
   
   // reproRunParallel starts reproGoroutines workers, releases them together,
   // and has each call work reproIters times. It returns how many calls
   // produced a non-nil filter (read) and how many produced nil (nilRet).
   func reproRunParallel(work func() BloomFilter) (read, nilRet int64) {
        var hits, misses atomic.Int64
        start := make(chan struct{})

        var wg sync.WaitGroup
        wg.Add(reproGoroutines)
        for range reproGoroutines {
                go func() {
                        defer wg.Done()
                        // Block until every worker is spawned so they contend
                        // from the first iteration.
                        <-start
                        for range reproIters {
                                if work() == nil {
                                        misses.Add(1)
                                        continue
                                }
                                hits.Add(1)
                        }
                }()
        }

        close(start)
        wg.Wait()
        return hits.Load(), misses.Load()
   }
   
   // Scenario A: the original reproducer. BloomFilterOffset=0 and a fresh 2 
MiB file
   // buffer allocated per iteration. Allocates ~13 GB while never reading a 
filter.
   func TestBloomFilterPoolRepro_A_OriginalHarness(t *testing.T) {
        header := reproHeaderBytes(t)
        meta := reproMeta(t, 0)
   
        read, _ := reproMeasure(t, "A original harness: offset=0, fresh 2MiB 
file per iteration", func() (int64, int64) {
                var read, nilRet int64
                var wg sync.WaitGroup
                gate := make(chan struct{})
                for range reproGoroutines {
                        wg.Add(1)
                        go func() {
                                defer wg.Done()
                                pool := reproPool()
                                <-gate
                                for range reproIters {
                                        scratch := make([]byte, reproScratch)
                                        copy(scratch, header)
                                        bf, err := reproReader(scratch, meta, 
pool).GetColumnBloomFilter(0)
                                        if err != nil || bf == nil {
                                                atomic.AddInt64(&nilRet, 1)
                                                continue
                                        }
                                        atomic.AddInt64(&read, 1)
                                        reproRecycle(bf, pool)
                                }
                        }()
                }
                close(gate)
                wg.Wait()
                return read, nilRet
        })
   
        if read != 0 {
                t.Fatalf("expected 0 filters read with offset=0, got %d", read)
        }
   }
   
   // Scenario B: corrected harness. Non-zero offset (read path runs), shared 
reader,
   // buffer returned to the pool promptly. The 1 MiB bitset buffer is reused.
   func TestBloomFilterPoolRepro_B_SharedReaderPromptReturn(t *testing.T) {
        fileData := reproFileData(t)
        meta := reproMeta(t, 8)
        pool := reproPool()
        rdr := reproReader(fileData, meta, pool)
   
        bf, err := rdr.GetColumnBloomFilter(0)
        if err != nil || bf == nil {
                t.Fatalf("sanity read failed: bf=%v err=%v", bf, err)
        }
        if bf.Size() != int64(reproBitsetSize) {
                t.Fatalf("unexpected filter size: got %d want %d", bf.Size(), 
reproBitsetSize)
        }
        reproRecycle(bf, pool)
   
        read, churnMB := reproMeasure(t, "B shared reader + prompt return: 
offset>0", func() (int64, int64) {
                return reproRunParallel(func() BloomFilter {
                        bf, err := rdr.GetColumnBloomFilter(0)
                        if err != nil || bf == nil {
                                return nil
                        }
                        reproRecycle(bf, pool)
                        return bf
                })
        })
   
        if read != reproOps {
                t.Fatalf("expected %d filters read, got %d", reproOps, read)
        }
        if churnMB > 500 {
                t.Fatalf("pool not reusing buffers: churn %.1f MB (1 MiB x %d 
ops would be ~%d MB without reuse)",
                        churnMB, reproOps, reproOps)
        }
   }
   
   // Scenario C: corrected harness using the reader's real return path
   // (runtime.AddCleanup). GC-timing dependent; reported only.
   func TestBloomFilterPoolRepro_C_SharedReaderCleanupOnly(t *testing.T) {
        fileData := reproFileData(t)
        meta := reproMeta(t, 8)
        pool := reproPool()
        rdr := reproReader(fileData, meta, pool)
   
        reproMeasure(t, "C shared reader + cleanup-only return: offset>0 
(reader's real path)", func() (int64, int64) {
                return reproRunParallel(func() BloomFilter {
                        bf, err := rdr.GetColumnBloomFilter(0)
                        if err != nil {
                                return nil
                        }
                        return bf
                })
        })
   }
   
   // Scenario D1: a new reader is built every iteration (as in the original 
harness)
   // but the file bytes are shared and the offset is non-zero. A shared pool 
still
   // recycles, so recreating the reader is not what defeated reuse.
   func TestBloomFilterPoolRepro_D1_FreshReaderSharedPool(t *testing.T) {
        fileData := reproFileData(t)
        meta := reproMeta(t, 8)
        pool := reproPool()
   
        read, churnMB := reproMeasure(t, "D1 fresh reader + shared pool: 
offset>0, file shared, prompt return", func() (int64, int64) {
                return reproRunParallel(func() BloomFilter {
                        bf, err := reproReader(fileData, meta, 
pool).GetColumnBloomFilter(0)
                        if err != nil || bf == nil {
                                return nil
                        }
                        reproRecycle(bf, pool)
                        return bf
                })
        })
   
        if read != reproOps {
                t.Fatalf("expected %d filters read, got %d", reproOps, read)
        }
        if churnMB > 500 {
                t.Fatalf("fresh reader defeated shared pool reuse: churn %.1f 
MB", churnMB)
        }
   }
   
   // Scenario D2: a new pool is built per read (i.e. a new file.Reader per 
single
   // read). This is the only configuration where per-reader pooling cannot 
help.
   func TestBloomFilterPoolRepro_D2_FreshReaderFreshPool(t *testing.T) {
        fileData := reproFileData(t)
        meta := reproMeta(t, 8)
   
        reproMeasure(t, "D2 fresh reader + fresh pool per read: offset>0, file 
shared", func() (int64, int64) {
                return reproRunParallel(func() BloomFilter {
                        bf, err := reproReader(fileData, meta, 
reproPool()).GetColumnBloomFilter(0)
                        if err != nil {
                                return nil
                        }
                        return bf
                })
        })
   }
   
   // BenchmarkBloomFilterPoolReuse reports allocs/op for the read path with the
   // buffer returned to the pool. A reused 1 MiB bitset does not show up per 
op.
   func BenchmarkBloomFilterPoolReuse(b *testing.B) {
        fileData := reproFileData(b)
        meta := reproMeta(b, 8)
        pool := reproPool()
        rdr := reproReader(fileData, meta, pool)
   
        b.ReportAllocs()
        b.ResetTimer()
        b.RunParallel(func(pb *testing.PB) {
                for pb.Next() {
                        bf, err := rdr.GetColumnBloomFilter(0)
                        if err != nil || bf == nil {
                                continue
                        }
                        reproRecycle(bf, pool)
                }
        })
   }
   ```
   
   Running it locally with `go test -bench`, I get the following results:
   
   |  Test Case | filters-read | churn |
   | -------------- | ---------------- | --------- |
   | A - original test (offset = 0) | 0 | 13002.7 MB |
   | B - shared reader + prompt return | 6500 | 77.1 MB |
   | C - cleanup-only (real path for main) | 6500 | 3419.5 MB |
   | D1 - fresh reader + shared pool | 6500 | 39.0 MB |
   | D2 - fresh reader + fresh pool | 6500 | 6617.0 MB |
   
   Let's construct a good solution for getting more prompt returns to the pool 
without breaking the API or defeating the pool. Thoughts?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to