This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 711fac8810 feat(parquet): add `push_decoder` benchmark for
`PushBuffers` overhead (#9696)
711fac8810 is described below
commit 711fac88104fc27d89faaa221345bc95686cf1e9
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Mon Apr 13 16:33:48 2026 -0400
feat(parquet): add `push_decoder` benchmark for `PushBuffers` overhead
(#9696)
# Which issue does this PR close?
- None, but relates to #9695.
# Rationale for this change
This PR is meant to document and measure the quadratic behavior reported
in the above issue.
# What changes are included in this PR?
# Are these changes tested?
Add a benchmark that isolates `PushBuffers` overhead during row group
construction, independent of page decoding. It calls `try_next_reader`
to build row group readers without consuming any pages, so the measured
cost is purely buffer lookup, stitching, and release.
Two benchmark groups exercise different scaling axes:
- `1buf`: pushes the entire file as a single buffer, varying column
count (100 to 50k). This isolates the per-range cost of
`has_range`/`get_bytes` lookups and `release_through`.
- `Nbuf`: pushes one buffer per requested range, varying column count
(100 to 10k). This isolates the cost when buffer count equals range
count.
Baseline results (Apple M1 Max):
```
push_decoder/1buf/1000ranges 323.5 µs
push_decoder/1buf/10000ranges 3.25 ms
push_decoder/1buf/100000ranges 34.6 ms
push_decoder/1buf/500000ranges 185.3 ms
push_decoder/Nbuf/1000ranges 437.2 µs
push_decoder/Nbuf/10000ranges 10.7 ms
push_decoder/Nbuf/100000ranges 711.6 ms
```
# Are there any user-facing changes?
N/A
Signed-off-by: Hippolyte Barraud <[email protected]>
---
parquet/Cargo.toml | 5 ++
parquet/benches/push_decoder.rs | 170 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 175 insertions(+)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 91f21d8a3f..314960adf1 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -221,6 +221,11 @@ name = "arrow_writer"
required-features = ["arrow"]
harness = false
+[[bench]]
+name = "push_decoder"
+required-features = ["arrow"]
+harness = false
+
[[bench]]
name = "arrow_reader"
required-features = ["arrow", "test_common", "experimental"]
diff --git a/parquet/benches/push_decoder.rs b/parquet/benches/push_decoder.rs
new file mode 100644
index 0000000000..190dd71767
--- /dev/null
+++ b/parquet/benches/push_decoder.rs
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for the push-based decoder measuring PushBuffers overhead.
+//!
+//! Uses `try_next_reader` to build row group readers without decoding any
+//! pages, isolating PushBuffers operations (has_range, get_bytes, clearing).
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_array::{Float32Array, RecordBatch};
+use bytes::Bytes;
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use parquet::DecodeResult;
+use parquet::arrow::ArrowWriter;
+use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+use parquet::file::metadata::ParquetMetaDataPushDecoder;
+use parquet::file::properties::WriterProperties;
+
+/// Build a schema with `num_columns` non-nullable `Float32` fields.
+///
+/// Fields are named `c0`, `c1`, … in column order.
+fn make_wide_schema(num_columns: usize) -> SchemaRef {
+    let mut fields: Vec<Field> = Vec::with_capacity(num_columns);
+    for i in 0..num_columns {
+        fields.push(Field::new(format!("c{i}"), DataType::Float32, false));
+    }
+    Arc::new(Schema::new(fields))
+}
+
+/// Build an in-memory Parquet file with `num_columns` `Float32` columns.
+///
+/// A single batch of 1,000 zero-valued rows is written with the row group
+/// row count capped at 100, which yields 10 row groups of 100 rows.
+fn make_test_file(num_columns: usize) -> Bytes {
+    let total_rows = 1_000;
+    let row_group_rows = 100;
+
+    let schema = make_wide_schema(num_columns);
+
+    // Every column holds the same all-zero data; only the file layout matters.
+    let mut columns: Vec<Arc<dyn arrow_array::Array>> = Vec::with_capacity(num_columns);
+    for _ in 0..num_columns {
+        columns.push(Arc::new(Float32Array::from(vec![0.0f32; total_rows])));
+    }
+    let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();
+
+    let props = WriterProperties::builder()
+        .set_max_row_group_row_count(Some(row_group_rows))
+        .build();
+
+    let mut buf = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props)).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+    Bytes::from(buf)
+}
+
+/// Decode the Parquet metadata from a file held fully in memory.
+///
+/// The whole file is pushed to the metadata decoder as one range covering
+/// `0..file_len`. Any result other than decoded metadata panics.
+fn decode_metadata(file_data: &Bytes) -> Arc<parquet::file::metadata::ParquetMetaData> {
+    let total_len = file_data.len() as u64;
+
+    let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(total_len).unwrap();
+    metadata_decoder
+        .push_range(0..total_len, file_data.clone())
+        .unwrap();
+
+    let metadata = match metadata_decoder.try_decode().unwrap() {
+        DecodeResult::Data(metadata) => metadata,
+        other => panic!("expected metadata, got {other:?}"),
+    };
+    Arc::new(metadata)
+}
+
+/// Push the entire file as one buffer, then build all row group readers.
+///
+/// Because every byte of the file is pushed up front, the decoder is not
+/// expected to ask for more data; a `NeedsData` result panics.
+fn build_readers_single_buffer(
+    file_data: &Bytes,
+    metadata: &Arc<parquet::file::metadata::ParquetMetaData>,
+) {
+    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata.clone())
+        .unwrap()
+        .build()
+        .unwrap();
+
+    decoder
+        .push_range(0..file_data.len() as u64, file_data.clone())
+        .unwrap();
+
+    loop {
+        match decoder.try_next_reader().unwrap() {
+            // The reader is only handed to `black_box`; no batches are read
+            // from it, so page decoding stays out of the measurement.
+            DecodeResult::Data(reader) => {
+                black_box(reader);
+            }
+            DecodeResult::Finished => break,
+            // Keep the message on one line: a string literal that wraps onto
+            // a second source line embeds a line break in the panic output.
+            DecodeResult::NeedsData(r) => panic!("unexpected NeedsData: {r:?}"),
+        }
+    }
+}
+
+/// Push one buffer per requested range, then build all row group readers.
+///
+/// Each time the decoder reports `NeedsData`, every requested range is
+/// sliced out of `file_data` into its own buffer and pushed back before
+/// trying again.
+fn build_readers_exact_ranges(
+    file_data: &Bytes,
+    metadata: &Arc<parquet::file::metadata::ParquetMetaData>,
+) {
+    let builder = ParquetPushDecoderBuilder::try_new_decoder(metadata.clone()).unwrap();
+    let mut decoder = builder.build().unwrap();
+
+    loop {
+        let ranges = match decoder.try_next_reader().unwrap() {
+            DecodeResult::Finished => break,
+            DecodeResult::Data(reader) => {
+                black_box(reader);
+                continue;
+            }
+            DecodeResult::NeedsData(ranges) => ranges,
+        };
+
+        // One buffer per range, in the same order as the request.
+        let mut buffers: Vec<Bytes> = Vec::with_capacity(ranges.len());
+        for r in ranges.iter() {
+            buffers.push(file_data.slice(r.start as usize..r.end as usize));
+        }
+        decoder.push_ranges(ranges, buffers).unwrap();
+    }
+}
+
+/// Benchmark group `push_decoder/1buf`: the whole file is pushed as a single
+/// buffer, for files of 100, 1,000, 10,000 and 50,000 columns.
+fn bench_1buf(c: &mut Criterion) {
+    let mut group = c.benchmark_group("push_decoder/1buf");
+
+    for num_cols in [100, 1_000, 10_000, 50_000] {
+        let file_data = make_test_file(num_cols);
+        let metadata = decode_metadata(&file_data);
+
+        // The benchmark id is the column count summed over all row groups.
+        let mut num_ranges: usize = 0;
+        for row_group in metadata.row_groups() {
+            num_ranges += row_group.columns().len();
+        }
+
+        let id = BenchmarkId::from_parameter(format!("{num_ranges}ranges"));
+        let input = (&file_data, &metadata);
+        group.bench_with_input(id, &input, |b, &(data, meta)| {
+            b.iter(|| build_readers_single_buffer(data, meta))
+        });
+    }
+
+    group.finish();
+}
+
+/// Benchmark group `push_decoder/Nbuf`: one buffer is pushed per requested
+/// range, for files of 100, 1,000 and 10,000 columns.
+fn bench_nbuf(c: &mut Criterion) {
+    let mut group = c.benchmark_group("push_decoder/Nbuf");
+
+    for num_cols in [100, 1_000, 10_000] {
+        let file_data = make_test_file(num_cols);
+        let metadata = decode_metadata(&file_data);
+
+        // The benchmark id is the column count summed over all row groups.
+        let mut num_ranges: usize = 0;
+        for row_group in metadata.row_groups() {
+            num_ranges += row_group.columns().len();
+        }
+
+        let id = BenchmarkId::from_parameter(format!("{num_ranges}ranges"));
+        let input = (&file_data, &metadata);
+        group.bench_with_input(id, &input, |b, &(data, meta)| {
+            b.iter(|| build_readers_exact_ranges(data, meta))
+        });
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, bench_1buf, bench_nbuf); // group `benches` = the 1buf and Nbuf benchmark functions
+criterion_main!(benches); // entry point for the bench target (Cargo.toml sets `harness = false`)