Patzifist commented on PR #864:
URL: https://github.com/apache/arrow-go/pull/864#issuecomment-4848546250

   Benchmark for the original implementation (shared sync.Pool of resizable buffers):
   
   go test -bench=BenchmarkGetColumnBloomFilter_OriginalSyncPool -run=^$ -memprofile=mem.out
   goos: linux
   goarch: amd64
   pkg: github.com/apache/arrow-go/v18/parquet/metadata
   cpu: 13th Gen Intel(R) Core(TM) i7-1355U
   BenchmarkGetColumnBloomFilter_OriginalSyncPool-12              1        1108550680 ns/op        5550647112 B/op   225956 allocs/op
   PASS
   ok      github.com/apache/arrow-go/v18/parquet/metadata 1.262s
   
   
   ```
   package metadata
   
   import (
        "bytes"
        "context"
        "math/rand"
        "sync"
        "testing"
        "time"
   
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/arrow-go/v18/parquet"
        format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
        "github.com/apache/arrow-go/v18/parquet/internal/thrift"
        "github.com/apache/arrow-go/v18/parquet/schema"
   )
   
   // fakeReader is an in-memory stand-in for a parquet file, backed by a
   // *bytes.Reader. All of *bytes.Reader's methods, including ReadAt, are
   // promoted through the embedded field, so no hand-written delegating
   // wrapper is needed.
   type fakeReader struct {
   	*bytes.Reader
   }
   
   func BenchmarkGetColumnBloomFilter_OriginalSyncPool(b *testing.B) {
        ctx := context.Background()
   
        const (
                workersCount        = 64
                iterationsPerWorker = 100
                maxSize             = 4 * 1024 * 1024
        )
   
        originalArrowPool := &sync.Pool{
                New: func() any {
                        return 
memory.NewResizableBuffer(memory.NewGoAllocator())
                },
        }
   
        node, _ := schema.NewPrimitiveNode("test_col", 
parquet.Repetition(format.FieldRepetitionType_REQUIRED), 
parquet.Type(format.Type_BYTE_ARRAY), -1, -1)
        rootGroup, _ := schema.NewGroupNode("schema", 
parquet.Repetition(format.FieldRepetitionType_REPEATED), 
schema.FieldList{node}, -1)
        sc := schema.NewSchema(rootGroup)
   
        payload := make([]byte, maxSize)
   
        b.ResetTimer()
        b.ReportAllocs()
   
        for n := 0; n < b.N; n++ {
                var wg sync.WaitGroup
                startSignal := make(chan struct{})
   
                for w := 0; w < workersCount; w++ {
                        wg.Add(1)
                        go func(workerID int) {
                                defer wg.Done()
                                <-startSignal
   
                                r := 
rand.New(rand.NewSource(time.Now().UnixNano() + int64(workerID)))
   
                                threadRdr := &RowGroupBloomFilterReader{
                                        input:          &fakeReader{Reader: 
bytes.NewReader(payload)},
                                        sourceFileSize: int64(len(payload)),
                                        bufferPool:     originalArrowPool,
                                }
   
                                for i := 0; i < iterationsPerWorker; i++ {
                                        bloomFilterDataSize := 
int32(1*1024*1024 + r.Intn(200*1024))
                                        bloomFilterReadSize := 
bloomFilterDataSize + 4096
   
                                        header := format.BloomFilterHeader{
                                                NumBytes:    
bloomFilterDataSize,
                                                Algorithm:   &defaultAlgorithm,
                                                Hash:        
&defaultHashStrategy,
                                                Compression: 
&defaultCompression,
                                        }
   
                                        serializer := 
thrift.NewThriftSerializer()
                                        headerBytes, _ := serializer.Write(ctx, 
&header)
   
                                        var offset int64 = 8
                                        copy(payload[offset:], headerBytes)
   
                                        columnMetaData := format.ColumnMetaData{
                                                Type:              
format.Type_BYTE_ARRAY,
                                                PathInSchema:      
[]string{"test_col"},
                                                Codec:             
format.CompressionCodec_UNCOMPRESSED,
                                                BloomFilterOffset: &offset,
                                                BloomFilterLength: 
&bloomFilterReadSize,
                                        }
                                        thriftColumnChunk := 
format.ColumnChunk{MetaData: &columnMetaData}
                                        thriftRowGroup := 
format.RowGroup{Columns: []*format.ColumnChunk{&thriftColumnChunk}}
                                        meta := 
NewRowGroupMetaData(&thriftRowGroup, sc, nil, nil)
   
                                        threadRdr.rgMeta = meta
   
                                        bf, err := 
threadRdr.GetColumnBloomFilter(0)
                                        if err != nil {
                                                return
                                        }
   
                                        _ = bf
                                }
                        }(w)
                }
   
                close(startSignal)
                wg.Wait()
        }
   }
   
   ```
   go tool pprof -http=:8080 mem.out
   
   
   github.com/apache/arrow-go/v18/parquet/metadata.(*RowGroupBloomFilterReader).GetColumnBloomFilter
   /home/alekssmirnov/GolandProjects/arrow-go/parquet/metadata/bloom_filter.go

     Total:      1.50MB     5.16GB (flat, cum) 99.56%
       490            .          .                              compression:  *header.Compression,
       491            .          .                      }, nil
       492            .          .              }
       493            .          .
       494            .          .              headerBuf := r.bufferPool.Get().(*memory.Buffer)
       495            .     4.06GB              headerBuf.ResizeNoShrink(int(bloomFilterReadSize))
       496            .          .              defer func() {
       497            .          .                      if headerBuf != nil {
       498            .          .                              headerBuf.ResizeNoShrink(0)
       499            .          .                              r.bufferPool.Put(headerBuf)
       500            .          .                      }
       501            .          .              }()
       502            .          .
       503            .          .              if _, err = sectionRdr.Read(headerBuf.Bytes()); err != nil {
       504            .          .                      return nil, err
       505            .          .              }
       506            .          .
       507            .        2MB              remaining, err := thrift.DeserializeThrift(&header, headerBuf.Bytes())
       508            .          .              if err != nil {
       509            .          .                      return nil, err
       510            .          .              }
       511            .          .              headerSize := len(headerBuf.Bytes()) - int(remaining)
       512            .          .
       513            .          .              if err = validateBloomFilterHeader(&header); err != nil {
       514            .          .                      return nil, err
       515            .          .              }
       516            .          .
       517            .          .              bloomFilterSz := header.NumBytes
       518            .          .              var bitset []byte
       519            .          .              if int(bloomFilterSz)+headerSize <= len(headerBuf.Bytes()) {
       520            .          .                      // bloom filter data is entirely contained in the buffer we just read
       521            .          .                      bitset = headerBuf.Bytes()[headerSize : headerSize+int(bloomFilterSz)]
       522            .          .              } else {
       523            .          .                      buf := r.bufferPool.Get().(*memory.Buffer)
       524            .     1.09GB                      buf.ResizeNoShrink(int(bloomFilterSz))
       525            .          .                      filterBytesInHeader := headerBuf.Len() - headerSize
       526            .          .                      if filterBytesInHeader > 0 {
       527            .          .                              copy(buf.Bytes(), headerBuf.Bytes()[headerSize:])
       528            .          .                      }
       529            .          .
       530            .          .                      if _, err = sectionRdr.Read(buf.Bytes()[filterBytesInHeader:]); err != nil {
       531            .          .                              return nil, err
       532            .          .                      }
       533            .          .                      bitset = buf.Bytes()
       534            .          .                      headerBuf.ResizeNoShrink(0)
       535            .          .                      r.bufferPool.Put(headerBuf)
       536            .          .                      headerBuf = buf
       537            .          .              }
       538            .          .
       539       1.50MB     1.50MB              bf := &blockSplitBloomFilter{
       540            .          .                      data:         headerBuf,
       541            .          .                      bitset32:     arrow.GetData[uint32](bitset),
       542            .          .                      hasher:       xxhasher{},
       543            .          .                      algorithm:    *header.Algorithm,
       544            .          .                      hashStrategy: *header.Hash,
       545            .          .                      compression:  *header.Compression,
       546            .          .              }
       547            .          .              headerBuf = nil
       548            .     1.50MB              addCleanup(bf, r.bufferPool)
       549            .          .              return bf, nil
       550            .          .           }
       551            .          .
       552            .          .           type FileBloomFilterBuilder struct {
       553            .          .              Schema    *schema.Schema


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to