This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 711fac8810 feat(parquet): add `push_decoder` benchmark for 
`PushBuffers` overhead (#9696)
711fac8810 is described below

commit 711fac88104fc27d89faaa221345bc95686cf1e9
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Mon Apr 13 16:33:48 2026 -0400

    feat(parquet): add `push_decoder` benchmark for `PushBuffers` overhead 
(#9696)
    
    # Which issue does this PR close?
    
    - None but relates to #9695.
    
    # Rationale for this change
    
    This PR is meant to document and measure the quadratic behavior reported
    in the above issue.
    
    # What changes are included in this PR?
    
    # Are these changes tested?
    
    Add a benchmark that isolates `PushBuffers` overhead during row group
    construction, independent of page decoding. It calls `try_next_reader`
    to build row group readers without consuming any pages, so the measured
    cost is purely buffer lookup, stitching, and release.
    
    Two benchmark groups exercise different scaling axes:
    
    - `1buf`: pushes the entire file as a single buffer, varying column
    count (100 to 50k). This isolates the per-range cost of
    `has_range`/`get_bytes` lookups and `release_through`.
    
    - `Nbuf`: pushes one buffer per requested range, varying column count
    (100 to 10k). This isolates the cost when buffer count equals range
    count.
    
    Baseline results (Apple M1 Max):
    
    ```
      push_decoder/1buf/1000ranges       323.5 µs
      push_decoder/1buf/10000ranges       3.25 ms
      push_decoder/1buf/100000ranges      34.6 ms
      push_decoder/1buf/500000ranges     185.3 ms
      push_decoder/Nbuf/1000ranges       437.2 µs
      push_decoder/Nbuf/10000ranges       10.7 ms
      push_decoder/Nbuf/100000ranges     711.6 ms
    ```
    # Are there any user-facing changes?
    
    N/A
    
    Signed-off-by: Hippolyte Barraud <[email protected]>
---
 parquet/Cargo.toml              |   5 ++
 parquet/benches/push_decoder.rs | 170 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 175 insertions(+)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 91f21d8a3f..314960adf1 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -221,6 +221,11 @@ name = "arrow_writer"
 required-features = ["arrow"]
 harness = false
 
+[[bench]]
+name = "push_decoder"
+required-features = ["arrow"]
+harness = false
+
 [[bench]]
 name = "arrow_reader"
 required-features = ["arrow", "test_common", "experimental"]
diff --git a/parquet/benches/push_decoder.rs b/parquet/benches/push_decoder.rs
new file mode 100644
index 0000000000..190dd71767
--- /dev/null
+++ b/parquet/benches/push_decoder.rs
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for the push-based decoder measuring PushBuffers overhead.
+//!
+//! Uses `try_next_reader` to build row group readers without decoding any
+//! pages, isolating PushBuffers operations (has_range, get_bytes, clearing).
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_array::{Float32Array, RecordBatch};
+use bytes::Bytes;
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use parquet::DecodeResult;
+use parquet::arrow::ArrowWriter;
+use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+use parquet::file::metadata::ParquetMetaDataPushDecoder;
+use parquet::file::properties::WriterProperties;
+
+fn make_wide_schema(num_columns: usize) -> SchemaRef {
+    let fields: Vec<Field> = (0..num_columns)
+        .map(|i| Field::new(format!("c{i}"), DataType::Float32, false))
+        .collect();
+    Arc::new(Schema::new(fields))
+}
+
+/// Write a Parquet file with `num_columns` columns, 10 row groups of 100 rows.
+fn make_test_file(num_columns: usize) -> Bytes {
+    let num_rows = 1_000;
+    let rows_per_rg = 100;
+    let schema = make_wide_schema(num_columns);
+    let columns: Vec<Arc<dyn arrow_array::Array>> = (0..num_columns)
+        .map(|_| Arc::new(Float32Array::from(vec![0.0f32; num_rows])) as _)
+        .collect();
+    let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();
+
+    let mut buf = Vec::new();
+    let props = WriterProperties::builder()
+        .set_max_row_group_row_count(Some(rows_per_rg))
+        .build();
+    let mut writer = ArrowWriter::try_new(&mut buf, schema, 
Some(props)).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+    Bytes::from(buf)
+}
+
+fn decode_metadata(file_data: &Bytes) -> 
Arc<parquet::file::metadata::ParquetMetaData> {
+    let file_len = file_data.len() as u64;
+    let mut dec = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+    dec.push_range(0..file_len, file_data.clone()).unwrap();
+    match dec.try_decode().unwrap() {
+        DecodeResult::Data(m) => Arc::new(m),
+        other => panic!("expected metadata, got {other:?}"),
+    }
+}
+
+/// Push the entire file as one buffer, then build all row group readers.
+fn build_readers_single_buffer(
+    file_data: &Bytes,
+    metadata: &Arc<parquet::file::metadata::ParquetMetaData>,
+) {
+    let mut decoder = 
ParquetPushDecoderBuilder::try_new_decoder(metadata.clone())
+        .unwrap()
+        .build()
+        .unwrap();
+
+    decoder
+        .push_range(0..file_data.len() as u64, file_data.clone())
+        .unwrap();
+
+    loop {
+        match decoder.try_next_reader().unwrap() {
+            DecodeResult::Data(reader) => {
+                black_box(reader);
+            }
+            DecodeResult::Finished => break,
+            DecodeResult::NeedsData(r) => panic!("unexpected NeedsData: 
{r:?}"),
+        }
+    }
+}
+
+/// Push one buffer per requested range, then build all row group readers.
+fn build_readers_exact_ranges(
+    file_data: &Bytes,
+    metadata: &Arc<parquet::file::metadata::ParquetMetaData>,
+) {
+    let mut decoder = 
ParquetPushDecoderBuilder::try_new_decoder(metadata.clone())
+        .unwrap()
+        .build()
+        .unwrap();
+
+    loop {
+        match decoder.try_next_reader().unwrap() {
+            DecodeResult::Data(reader) => {
+                black_box(reader);
+            }
+            DecodeResult::Finished => break,
+            DecodeResult::NeedsData(ranges) => {
+                let buffers: Vec<Bytes> = ranges
+                    .iter()
+                    .map(|r| file_data.slice(r.start as usize..r.end as usize))
+                    .collect();
+                decoder.push_ranges(ranges, buffers).unwrap();
+            }
+        }
+    }
+}
+
+fn bench_1buf(c: &mut Criterion) {
+    let mut group = c.benchmark_group("push_decoder/1buf");
+
+    for num_cols in [100, 1_000, 10_000, 50_000] {
+        let file_data = make_test_file(num_cols);
+        let metadata = decode_metadata(&file_data);
+        let num_ranges: usize = metadata
+            .row_groups()
+            .iter()
+            .map(|rg| rg.columns().len())
+            .sum();
+
+        group.bench_with_input(
+            BenchmarkId::from_parameter(format!("{num_ranges}ranges")),
+            &(&file_data, &metadata),
+            |b, &(data, meta)| b.iter(|| build_readers_single_buffer(data, 
meta)),
+        );
+    }
+
+    group.finish();
+}
+
+fn bench_nbuf(c: &mut Criterion) {
+    let mut group = c.benchmark_group("push_decoder/Nbuf");
+
+    for num_cols in [100, 1_000, 10_000] {
+        let file_data = make_test_file(num_cols);
+        let metadata = decode_metadata(&file_data);
+        let num_ranges: usize = metadata
+            .row_groups()
+            .iter()
+            .map(|rg| rg.columns().len())
+            .sum();
+
+        group.bench_with_input(
+            BenchmarkId::from_parameter(format!("{num_ranges}ranges")),
+            &(&file_data, &metadata),
+            |b, &(data, meta)| b.iter(|| build_readers_exact_ranges(data, 
meta)),
+        );
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, bench_1buf, bench_nbuf);
+criterion_main!(benches);

Reply via email to