This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ee3a8d8a92 ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)
ee3a8d8a92 is described below

commit ee3a8d8a92e5d01c3c3846fb400c5c523c625642
Author: Matt Topol <[email protected]>
AuthorDate: Tue Aug 16 14:24:59 2022 -0400

    ARROW-16790: [Go][Parquet] Avoid unnecessary memory allocations for skipping rows (#13887)
    
    Authored-by: Matt Topol <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/parquet/file/column_reader.go      | 18 +++++++++++++-----
 go/parquet/file/column_reader_test.go | 19 ++++++++++++++++++-
 go/parquet/file/column_writer_test.go | 20 +++++++++++++++++++-
 go/parquet/file/file_reader.go        | 23 +++++++++++++++++++++++
 go/parquet/file/record_reader.go      | 25 +++++++++++++------------
 go/parquet/file/row_group_reader.go   |  5 ++++-
 go/parquet/pqarrow/column_readers.go  |  5 +++--
 go/parquet/pqarrow/file_reader.go     |  6 +++---
 8 files changed, 96 insertions(+), 25 deletions(-)

diff --git a/go/parquet/file/column_reader.go b/go/parquet/file/column_reader.go
index 32fa21adea..cfc0b22432 100644
--- a/go/parquet/file/column_reader.go
+++ b/go/parquet/file/column_reader.go
@@ -18,6 +18,7 @@ package file
 
 import (
        "fmt"
+       "sync"
 
        "github.com/apache/arrow/go/v10/arrow/memory"
        "github.com/apache/arrow/go/v10/internal/utils"
@@ -125,6 +126,7 @@ type columnChunkReader struct {
        // the number of values we've decoded so far
        numDecoded int64
        mem        memory.Allocator
+       bufferPool *sync.Pool
 
        decoders      map[format.Encoding]encoding.TypedDecoder
        decoderTraits encoding.DecoderTraits
@@ -136,8 +138,12 @@ type columnChunkReader struct {
 
 // NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will
 // provide the pages of data for this column. The type is determined from the column passed in.
-func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator) ColumnChunkReader {
-       base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder)}
+//
+// In addition to the page reader and allocator, a pointer to a shared sync.Pool is expected to provide buffers for temporary
+// usage to minimize allocations. The bufferPool should provide *memory.Buffer objects that can be resized as necessary, buffers
+// should have `ResizeNoShrink(0)` called on them before being put back into the pool.
+func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader {
+       base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: bufferPool}
        switch descr.PhysicalType() {
        case parquet.Types.FixedLenByteArray:
                base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
@@ -435,15 +441,17 @@ func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, b
                                valsRead  int64 = 0
                        )
 
-                       // TODO(ARROW-16790): ideally we should re-use a shared pool of buffers to avoid unnecessary memory allocation for skips
-                       scratch := memory.NewResizableBuffer(c.mem)
+                       scratch := c.bufferPool.Get().(*memory.Buffer)
+                       defer func() {
+                               scratch.ResizeNoShrink(0)
+                               c.bufferPool.Put(scratch)
+                       }()
                        bufMult := 1
                        if c.descr.PhysicalType() == parquet.Types.Boolean {
                                // for bools, BytesRequired returns 1 byte per 8 bool, but casting []byte to []bool requires 1 byte per 1 bool
                                bufMult = 8
                        }
                        scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult))
-                       defer scratch.Release()
 
                        for {
                                batchSize = utils.Min(batchSize, toskip)
diff --git a/go/parquet/file/column_reader_test.go b/go/parquet/file/column_reader_test.go
index eb3409942f..c0b727ed0c 100755
--- a/go/parquet/file/column_reader_test.go
+++ b/go/parquet/file/column_reader_test.go
@@ -20,6 +20,8 @@ import (
        "math"
        "math/rand"
        "reflect"
+       "runtime"
+       "sync"
        "testing"
 
        "github.com/apache/arrow/go/v10/arrow/memory"
@@ -173,10 +175,25 @@ type PrimitiveReaderSuite struct {
        nvalues         int
        maxDefLvl       int16
        maxRepLvl       int16
+
+       bufferPool sync.Pool
+}
+
+func (p *PrimitiveReaderSuite) SetupTest() {
+       p.bufferPool = sync.Pool{
+               New: func() interface{} {
+                       buf := memory.NewResizableBuffer(mem)
+                       runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+                               obj.Release()
+                       })
+                       return buf
+               },
+       }
 }
 
 func (p *PrimitiveReaderSuite) TearDownTest() {
        p.clear()
+       p.bufferPool = sync.Pool{}
 }
 
 func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
@@ -185,7 +202,7 @@ func (p *PrimitiveReaderSuite) initReader(d *schema.Column) {
        m.TestData().Set("pages", p.pages)
        m.On("Err").Return((error)(nil))
        p.pager = m
-       p.reader = file.NewColumnReader(d, m, mem)
+       p.reader = file.NewColumnReader(d, m, mem, &p.bufferPool)
 }
 
 func (p *PrimitiveReaderSuite) checkResults(typ reflect.Type) {
diff --git a/go/parquet/file/column_writer_test.go b/go/parquet/file/column_writer_test.go
index dc141ba1d3..39eeb06f23 100755
--- a/go/parquet/file/column_writer_test.go
+++ b/go/parquet/file/column_writer_test.go
@@ -20,6 +20,8 @@ import (
        "bytes"
        "math"
        "reflect"
+       "runtime"
+       "sync"
        "testing"
 
        "github.com/apache/arrow/go/v10/arrow/bitutil"
@@ -223,6 +225,8 @@ type PrimitiveWriterTestSuite struct {
        metadata   *metadata.ColumnChunkMetaDataBuilder
        sink       *encoding.BufferWriter
        readbuffer *memory.Buffer
+
+       bufferPool sync.Pool
 }
 
 func (p *PrimitiveWriterTestSuite) SetupTest() {
@@ -230,12 +234,26 @@ func (p *PrimitiveWriterTestSuite) SetupTest() {
        p.props = parquet.NewWriterProperties()
        p.SetupSchema(parquet.Repetitions.Required, 1)
        p.descr = p.Schema.Column(0)
+
+       p.bufferPool = sync.Pool{
+               New: func() interface{} {
+                       buf := memory.NewResizableBuffer(mem)
+                       runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+                               obj.Release()
+                       })
+                       return buf
+               },
+       }
+}
+
+func (p *PrimitiveWriterTestSuite) TearDownTest() {
+       p.bufferPool = sync.Pool{}
 }
 
 func (p *PrimitiveWriterTestSuite) buildReader(nrows int64, compression compress.Compression) file.ColumnChunkReader {
        p.readbuffer = p.sink.Finish()
        pagereader, _ := file.NewPageReader(arrutils.NewBufferedReader(bytes.NewReader(p.readbuffer.Bytes()), p.readbuffer.Len()), nrows, compression, mem, nil)
-       return file.NewColumnReader(p.descr, pagereader, mem)
+       return file.NewColumnReader(p.descr, pagereader, mem, &p.bufferPool)
 }
 
 func (p *PrimitiveWriterTestSuite) buildWriter(_ int64, columnProps parquet.ColumnProperties, version parquet.Version) file.ColumnChunkWriter {
diff --git a/go/parquet/file/file_reader.go b/go/parquet/file/file_reader.go
index a7e6525ace..d9a73faa63 100644
--- a/go/parquet/file/file_reader.go
+++ b/go/parquet/file/file_reader.go
@@ -22,6 +22,8 @@ import (
        "fmt"
        "io"
        "os"
+       "runtime"
+       "sync"
 
        "github.com/apache/arrow/go/v10/arrow/memory"
        "github.com/apache/arrow/go/v10/parquet"
@@ -47,6 +49,8 @@ type Reader struct {
        metadata      *metadata.FileMetaData
        footerOffset  int64
        fileDecryptor encryption.FileDecryptor
+
+       bufferPool sync.Pool
 }
 
 type ReadOption func(*Reader)
@@ -113,6 +117,16 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er
                f.props = parquet.NewReaderProperties(memory.NewGoAllocator())
        }
 
+       f.bufferPool = sync.Pool{
+               New: func() interface{} {
+                       buf := memory.NewResizableBuffer(f.props.Allocator())
+                       runtime.SetFinalizer(buf, func(obj *memory.Buffer) {
+                               obj.Release()
+                       })
+                       return buf
+               },
+       }
+
        if f.metadata == nil {
                return f, f.parseMetaData()
        }
@@ -120,6 +134,14 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er
        return f, nil
 }
 
+// BufferPool returns the internal buffer pool being utilized by this reader.
+// This is primarily for use by the pqarrow.FileReader or anything that builds
+// on top of the Reader and constructs their own ColumnReaders (like the
+// RecordReader)
+func (f *Reader) BufferPool() *sync.Pool {
+       return &f.bufferPool
+}
+
 // Close will close the current reader, and if the underlying reader being used
 // is an `io.Closer` then Close will be called on it too.
 func (f *Reader) Close() error {
@@ -290,5 +312,6 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
                r:             f.r,
                sourceSz:      f.footerOffset,
                fileDecryptor: f.fileDecryptor,
+               bufferPool:    &f.bufferPool,
        }
 }
diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go
index 7daefac457..3e45ee915f 100755
--- a/go/parquet/file/record_reader.go
+++ b/go/parquet/file/record_reader.go
@@ -18,6 +18,7 @@ package file
 
 import (
        "fmt"
+       "sync"
        "sync/atomic"
        "unsafe"
 
@@ -127,9 +128,9 @@ type primitiveRecordReader struct {
        useValues bool
 }
 
-func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator) primitiveRecordReader {
+func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) primitiveRecordReader {
        return primitiveRecordReader{
-               ColumnChunkReader: NewColumnReader(descr, nil, mem),
+               ColumnChunkReader: NewColumnReader(descr, nil, mem, bufferPool),
                values:            memory.NewResizableBuffer(mem),
                validBits:         memory.NewResizableBuffer(mem),
                mem:               mem,
@@ -326,12 +327,12 @@ func (b *binaryRecordReader) GetBuilderChunks() []arrow.Array {
        return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks()
 }
 
-func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
        if mem == nil {
                mem = memory.DefaultAllocator
        }
 
-       pr := createPrimitiveRecordReader(descr, mem)
+       pr := createPrimitiveRecordReader(descr, mem, bufferPool)
        return &recordReader{
                refCount:         1,
                recordReaderImpl: &pr,
@@ -722,7 +723,7 @@ func (fr *flbaRecordReader) GetBuilderChunks() []arrow.Array {
        return []arrow.Array{fr.bldr.NewArray()}
 }
 
-func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
        if mem == nil {
                mem = memory.DefaultAllocator
        }
@@ -731,7 +732,7 @@ func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Alloca
 
        return &binaryRecordReader{&recordReader{
                recordReaderImpl: &flbaRecordReader{
-                       createPrimitiveRecordReader(descr, mem),
+                       createPrimitiveRecordReader(descr, mem, bufferPool),
                        array.NewFixedSizeBinaryBuilder(mem, &arrow.FixedSizeBinaryType{ByteWidth: byteWidth}),
                        nil,
                },
@@ -750,7 +751,7 @@ type byteArrayRecordReader struct {
        valueBuf []parquet.ByteArray
 }
 
-func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader {
+func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
        if mem == nil {
                mem = memory.DefaultAllocator
        }
@@ -762,7 +763,7 @@ func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.A
 
        return &binaryRecordReader{&recordReader{
                recordReaderImpl: &byteArrayRecordReader{
-                       createPrimitiveRecordReader(descr, mem),
+                       createPrimitiveRecordReader(descr, mem, bufferPool),
                        array.NewBinaryBuilder(mem, dt),
                        nil,
                },
@@ -840,13 +841,13 @@ func (br *byteArrayRecordReader) GetBuilderChunks() []arrow.Array {
 
 // TODO(mtopol): create optimized readers for dictionary types after ARROW-7286 is done
 
-func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator) RecordReader {
+func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
        switch descr.PhysicalType() {
        case parquet.Types.ByteArray:
-               return newByteArrayRecordReader(descr, info, mem)
+               return newByteArrayRecordReader(descr, info, mem, bufferPool)
        case parquet.Types.FixedLenByteArray:
-               return newFLBARecordReader(descr, info, mem)
+               return newFLBARecordReader(descr, info, mem, bufferPool)
        default:
-               return newRecordReader(descr, info, mem)
+               return newRecordReader(descr, info, mem, bufferPool)
        }
 }
diff --git a/go/parquet/file/row_group_reader.go b/go/parquet/file/row_group_reader.go
index 71c71ec38e..b2b5bcf155 100644
--- a/go/parquet/file/row_group_reader.go
+++ b/go/parquet/file/row_group_reader.go
@@ -18,6 +18,7 @@ package file
 
 import (
        "fmt"
+       "sync"
 
        "github.com/apache/arrow/go/v10/internal/utils"
        "github.com/apache/arrow/go/v10/parquet"
@@ -38,6 +39,8 @@ type RowGroupReader struct {
        rgMetadata    *metadata.RowGroupMetaData
        props         *parquet.ReaderProperties
        fileDecryptor encryption.FileDecryptor
+
+       bufferPool *sync.Pool
 }
 
 // MetaData returns the metadata of the current Row Group
@@ -65,7 +68,7 @@ func (r *RowGroupReader) Column(i int) (ColumnChunkReader, error) {
        if err != nil {
                return nil, fmt.Errorf("parquet: unable to initialize page reader: %w", err)
        }
-       return NewColumnReader(descr, pageRdr, r.props.Allocator()), nil
+       return NewColumnReader(descr, pageRdr, r.props.Allocator(), r.bufferPool), nil
 }
 
 func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
diff --git a/go/parquet/pqarrow/column_readers.go b/go/parquet/pqarrow/column_readers.go
index b298e2b4c9..73577b616e 100644
--- a/go/parquet/pqarrow/column_readers.go
+++ b/go/parquet/pqarrow/column_readers.go
@@ -20,6 +20,7 @@ import (
        "encoding/binary"
        "fmt"
        "reflect"
+       "sync"
        "sync/atomic"
        "time"
        "unsafe"
@@ -50,13 +51,13 @@ type leafReader struct {
        refCount int64
 }
 
-func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties) (*ColumnReader, error) {
+func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool) (*ColumnReader, error) {
        ret := &leafReader{
                rctx:      rctx,
                field:     field,
                input:     input,
                descr:     input.Descr(),
-               recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem),
+               recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem, bufferPool),
                props:     props,
                refCount:  1,
        }
diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go
index 7d345d6187..f62b4571b8 100755
--- a/go/parquet/pqarrow/file_reader.go
+++ b/go/parquet/pqarrow/file_reader.go
@@ -210,7 +210,7 @@ func (fr *FileReader) GetFieldReaders(ctx context.Context, colIndices, rowGroups
        // greatly improves performance.
        // GetFieldReader causes read operations, when issued serially on large numbers of columns,
        // this is super time consuming. Get field readers concurrently.
-       g,gctx := errgroup.WithContext(ctx)
+       g, gctx := errgroup.WithContext(ctx)
        if !fr.Props.Parallel {
                g.SetLimit(1)
        }
@@ -482,7 +482,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
                        return nil, nil
                }
 
-               out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props)
+               out, err = newLeafReader(&rctx, field.Field, rctx.colFactory(field.ColIndex, rctx.rdr), field.LevelInfo, fr.Props, fr.rdr.BufferPool())
                return
        }
 
@@ -499,7 +499,7 @@ func (fr *FileReader) getReader(ctx context.Context, field *SchemaField, arrowFi
                // When reading structs with large numbers of columns, the serial load is very slow.
                // This is especially true when reading Cloud Storage. Loading concurrently
                // greatly improves performance.
                // greatly improves performance.
-               g,gctx := errgroup.WithContext(ctx)
+               g, gctx := errgroup.WithContext(ctx)
                if !fr.Props.Parallel {
                        g.SetLimit(1)
                }

Reply via email to