This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1efcbf57dd Add benchmark for struct field filter pushdown in Parquet 
(#20829)
1efcbf57dd is described below

commit 1efcbf57dd1897a5e01b2d5d09e8ec38a130ef57
Author: Matthew Kim <[email protected]>
AuthorDate: Wed Mar 11 12:13:00 2026 -0400

    Add benchmark for struct field filter pushdown in Parquet (#20829)
    
    ## Which issue does this PR close?
    
    - Closes https://github.com/apache/datafusion/issues/20828
    
    ## Rationale for this change
    
    This PR adds a series of benchmarks that compare Parquet row-level
    filter pushdown for struct field predicates. This establishes a baseline
    so we can measure the impact of
    https://github.com/apache/datafusion/issues/20828
    
    To run, use:
    ```sh
    cargo bench -p datafusion-datasource-parquet --bench parquet_struct_filter_pushdown
    ```
---
 datafusion/datasource-parquet/Cargo.toml           |   5 +
 .../benches/parquet_struct_filter_pushdown.rs      | 353 +++++++++++++++++++++
 2 files changed, 358 insertions(+)

diff --git a/datafusion/datasource-parquet/Cargo.toml 
b/datafusion/datasource-parquet/Cargo.toml
index 4889059b16..a5855af17a 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -58,6 +58,7 @@ tokio = { workspace = true }
 [dev-dependencies]
 chrono = { workspace = true }
 criterion = { workspace = true }
+datafusion-functions = { workspace = true }
 datafusion-functions-nested = { workspace = true }
 tempfile = { workspace = true }
 
@@ -81,3 +82,7 @@ parquet_encryption = [
 [[bench]]
 name = "parquet_nested_filter_pushdown"
 harness = false
+
+[[bench]]
+name = "parquet_struct_filter_pushdown"
+harness = false
diff --git 
a/datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs 
b/datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs
new file mode 100644
index 0000000000..b52408d422
--- /dev/null
+++ b/datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for struct field filter pushdown in Parquet.
+//!
+//! Compares scanning with vs without row-level filter pushdown for
+//! predicates on struct sub-fields (e.g. `get_field(s, 'id') = 42`).
+//!
+//! The dataset schema (in SQL-like notation):
+//!
+//! ```sql
+//! CREATE TABLE t (
+//!     id       INT,          -- top-level id, useful for correctness checks
+//!     large_string TEXT,     -- wide column so SELECT * is expensive
+//!     s STRUCT<
+//!         id: INT,           -- mirrors top-level id
+//!         large_string: TEXT -- wide sub-field; pushdown with proper 
projection
+//!                            -- should avoid reading this when filtering on 
s.id
+//!     >
+//! );
+//! ```
+//!
+//! Benchmark queries:
+//!
+//! 1. `SELECT * FROM t WHERE get_field(s, 'id') = 42`
+//!     - no pushdown vs. row-level filter pushdown
+//! 2. `SELECT * FROM t WHERE get_field(s, 'id') = id`
+//!     - cross-column predicate; no pushdown vs. row-level filter pushdown
+//! 3. `SELECT id FROM t WHERE get_field(s, 'id') = 42`
+//!     - narrow projection; pushdown should avoid reading s.large_string
+
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{BooleanArray, Int32Array, RecordBatch, StringBuilder, 
StructArray};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use criterion::{Criterion, Throughput, criterion_group, criterion_main};
+use datafusion_common::ScalarValue;
+use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter};
+use datafusion_expr::{Expr, col};
+use datafusion_physical_expr::planner::logical2physical;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::arrow::{ArrowWriter, ProjectionMask};
+use parquet::file::properties::WriterProperties;
+use tempfile::TempDir;
+
+const ROW_GROUP_ROW_COUNT: usize = 10_000;
+const TOTAL_ROW_GROUPS: usize = 10;
+const TOTAL_ROWS: usize = ROW_GROUP_ROW_COUNT * TOTAL_ROW_GROUPS;
+/// Only one row group will contain the target value.
+const TARGET_VALUE: i32 = 42;
+const ID_COLUMN_NAME: &str = "id";
+const LARGE_STRING_COLUMN_NAME: &str = "large_string";
+const STRUCT_COLUMN_NAME: &str = "s";
+// Large string payload to emphasize decoding overhead when pushdown is 
disabled.
+const LARGE_STRING_LEN: usize = 8 * 1024;
+
+struct BenchmarkDataset {
+    _tempdir: TempDir,
+    file_path: PathBuf,
+}
+
+impl BenchmarkDataset {
+    fn path(&self) -> &Path {
+        &self.file_path
+    }
+}
+
+static DATASET: LazyLock<BenchmarkDataset> = LazyLock::new(|| {
+    create_dataset().expect("failed to prepare parquet benchmark dataset")
+});
+
+fn parquet_struct_filter_pushdown(c: &mut Criterion) {
+    let dataset_path = DATASET.path().to_owned();
+    let mut group = c.benchmark_group("parquet_struct_filter_pushdown");
+    group.throughput(Throughput::Elements(TOTAL_ROWS as u64));
+
+    // Scenario 1: SELECT * FROM t WHERE get_field(s, 'id') = 42
+    group.bench_function("select_star/no_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_id_eq_literal(), 
&file_schema);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, false, 
ProjectionMask::all())
+                .expect("scan succeeded");
+            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
+        });
+    });
+
+    group.bench_function("select_star/with_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_id_eq_literal(), 
&file_schema);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, true, 
ProjectionMask::all())
+                .expect("scan succeeded");
+            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
+        });
+    });
+
+    // Scenario 2: SELECT * FROM t WHERE get_field(s, 'id') = id
+    group.bench_function("select_star_cross_col/no_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_id_eq_top_id(), &file_schema);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, false, 
ProjectionMask::all())
+                .expect("scan succeeded");
+            assert_eq!(matched, TOTAL_ROWS);
+        });
+    });
+
+    group.bench_function("select_star_cross_col/with_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_id_eq_top_id(), &file_schema);
+        b.iter(|| {
+            let matched = scan(&dataset_path, &predicate, true, 
ProjectionMask::all())
+                .expect("scan succeeded");
+            assert_eq!(matched, TOTAL_ROWS);
+        });
+    });
+
+    // Scenario 3: SELECT id FROM t WHERE get_field(s, 'id') = 42
+    group.bench_function("select_id/no_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_id_eq_literal(), 
&file_schema);
+        b.iter(|| {
+            // Without pushdown we must read all columns to evaluate the 
predicate.
+            let matched = scan(&dataset_path, &predicate, false, 
ProjectionMask::all())
+                .expect("scan succeeded");
+            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
+        });
+    });
+
+    group.bench_function("select_id/with_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&struct_id_eq_literal(), 
&file_schema);
+        let id_only = id_projection(&dataset_path);
+        b.iter(|| {
+            // With pushdown the filter runs first, then we only project `id`.
+            let matched = scan(&dataset_path, &predicate, true, 
id_only.clone())
+                .expect("scan succeeded");
+            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
+        });
+    });
+
+    group.finish();
+}
+
+fn setup_reader(path: &Path) -> SchemaRef {
+    let file = std::fs::File::open(path).expect("failed to open file");
+    let builder =
+        ParquetRecordBatchReaderBuilder::try_new(file).expect("failed to build 
reader");
+    Arc::clone(builder.schema())
+}
+
+/// `get_field(s, 'id') = TARGET_VALUE`
+fn struct_id_eq_literal() -> Expr {
+    let get_field_expr = datafusion_functions::core::get_field().call(vec![
+        col(STRUCT_COLUMN_NAME),
+        Expr::Literal(ScalarValue::Utf8(Some("id".to_string())), None),
+    ]);
+    get_field_expr.eq(Expr::Literal(ScalarValue::Int32(Some(TARGET_VALUE)), 
None))
+}
+
+/// `get_field(s, 'id') = id`
+fn struct_id_eq_top_id() -> Expr {
+    let get_field_expr = datafusion_functions::core::get_field().call(vec![
+        col(STRUCT_COLUMN_NAME),
+        Expr::Literal(ScalarValue::Utf8(Some("id".to_string())), None),
+    ]);
+    get_field_expr.eq(col(ID_COLUMN_NAME))
+}
+
+/// Build a [`ProjectionMask`] that only reads the top-level `id` leaf column.
+fn id_projection(path: &Path) -> ProjectionMask {
+    let file = std::fs::File::open(path).expect("failed to open file");
+    let builder =
+        ParquetRecordBatchReaderBuilder::try_new(file).expect("failed to build 
reader");
+    let parquet_schema = builder.metadata().file_metadata().schema_descr_ptr();
+    // Leaf index 0 corresponds to the top-level `id` column.
+    ProjectionMask::leaves(&parquet_schema, [0])
+}
+
+fn scan(
+    path: &Path,
+    predicate: &Arc<dyn datafusion_physical_expr::PhysicalExpr>,
+    pushdown: bool,
+    projection: ProjectionMask,
+) -> datafusion_common::Result<usize> {
+    let file = std::fs::File::open(path)?;
+    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
+    let metadata = builder.metadata().clone();
+    let file_schema = builder.schema();
+
+    let metrics = ExecutionPlanMetricsSet::new();
+    let file_metrics = ParquetFileMetrics::new(0, &path.display().to_string(), 
&metrics);
+
+    let mut filter_applied = false;
+    let builder = if pushdown {
+        if let Some(row_filter) =
+            build_row_filter(predicate, file_schema, &metadata, false, 
&file_metrics)?
+        {
+            filter_applied = true;
+            builder.with_row_filter(row_filter)
+        } else {
+            builder
+        }
+    } else {
+        builder
+    };
+
+    // Only apply a narrow projection when the filter was actually pushed down.
+    // Otherwise we need all columns to evaluate the predicate manually.
+    let output_projection = if filter_applied {
+        projection
+    } else {
+        ProjectionMask::all()
+    };
+    let reader = builder.with_projection(output_projection).build()?;
+
+    let mut matched_rows = 0usize;
+    for batch in reader {
+        let batch = batch?;
+        if filter_applied {
+            // When the row filter was applied, rows are already filtered.
+            matched_rows += batch.num_rows();
+        } else {
+            matched_rows += count_matches(predicate, &batch)?;
+        }
+    }
+
+    Ok(matched_rows)
+}
+
+fn count_matches(
+    expr: &Arc<dyn datafusion_physical_expr::PhysicalExpr>,
+    batch: &RecordBatch,
+) -> datafusion_common::Result<usize> {
+    let values = expr.evaluate(batch)?.into_array(batch.num_rows())?;
+    let bools = values
+        .as_any()
+        .downcast_ref::<BooleanArray>()
+        .expect("boolean filter result");
+
+    Ok(bools.iter().filter(|v| matches!(v, Some(true))).count())
+}
+
+fn schema() -> SchemaRef {
+    let struct_fields = Fields::from(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new(LARGE_STRING_COLUMN_NAME, DataType::Utf8, false),
+    ]);
+    Arc::new(Schema::new(vec![
+        Field::new(ID_COLUMN_NAME, DataType::Int32, false),
+        Field::new(LARGE_STRING_COLUMN_NAME, DataType::Utf8, false),
+        Field::new(STRUCT_COLUMN_NAME, DataType::Struct(struct_fields), false),
+    ]))
+}
+
+fn create_dataset() -> datafusion_common::Result<BenchmarkDataset> {
+    let tempdir = TempDir::new()?;
+    let file_path = tempdir.path().join("struct_filter.parquet");
+
+    let schema = schema();
+    let writer_props = WriterProperties::builder()
+        .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT))
+        .build();
+
+    let mut writer = ArrowWriter::try_new(
+        std::fs::File::create(&file_path)?,
+        Arc::clone(&schema),
+        Some(writer_props),
+    )?;
+
+    // Each row group has a distinct `s.id` value. Only one row group
+    // matches the target, so pushdown should prune 90% of rows.
+    for rg_idx in 0..TOTAL_ROW_GROUPS {
+        let id_value = if rg_idx == TOTAL_ROW_GROUPS - 1 {
+            TARGET_VALUE
+        } else {
+            (rg_idx as i32 + 1) * 1000
+        };
+        let batch = build_struct_batch(&schema, id_value, 
ROW_GROUP_ROW_COUNT)?;
+        writer.write(&batch)?;
+    }
+
+    writer.close()?;
+
+    let reader =
+        
ParquetRecordBatchReaderBuilder::try_new(std::fs::File::open(&file_path)?)?;
+    assert_eq!(reader.metadata().row_groups().len(), TOTAL_ROW_GROUPS);
+
+    Ok(BenchmarkDataset {
+        _tempdir: tempdir,
+        file_path,
+    })
+}
+
+fn build_struct_batch(
+    schema: &SchemaRef,
+    id_value: i32,
+    len: usize,
+) -> datafusion_common::Result<RecordBatch> {
+    let large_string: String = "x".repeat(LARGE_STRING_LEN);
+
+    // Top-level columns
+    let top_id_array = Arc::new(Int32Array::from(vec![id_value; len]));
+    let mut top_string_builder = StringBuilder::new();
+    for _ in 0..len {
+        top_string_builder.append_value(&large_string);
+    }
+    let top_string_array = Arc::new(top_string_builder.finish());
+
+    // Struct sub-fields: s.id mirrors top-level id, s.large_string is the 
same payload
+    let struct_id_array = Arc::new(Int32Array::from(vec![id_value; len]));
+    let mut struct_string_builder = StringBuilder::new();
+    for _ in 0..len {
+        struct_string_builder.append_value(&large_string);
+    }
+    let struct_string_array = Arc::new(struct_string_builder.finish());
+
+    let struct_array = StructArray::from(vec![
+        (
+            Arc::new(Field::new("id", DataType::Int32, false)),
+            struct_id_array as Arc<dyn arrow::array::Array>,
+        ),
+        (
+            Arc::new(Field::new(LARGE_STRING_COLUMN_NAME, DataType::Utf8, 
false)),
+            struct_string_array as Arc<dyn arrow::array::Array>,
+        ),
+    ]);
+
+    Ok(RecordBatch::try_new(
+        Arc::clone(schema),
+        vec![top_id_array, top_string_array, Arc::new(struct_array)],
+    )?)
+}
+
+criterion_group!(benches, parquet_struct_filter_pushdown);
+criterion_main!(benches);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to