This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6b441673 feat(parquet/file): pre-allocate BinaryBuilder data buffer
using column chunk metadata to eliminate resize overhead (#689)
6b441673 is described below
commit 6b4416735ef322a234bd915d8721a2c69561689f
Author: junyan-ling <[email protected]>
AuthorDate: Thu Mar 5 14:22:07 2026 -0800
feat(parquet/file): pre-allocate BinaryBuilder data buffer using column
chunk metadata to eliminate resize overhead (#689)
### Rationale for this change
This PR is to address issue
https://github.com/apache/arrow-go/issues/688
`byteArrayRecordReader` builds binary/string Arrow arrays using
`array.BinaryBuilder`, but the builder's data buffer starts empty and
grows via repeated doublings as values are appended. For large binary
columns this causes O(log n) realloc+copy cycles per row group, wasting
both time and memory.
This PR threads column chunk size metadata (`TotalUncompressedSize`,
`NumRows`) from `columnIterator.NextChunk()` down to `leafReader`, and
uses it to pre-allocate the builder's data buffer at the start of each
`LoadBatch` call via `BinaryBuilder.ReserveData`.
### What changes are included in this PR?
- **`parquet/file/record_reader.go`**: adds `ReserveData(int64)` to
`BinaryRecordReader` interface and implements it on
`byteArrayRecordReader`; adds a no-op implementation on
`flbaRecordReader`.
- **`parquet/pqarrow/file_reader.go`**: `columnIterator.NextChunk()` now
returns `(PageReader, uncompressedBytes, numRows, error)`.
- **`parquet/pqarrow/column_readers.go`**: `leafReader` stores current
row group metadata; `LoadBatch` calls
`reserveBinaryData(nrecords)` after each reset; `nextRowGroup` takes a
`remainingRows` parameter to extend the reservation when crossing
row group boundaries mid-batch.
- **`parquet/pqarrow/properties.go`**: adds `PreAllocBinaryData bool` to
`ArrowReadProperties` (default: `false`).
Opt in via:
```go
props := pqarrow.ArrowReadProperties{
PreAllocBinaryData: true,
}
reader, err := pqarrow.NewFileReader(pf, props, mem)
```
### Are these changes tested?
Yes. `parquet/pqarrow/binary_prealloc_test.go` covers:
- Default flag value is false (no behaviour change for existing callers)
- Correctness of output for binary, string, nullable, int32, FLBA, and
dict-encoded columns
- All batch size configurations: unbounded, one batch per row group,
multiple batches per row group, and batches that span row group
boundaries
Benchmark in `parquet/pqarrow/reader_writer_test.go`
(`BenchmarkPreAllocBinaryData`) compares prealloc=false vs prealloc=true
on a two-column schema (slim string id + fat binary blob, 5 KB–50 KB
values, Zstd, 2 row groups × 484 rows):
Environment: Apple M1 Max · count=3 · medians reported
```
```
| Sub-benchmark  | ns/op (false) | ns/op (true) | Δ ns/op | B/op (false) | B/op (true) | Δ B/op | allocs/op (false) | allocs/op (true) | Δ allocs |
|----------------|---------------|--------------|---------|--------------|-------------|--------|-------------------|------------------|----------|
| batchAll       | 9,117,272     | 7,993,732    | -12.3%  | 144,021,824  | 115,098,562 | -20.1% | 511               | 494              | -3.3%    |
| batchPerRG     | 9,190,661     | 8,083,567    | -12.0%  | 144,024,680  | 115,096,686 | -20.1% | 513               | 493              | -3.9%    |
| batchQuarterRG | 9,116,379     | 7,896,174    | -13.4%  | 144,023,299  | 115,097,206 | -20.1% | 512               | 493              | -3.7%    |
```
Note: production workloads with larger values (~250 KB/row) will see
larger improvements — more reallocation doublings are eliminated at
greater value sizes. This benchmark uses 5–50 KB values to keep runtime
practical.
### Are there any user-facing changes?
Yes, opt-in. A new field `PreAllocBinaryData bool` is added to
`ArrowReadProperties`. It defaults to `false`, so all existing code is
unaffected without any changes. Users with large binary or string
columns can enable it to reduce memory allocations and improve read
throughput.
---------
Co-authored-by: Junyan Ling <[email protected]>
---
parquet/file/record_reader.go | 22 ++
parquet/pqarrow/binary_prealloc_test.go | 395 ++++++++++++++++++++++++++++++++
parquet/pqarrow/column_readers.go | 43 +++-
parquet/pqarrow/file_reader.go | 14 +-
parquet/pqarrow/properties.go | 5 +
parquet/pqarrow/reader_writer_test.go | 138 +++++++++++
6 files changed, 610 insertions(+), 7 deletions(-)
diff --git a/parquet/file/record_reader.go b/parquet/file/record_reader.go
index 9129b8e3..e74f17a6 100644
--- a/parquet/file/record_reader.go
+++ b/parquet/file/record_reader.go
@@ -91,6 +91,9 @@ type BinaryRecordReader interface {
RecordReader
GetBuilderChunks() []arrow.Array
ReadDictionary() bool
+ // ReserveData pre-allocates nbytes in the underlying data buffer to
reduce
+ // reallocations when the total data size is known in advance.
+ ReserveData(int64)
}
// recordReaderImpl is the internal interface implemented for different types
@@ -117,6 +120,7 @@ type binaryRecordReaderImpl interface {
recordReaderImpl
GetBuilderChunks() []arrow.Array
ReadDictionary() bool
+ ReserveData(int64)
}
// primitiveRecordReader is a record reader for primitive types, ie: not byte
array or fixed len byte array
@@ -343,6 +347,10 @@ func (b *binaryRecordReader) GetBuilderChunks()
[]arrow.Array {
return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks()
}
+func (b *binaryRecordReader) ReserveData(nbytes int64) {
+ b.recordReaderImpl.(binaryRecordReaderImpl).ReserveData(nbytes)
+}
+
func newRecordReader(descr *schema.Column, info LevelInfo, mem
memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
@@ -758,6 +766,8 @@ func (fr *flbaRecordReader) GetBuilderChunks()
[]arrow.Array {
func (fr *flbaRecordReader) ReadDictionary() bool { return false }
+func (fr *flbaRecordReader) ReserveData(int64) {}
+
func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem
memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
@@ -817,6 +827,18 @@ func (br *byteArrayRecordReader) ReserveValues(extra
int64, hasNullable bool) er
return br.primitiveRecordReader.ReserveValues(extra, hasNullable)
}
+// ReserveData pre-allocates nbytes in the builder's data buffer.
+// This reduces reallocations when the total binary payload size is known in
advance,
+// e.g. from TotalUncompressedSize in the column chunk metadata.
+func (br *byteArrayRecordReader) ReserveData(nbytes int64) {
+ if nbytes <= 0 {
+ return
+ }
+ if binaryBldr, ok := br.bldr.(*array.BinaryBuilder); ok {
+ binaryBldr.ReserveData(int(nbytes))
+ }
+}
+
func (br *byteArrayRecordReader) Retain() {
br.bldr.Retain()
br.primitiveRecordReader.Retain()
diff --git a/parquet/pqarrow/binary_prealloc_test.go
b/parquet/pqarrow/binary_prealloc_test.go
new file mode 100644
index 00000000..0addaff7
--- /dev/null
+++ b/parquet/pqarrow/binary_prealloc_test.go
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pqarrow_test
+
+import (
+ "bytes"
+ "context"
+ "math/rand/v2"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/compress"
+ "github.com/apache/arrow-go/v18/parquet/file"
+ "github.com/apache/arrow-go/v18/parquet/pqarrow"
+ "github.com/stretchr/testify/require"
+)
+
+// writeParquetTable writes tbl to an in-memory parquet file using the given
+// row group size and writer properties, returning the raw bytes.
+func writeParquetTable(t *testing.T, tbl arrow.Table, rowGroupSize int64,
writerProps *parquet.WriterProperties) []byte {
+ t.Helper()
+ var buf bytes.Buffer
+ require.NoError(t, pqarrow.WriteTable(tbl, &buf, rowGroupSize,
writerProps, pqarrow.DefaultWriterProps()))
+ return buf.Bytes()
+}
+
+// readParquetTable reads a parquet file from data using ReadTable (single
pass,
+// ignores BatchSize) with the given read properties.
+func readParquetTable(t *testing.T, data []byte, props
pqarrow.ArrowReadProperties) arrow.Table {
+ t.Helper()
+ pf, err := file.NewParquetReader(bytes.NewReader(data))
+ require.NoError(t, err)
+ reader, err := pqarrow.NewFileReader(pf, props, memory.DefaultAllocator)
+ require.NoError(t, err)
+ tbl, err := reader.ReadTable(context.Background())
+ require.NoError(t, err)
+ return tbl
+}
+
+// readParquetRecords reads a parquet file by streaming record batches via
+// GetRecordReader (which honours BatchSize) and returns all rows as a table.
+func readParquetRecords(t *testing.T, data []byte, props
pqarrow.ArrowReadProperties) arrow.Table {
+ t.Helper()
+ pf, err := file.NewParquetReader(bytes.NewReader(data))
+ require.NoError(t, err)
+ reader, err := pqarrow.NewFileReader(pf, props, memory.DefaultAllocator)
+ require.NoError(t, err)
+ rr, err := reader.GetRecordReader(context.Background(), nil, nil)
+ require.NoError(t, err)
+ defer rr.Release()
+
+ var batches []arrow.RecordBatch
+ for rr.Next() {
+ rec := rr.RecordBatch()
+ rec.Retain()
+ batches = append(batches, rec)
+ }
+ require.NoError(t, rr.Err())
+ require.NotEmpty(t, batches, "expected at least one record batch")
+
+ tbl := array.NewTableFromRecords(rr.Schema(), batches)
+ for _, b := range batches {
+ b.Release()
+ }
+ return tbl
+}
+
+// assertTableColumnsEqual compares two arrow tables column by column.
+// Chunks are concatenated before comparison so differences in chunking layout
+// do not cause false failures.
+func assertTableColumnsEqual(t *testing.T, want, got arrow.Table) {
+ t.Helper()
+ require.Equal(t, want.NumRows(), got.NumRows(), "row count mismatch")
+ require.Equal(t, want.NumCols(), got.NumCols(), "column count mismatch")
+ mem := memory.DefaultAllocator
+ for i := 0; i < int(want.NumCols()); i++ {
+ wantArr, err :=
array.Concatenate(want.Column(i).Data().Chunks(), mem)
+ require.NoError(t, err)
+ defer wantArr.Release()
+ gotArr, err := array.Concatenate(got.Column(i).Data().Chunks(),
mem)
+ require.NoError(t, err)
+ defer gotArr.Release()
+ require.True(t, array.Equal(wantArr, gotArr),
+ "column %d (%s) data mismatch", i,
want.Schema().Field(i).Name)
+ }
+}
+
+// buildBinaryTable builds a single-column binary table with totalRows rows,
+// value lengths drawn uniformly from [minLen, maxLen], and nullFrac fraction
+// of rows set to null. Uses a fixed seed for reproducibility.
+func buildBinaryTable(t *testing.T, totalRows, minLen, maxLen int, nullFrac
float64) arrow.Table {
+ t.Helper()
+ mem := memory.DefaultAllocator
+ rng := rand.New(rand.NewPCG(42, 0))
+ bldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)
+ defer bldr.Release()
+ for i := 0; i < totalRows; i++ {
+ if nullFrac > 0 && rng.Float64() < nullFrac {
+ bldr.AppendNull()
+ continue
+ }
+ length := minLen + int(rng.IntN(maxLen-minLen+1))
+ val := make([]byte, length)
+ for j := range val {
+ val[j] = byte(rng.IntN(256))
+ }
+ bldr.Append(val)
+ }
+ arr := bldr.NewArray()
+ defer arr.Release()
+ sc := arrow.NewSchema([]arrow.Field{{Name: "payload", Type:
arrow.BinaryTypes.Binary, Nullable: true}}, nil)
+ col := arrow.NewColumnFromArr(sc.Field(0), arr)
+ defer col.Release()
+ return array.NewTable(sc, []arrow.Column{col}, int64(totalRows))
+}
+
+// TestPreAllocBinaryData_DefaultIsDisabled verifies that the zero value of
+// ArrowReadProperties leaves PreAllocBinaryData as false, so existing callers
+// are unaffected without any code changes.
+func TestPreAllocBinaryData_DefaultIsDisabled(t *testing.T) {
+ props := pqarrow.ArrowReadProperties{}
+ require.False(t, props.PreAllocBinaryData)
+}
+
+// TestPreAllocBinaryData_CorrectOutput verifies that enabling
PreAllocBinaryData
+// produces bit-identical output to reading with the flag disabled, across a
+// range of column types and configurations.
+func TestPreAllocBinaryData_CorrectOutput(t *testing.T) {
+ const (
+ numRGs = 2
+ rowsPerRG = 100
+ totalRows = numRGs * rowsPerRG
+ )
+ mem := memory.DefaultAllocator
+ ctx := context.Background()
+ writerProps := parquet.NewWriterProperties(
+ parquet.WithDictionaryDefault(false),
+ parquet.WithCompression(compress.Codecs.Zstd),
+ )
+
+ t.Run("binary_column", func(t *testing.T) {
+ tbl := buildBinaryTable(t, totalRows, 100, 1000, 0)
+ defer tbl.Release()
+ data := writeParquetTable(t, tbl, rowsPerRG, writerProps)
+
+ baseline := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: false})
+ defer baseline.Release()
+ got := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: true})
+ defer got.Release()
+
+ assertTableColumnsEqual(t, baseline, got)
+ })
+
+ t.Run("string_column", func(t *testing.T) {
+ rng := rand.New(rand.NewPCG(7, 0))
+ bldr := array.NewStringBuilder(mem)
+ defer bldr.Release()
+ for i := 0; i < totalRows; i++ {
+ length := 5 + int(rng.IntN(50))
+ val := make([]byte, length)
+ for j := range val {
+ val[j] = byte('a' + rng.IntN(26))
+ }
+ bldr.Append(string(val))
+ }
+ arr := bldr.NewArray()
+ defer arr.Release()
+ sc := arrow.NewSchema([]arrow.Field{{Name: "s", Type:
arrow.BinaryTypes.String, Nullable: false}}, nil)
+ col := arrow.NewColumnFromArr(sc.Field(0), arr)
+ defer col.Release()
+ tbl := array.NewTable(sc, []arrow.Column{col}, int64(totalRows))
+ defer tbl.Release()
+
+ data := writeParquetTable(t, tbl, rowsPerRG, writerProps)
+
+ baseline := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: false})
+ defer baseline.Release()
+ got := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: true})
+ defer got.Release()
+
+ assertTableColumnsEqual(t, baseline, got)
+ })
+
+ t.Run("nullable_with_nulls", func(t *testing.T) {
+ // 10% null values — verifies null positions are preserved
exactly.
+ tbl := buildBinaryTable(t, totalRows, 50, 500, 0.10)
+ defer tbl.Release()
+ data := writeParquetTable(t, tbl, rowsPerRG, writerProps)
+
+ baseline := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: false})
+ defer baseline.Release()
+ got := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: true})
+ defer got.Release()
+
+ assertTableColumnsEqual(t, baseline, got)
+ })
+
+ t.Run("int32_column_unaffected", func(t *testing.T) {
+ // Non-binary columns must be a no-op; the type assertion in
+ // reserveBinaryData should not panic or corrupt data.
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+ for i := 0; i < totalRows; i++ {
+ bldr.Append(int32(i))
+ }
+ arr := bldr.NewArray()
+ defer arr.Release()
+ sc := arrow.NewSchema([]arrow.Field{{Name: "n", Type:
arrow.PrimitiveTypes.Int32, Nullable: false}}, nil)
+ col := arrow.NewColumnFromArr(sc.Field(0), arr)
+ defer col.Release()
+ tbl := array.NewTable(sc, []arrow.Column{col}, int64(totalRows))
+ defer tbl.Release()
+
+ data := writeParquetTable(t, tbl, rowsPerRG, writerProps)
+
+ baseline := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: false})
+ defer baseline.Release()
+ got := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: true})
+ defer got.Release()
+
+ assertTableColumnsEqual(t, baseline, got)
+ })
+
+ t.Run("flba_column_unaffected", func(t *testing.T) {
+ // Fixed-length byte array columns use a no-op ReserveData on
+ // flbaRecordReader; verify no panic and correct output.
+ const byteWidth = 16
+ bldr := array.NewFixedSizeBinaryBuilder(mem,
&arrow.FixedSizeBinaryType{ByteWidth: byteWidth})
+ defer bldr.Release()
+ rng := rand.New(rand.NewPCG(13, 0))
+ for i := 0; i < totalRows; i++ {
+ val := make([]byte, byteWidth)
+ for j := range val {
+ val[j] = byte(rng.IntN(256))
+ }
+ bldr.Append(val)
+ }
+ arr := bldr.NewArray()
+ defer arr.Release()
+ sc := arrow.NewSchema([]arrow.Field{{Name: "fixed", Type:
&arrow.FixedSizeBinaryType{ByteWidth: byteWidth}, Nullable: false}}, nil)
+ col := arrow.NewColumnFromArr(sc.Field(0), arr)
+ defer col.Release()
+ tbl := array.NewTable(sc, []arrow.Column{col}, int64(totalRows))
+ defer tbl.Release()
+
+ data := writeParquetTable(t, tbl, rowsPerRG, writerProps)
+
+ baseline := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: false})
+ defer baseline.Release()
+ got := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: true})
+ defer got.Release()
+
+ assertTableColumnsEqual(t, baseline, got)
+ })
+
+ t.Run("dict_encoded_binary", func(t *testing.T) {
+ // Dict-encoded binary columns have a no-op ReserveData
(BinaryDictionaryBuilder
+ // does not expose ReserveData); verify no panic and correct
output.
+ dictWriterProps :=
parquet.NewWriterProperties(parquet.WithDictionaryDefault(true))
+ bldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)
+ defer bldr.Release()
+ words := [][]byte{[]byte("foo"), []byte("bar"), []byte("baz")}
+ rng := rand.New(rand.NewPCG(99, 0))
+ for i := 0; i < totalRows; i++ {
+ bldr.Append(words[rng.IntN(len(words))])
+ }
+ arr := bldr.NewArray()
+ defer arr.Release()
+ sc := arrow.NewSchema([]arrow.Field{{Name: "d", Type:
arrow.BinaryTypes.Binary, Nullable: false}}, nil)
+ col := arrow.NewColumnFromArr(sc.Field(0), arr)
+ defer col.Release()
+ tbl := array.NewTable(sc, []arrow.Column{col}, int64(totalRows))
+ defer tbl.Release()
+
+ data := writeParquetTable(t, tbl, rowsPerRG, dictWriterProps)
+
+ readProps := pqarrow.ArrowReadProperties{PreAllocBinaryData:
true}
+ readProps.SetReadDict(0, true)
+
+ pf, err := file.NewParquetReader(bytes.NewReader(data))
+ require.NoError(t, err)
+ reader, err := pqarrow.NewFileReader(pf, readProps, mem)
+ require.NoError(t, err)
+ got, err := reader.ReadTable(ctx)
+ require.NoError(t, err)
+ defer got.Release()
+
+ // Row count correctness is sufficient — dict vs non-dict
comparison
+ // is not the focus of this test.
+ require.Equal(t, int64(totalRows), got.NumRows())
+ })
+}
+
+// TestPreAllocBinaryData_BatchSizes verifies correctness across the full range
+// of batch size configurations when PreAllocBinaryData is enabled, exercising
+// the proportional pre-allocation, multi-batch-per-row-group, and row group
+// boundary crossing paths.
+func TestPreAllocBinaryData_BatchSizes(t *testing.T) {
+ const (
+ numRGs = 3
+ rowsPerRG = 48
+ totalRows = numRGs * rowsPerRG
+ )
+ writerProps := parquet.NewWriterProperties(
+ parquet.WithDictionaryDefault(false),
+ parquet.WithCompression(compress.Codecs.Zstd),
+ )
+ // Use 5% nulls to exercise nullable code paths alongside the
pre-allocation.
+ tbl := buildBinaryTable(t, totalRows, 200, 2000, 0.05)
+ defer tbl.Release()
+ data := writeParquetTable(t, tbl, rowsPerRG, writerProps)
+
+ // Baseline: read everything in one pass with the flag off.
+ baseline := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: false})
+ defer baseline.Release()
+
+ tests := []struct {
+ name string
+ batchSize int64
+ }{
+ // batchSize=0: entire file in one RecordReader pass;
pre-allocation
+ // reserves the full row group on entry.
+ {"batchAll", 0},
+ // batchSize=rowsPerRG: exactly one batch per row group;
builder reset
+ // and re-reserved once per RG.
+ {"batchPerRG", rowsPerRG},
+ // batchSize=rowsPerRG/2: two batches per row group;
proportional
+ // estimate used for batches 2+.
+ {"batchHalfRG", rowsPerRG / 2},
+ // batchSize=rowsPerRG/4: four batches per row group; most
stress on
+ // the proportional path — the key case fixed by this feature.
+ {"batchQuarterRG", rowsPerRG / 4},
+ // batchSize spans a row group boundary; nextRowGroup is called
+ // mid-batch and extends the reservation for the incoming row
group.
+ {"batchSpansBoundary", rowsPerRG * 3 / 2},
+ }
+
+ for _, tt := range tests {
+ tt := tt
+ t.Run(tt.name, func(t *testing.T) {
+ props := pqarrow.ArrowReadProperties{
+ PreAllocBinaryData: true,
+ BatchSize: tt.batchSize,
+ }
+ got := readParquetRecords(t, data, props)
+ defer got.Release()
+ assertTableColumnsEqual(t, baseline, got)
+ })
+ }
+}
+
+// TestPreAllocBinaryData_SingleRow verifies that a single-row file (where
+// numRows == 1 and the guard numRows > 0 is exercised) reads correctly.
+func TestPreAllocBinaryData_SingleRow(t *testing.T) {
+ mem := memory.DefaultAllocator
+ bldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)
+ defer bldr.Release()
+ bldr.Append([]byte("hello world"))
+ arr := bldr.NewArray()
+ defer arr.Release()
+ sc := arrow.NewSchema([]arrow.Field{{Name: "x", Type:
arrow.BinaryTypes.Binary, Nullable: false}}, nil)
+ col := arrow.NewColumnFromArr(sc.Field(0), arr)
+ defer col.Release()
+ tbl := array.NewTable(sc, []arrow.Column{col}, 1)
+ defer tbl.Release()
+
+ writerProps :=
parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
+ data := writeParquetTable(t, tbl, 1, writerProps)
+
+ got := readParquetTable(t, data,
pqarrow.ArrowReadProperties{PreAllocBinaryData: true})
+ defer got.Release()
+ require.Equal(t, int64(1), got.NumRows())
+
+ gotArr, err := array.Concatenate(got.Column(0).Data().Chunks(), mem)
+ require.NoError(t, err)
+ defer gotArr.Release()
+ require.Equal(t, []byte("hello world"), gotArr.(*array.Binary).Value(0))
+}
diff --git a/parquet/pqarrow/column_readers.go
b/parquet/pqarrow/column_readers.go
index 1be8a48e..522e29c5 100644
--- a/parquet/pqarrow/column_readers.go
+++ b/parquet/pqarrow/column_readers.go
@@ -49,6 +49,10 @@ type leafReader struct {
recordRdr file.RecordReader
props ArrowReadProperties
+ // current row group size metadata, used to proportion binary data
pre-allocation
+ curRGUncompressedBytes int64
+ curRGNumRows int64
+
refCount atomic.Int64
}
@@ -63,7 +67,7 @@ func newLeafReader(rctx *readerCtx, field *arrow.Field, input
*columnIterator, l
}
ret.refCount.Add(1)
- err := ret.nextRowGroup()
+ err := ret.nextRowGroup(0)
return &ColumnReader{ret}, err
}
@@ -94,6 +98,9 @@ func (lr *leafReader) IsOrHasRepeatedChild() bool { return
false }
func (lr *leafReader) LoadBatch(nrecords int64) (err error) {
lr.releaseOut()
lr.recordRdr.Reset()
+ // The binary builder was reset by GetBuilderChunks() at the end of the
+ // previous LoadBatch. Pre-allocate its data buffer now, while it's
fresh.
+ lr.reserveBinaryData(nrecords)
if err := lr.recordRdr.Reserve(nrecords); err != nil {
return err
@@ -108,7 +115,7 @@ func (lr *leafReader) LoadBatch(nrecords int64) (err error)
{
}
nrecords -= numRead
if numRead == 0 {
- if err = lr.nextRowGroup(); err != nil {
+ if err = lr.nextRowGroup(nrecords); err != nil {
return err
}
}
@@ -121,6 +128,25 @@ func (lr *leafReader) BuildArray(int64) (*arrow.Chunked,
error) {
return lr.clearOut(), nil
}
+// reserveBinaryData pre-allocates the underlying BinaryBuilder's data buffer
+// proportionally: (rowsToRead / curRGNumRows) * curRGUncompressedBytes.
+// It is a no-op for non-binary columns, when size metadata is unavailable,
+// or when PreAllocBinaryData is not enabled in the read properties.
+func (lr *leafReader) reserveBinaryData(rowsToRead int64) {
+ if !lr.props.PreAllocBinaryData {
+ return
+ }
+ brdr, ok := lr.recordRdr.(file.BinaryRecordReader)
+ if !ok || lr.curRGNumRows <= 0 || lr.curRGUncompressedBytes <= 0 {
+ return
+ }
+ effective := rowsToRead
+ if effective <= 0 || effective > lr.curRGNumRows {
+ effective = lr.curRGNumRows
+ }
+ brdr.ReserveData(lr.curRGUncompressedBytes * effective /
lr.curRGNumRows)
+}
+
// releaseOut will clear lr.out as well as release it if it wasn't nil
func (lr *leafReader) releaseOut() {
if out := lr.clearOut(); out != nil {
@@ -146,12 +172,21 @@ func (lr *leafReader) SeekToRow(rowIdx int64) error {
return lr.recordRdr.SeekToRow(offset)
}
-func (lr *leafReader) nextRowGroup() error {
- pr, err := lr.input.NextChunk()
+// nextRowGroup advances to the next row group. remainingRows is the number of
+// records still to be read in the current batch; pass 0 during initialization
+// (no batch is in progress yet, so no pre-allocation is needed).
+func (lr *leafReader) nextRowGroup(remainingRows int64) error {
+ pr, uncompressedBytes, numRows, err := lr.input.NextChunk()
if err != nil {
return err
}
lr.recordRdr.SetPageReader(pr)
+ lr.curRGUncompressedBytes = uncompressedBytes
+ lr.curRGNumRows = numRows
+ // When called mid-batch, extend the builder's data buffer for the new
row group.
+ if remainingRows > 0 {
+ lr.reserveBinaryData(remainingRows)
+ }
return nil
}
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index 6ca3c2fd..6fba9b53 100644
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -717,14 +717,22 @@ func (c *columnIterator) FindChunkForRow(rowIdx int64)
(file.PageReader, int64,
arrow.ErrInvalid, rowIdx, idx)
}
-func (c *columnIterator) NextChunk() (file.PageReader, error) {
+func (c *columnIterator) NextChunk() (file.PageReader, int64, int64, error) {
if len(c.rowGroups) == 0 || c.rgIdx >= len(c.rowGroups) {
- return nil, nil
+ return nil, 0, 0, nil
}
rgr := c.rdr.RowGroup(c.rowGroups[c.rgIdx])
+ numRows := rgr.NumRows()
+
+ var uncompressedBytes int64
+ if colMeta, err := rgr.MetaData().ColumnChunk(c.index); err == nil {
+ uncompressedBytes = colMeta.TotalUncompressedSize()
+ }
+
c.rgIdx++
- return rgr.GetColumnPageReader(c.index)
+ pr, err := rgr.GetColumnPageReader(c.index)
+ return pr, uncompressedBytes, numRows, err
}
func (c *columnIterator) Descr() *schema.Column { return
c.schema.Column(c.index) }
diff --git a/parquet/pqarrow/properties.go b/parquet/pqarrow/properties.go
index d349a398..0ead4cb9 100644
--- a/parquet/pqarrow/properties.go
+++ b/parquet/pqarrow/properties.go
@@ -165,6 +165,11 @@ type ArrowReadProperties struct {
Parallel bool
// BatchSize is the size used for calls to NextBatch when reading whole
columns
BatchSize int64
+ // PreAllocBinaryData, if true, pre-allocates the BinaryBuilder data
buffer at
+ // the start of each batch using TotalUncompressedSize and NumRows from
the
+ // column chunk metadata. This reduces O(log n) realloc cycles per row
group
+ // for large binary/string columns. Disabled by default.
+ PreAllocBinaryData bool
readDictIndices map[int]struct{}
forceLargeIndices map[int]struct{}
diff --git a/parquet/pqarrow/reader_writer_test.go
b/parquet/pqarrow/reader_writer_test.go
index 29867cd2..eed2b6c9 100644
--- a/parquet/pqarrow/reader_writer_test.go
+++ b/parquet/pqarrow/reader_writer_test.go
@@ -355,6 +355,144 @@ func BenchmarkWriteTableCompressed(b *testing.B) {
}
}
+// BenchmarkPreAllocBinaryData measures the read throughput of a two-column
+// parquet file — a slim string id column and a fat binary blob column — with
+// and without the PreAllocBinaryData optimisation enabled.
+//
+// # Workload
+//
+// The file uses 2 row groups of 484 rows each, with blob values of 5 KB–50 KB
+// (avg ~27 KB/row, ~26 MB total uncompressed). This is proportionally
identical
+// to the production workload (~105 MB/RG) while keeping benchmark setup fast.
+//
+// id column: 10–50 B ASCII strings, no nulls
+// blob column: 5 KB–50 KB per value, 5% nulls, Zstd compression
+//
+// # Sub-benchmarks
+//
+// prealloc=false/batchAll baseline: no pre-allocation, entire file
in one pass
+// prealloc=false/batchPerRG baseline: no pre-allocation, one batch
per row group
+// prealloc=false/batchQuarterRG baseline: no pre-allocation, four
batches per row group
+// prealloc=true/batchAll optimised: full file in one pass
+// prealloc=true/batchPerRG optimised: one batch per row group
+// prealloc=true/batchQuarterRG optimised: four batches per row group
+//
+// Run with:
+//
+// go test ./parquet/pqarrow/ -run='^$' -bench=BenchmarkPreAllocBinaryData
-benchmem -count=3
+//
+// Compare B/op and allocs/op between prealloc=false and prealloc=true to
+// quantify the reduction in intermediate BinaryBuilder reallocations.
+func BenchmarkPreAllocBinaryData(b *testing.B) {
+ const (
+ numRGs = 2
+ rowsPerRG = 484
+ totalRows = numRGs * rowsPerRG
+ )
+
+ mem := memory.DefaultAllocator
+ writerProps := parquet.NewWriterProperties(
+ parquet.WithDictionaryDefault(false),
+ parquet.WithCompression(compress.Codecs.Zstd),
+ )
+
+ // --- build slim id column: 10–50 B ASCII strings, no nulls ---
+ idRng := rand.New(rand.NewPCG(11, 0))
+ idBldr := array.NewStringBuilder(mem)
+ for i := 0; i < totalRows; i++ {
+ length := 10 + int(idRng.IntN(41))
+ val := make([]byte, length)
+ for j := range val {
+ val[j] = byte('a' + idRng.IntN(26))
+ }
+ idBldr.Append(string(val))
+ }
+ idArr := idBldr.NewArray()
+ idBldr.Release()
+ b.Cleanup(func() { idArr.Release() })
+
+ // --- build fat blob column: 5 KB–50 KB per value, 5% nulls ---
+ // avg ~27 KB/row × 484 rows ≈ 13 MB uncompressed per row group.
+ // Proportionally identical to the production workload while keeping
+ // benchmark setup fast.
+ blobRng := rand.New(rand.NewPCG(22, 0))
+ blobBldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.Binary)
+ for i := 0; i < totalRows; i++ {
+ if blobRng.Float64() < 0.05 {
+ blobBldr.AppendNull()
+ continue
+ }
+ length := 5_000 + int(blobRng.IntN(45_001)) // 5 KB–50 KB
+ val := make([]byte, length)
+ for j := range val {
+ val[j] = byte(blobRng.IntN(256))
+ }
+ blobBldr.Append(val)
+ }
+ blobArr := blobBldr.NewArray()
+ blobBldr.Release()
+ b.Cleanup(func() { blobArr.Release() })
+
+ sc := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.BinaryTypes.String, Nullable: false},
+ {Name: "blob", Type: arrow.BinaryTypes.Binary, Nullable: true},
+ }, nil)
+
+ idChk := arrow.NewChunked(sc.Field(0).Type, []arrow.Array{idArr})
+ blobChk := arrow.NewChunked(sc.Field(1).Type, []arrow.Array{blobArr})
+ idCol := arrow.NewColumn(sc.Field(0), idChk)
+ blobCol := arrow.NewColumn(sc.Field(1), blobChk)
+ idChk.Release()
+ blobChk.Release()
+ tbl := array.NewTable(sc, []arrow.Column{*idCol, *blobCol},
int64(totalRows))
+ idCol.Release()
+ blobCol.Release()
+ b.Cleanup(func() { tbl.Release() })
+
+ // Write once; reuse parquetData across all sub-benchmarks and b.N
iterations.
+ var buf bytes.Buffer
+ require.NoError(b, pqarrow.WriteTable(tbl, &buf, int64(rowsPerRG),
writerProps, pqarrow.DefaultWriterProps()))
+ parquetData := buf.Bytes()
+
+ batchSizes := []struct {
+ name string
+ size int64
+ }{
+ {"batchAll", 0},
+ {"batchPerRG", rowsPerRG},
+ {"batchQuarterRG", rowsPerRG / 4},
+ }
+
+ for _, prealloc := range []bool{false, true} {
+ prealloc := prealloc
+ b.Run(fmt.Sprintf("prealloc=%v", prealloc), func(b *testing.B) {
+ for _, bs := range batchSizes {
+ bs := bs
+ b.Run(bs.name, func(b *testing.B) {
+ ctx := context.Background()
+ b.ResetTimer()
+ b.ReportAllocs()
+ b.SetBytes(int64(len(parquetData)))
+
+ for n := 0; n < b.N; n++ {
+ pf, err :=
file.NewParquetReader(bytes.NewReader(parquetData))
+ require.NoError(b, err)
+ props :=
pqarrow.ArrowReadProperties{
+ PreAllocBinaryData:
prealloc,
+ BatchSize:
bs.size,
+ }
+ reader, err :=
pqarrow.NewFileReader(pf, props, mem)
+ require.NoError(b, err)
+ out, err :=
reader.ReadTable(ctx)
+ require.NoError(b, err)
+ out.Release()
+ }
+ })
+ }
+ })
+ }
+}
+
func BenchmarkReadTableCompressed(b *testing.B) {
ctx := context.Background()
mem := memory.DefaultAllocator