This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 48199b9da3 Use `ParquetPushDecoder` in `ParquetOpener` (#20839)
48199b9da3 is described below

commit 48199b9da3341100eb754cd569c008936b3f32a7
Author: Daniël Heres <[email protected]>
AuthorDate: Wed Mar 11 07:15:06 2026 +0100

    Use `ParquetPushDecoder` in `ParquetOpener` (#20839)
    
    ## Which issue does this PR close?
    
    - Closes #20841
    
    ## Rationale for this change
    
    We want to split IO and CPU work to allow for more (NUMA-aware) parallelism
    and to make better use of both IO and CPU.
    This enables, for example, more coalescing, prefetching, parallel IO, and
    more parallel / incremental decoding.
    It also allows morsels to be handled purely at the CPU level, without
    repeating the IO for each morsel.
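    
    Roughly, the push decoder separates "which bytes are needed" (IO) from
    "decode those bytes" (CPU). Below is a minimal sketch of the resulting
    drive loop, mirroring the `PushDecoderStreamState::transition` method
    added in this PR; the `next_batch` helper is a made-up name for
    illustration, while the types and calls follow the usage in the diff.
    
    ```rust
    use arrow::array::RecordBatch;
    use parquet::DecodeResult;
    use parquet::arrow::async_reader::AsyncFileReader;
    use parquet::arrow::push_decoder::ParquetPushDecoder;
    use parquet::errors::ParquetError;
    
    /// Drive the decoder until it yields the next batch (or the file is done).
    async fn next_batch(
        decoder: &mut ParquetPushDecoder,
        reader: &mut Box<dyn AsyncFileReader>,
    ) -> Result<Option<RecordBatch>, ParquetError> {
        loop {
            match decoder.try_decode()? {
                // IO step: the decoder reports which byte ranges it needs next
                DecodeResult::NeedsData(ranges) => {
                    let data = reader.get_byte_ranges(ranges.clone()).await?;
                    decoder.push_ranges(ranges, data)?;
                }
                // CPU step: a fully decoded batch is ready
                DecodeResult::Data(batch) => return Ok(Some(batch)),
                // All selected row groups have been decoded
                DecodeResult::Finished => return Ok(None),
            }
        }
    }
    ```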
    
    ## What changes are included in this PR?
    
    This just refactors `ParquetOpener` to use `ParquetPushDecoder`. I used
    Claude to rewrite it and to keep the changes small.
    
    ## Are these changes tested?
    
    Covered by existing tests. Nothing should change, since the arrow-rs code
    also uses `ParquetPushDecoder`.
    
    ## Are there any user-facing changes?
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 datafusion/datasource-parquet/src/opener.rs | 302 +++++++++++++++++-----------
 1 file changed, 183 insertions(+), 119 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 108e8c5752..0d8e825a89 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -24,11 +24,12 @@ use crate::{
     apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
 };
 use arrow::array::{RecordBatch, RecordBatchOptions};
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Schema};
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr_adapter::replace_columns_with_literals;
+use parquet::errors::ParquetError;
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -56,13 +57,15 @@ use crate::sort::reverse_row_selection;
 use datafusion_common::config::EncryptionFactoryOptions;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
-use futures::{Stream, StreamExt, TryStreamExt, ready};
+use futures::{Stream, StreamExt, ready};
 use log::debug;
+use parquet::DecodeResult;
 use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
 };
 use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
 
@@ -167,17 +170,6 @@ impl PreparedAccessPlan {
 
         Ok(self)
     }
-
-    /// Apply this access plan to a ParquetRecordBatchStreamBuilder
-    fn apply_to_builder(
-        self,
-        mut builder: ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>>,
-    ) -> ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>> {
-        if let Some(row_selection) = self.row_selection {
-            builder = builder.with_row_selection(row_selection);
-        }
-        builder.with_row_groups(self.row_group_indexes)
-    }
 }
 
 impl FileOpener for ParquetOpener {
@@ -267,6 +259,9 @@ impl FileOpener for ParquetOpener {
         let enable_bloom_filter = self.enable_bloom_filter;
         let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
+        let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory);
+        let partition_index = self.partition_index;
+        let metrics = self.metrics.clone();
 
         let predicate_creation_errors = MetricBuilder::new(&self.metrics)
             .global_counter("num_predicate_creation_errors");
@@ -444,57 +439,14 @@ impl FileOpener for ParquetOpener {
 
             metadata_timer.stop();
 
-            // ---------------------------------------------------------
-            // Step: construct builder for the final RecordBatch stream
-            // ---------------------------------------------------------
-
-            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
-                async_file_reader,
-                reader_metadata,
-            );
-
-            // ---------------------------------------------------------------------
-            // Step: optionally add row filter to the builder
-            //
-            // Row filter is used for late materialization in parquet decoding, see
-            // `row_filter` for details.
-            // ---------------------------------------------------------------------
-
-            // Filter pushdown: evaluate predicates during scan
-            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
-                let row_filter = row_filter::build_row_filter(
-                    &predicate,
-                    &physical_file_schema,
-                    builder.metadata(),
-                    reorder_predicates,
-                    &file_metrics,
-                );
-
-                match row_filter {
-                    Ok(Some(filter)) => {
-                        builder = builder.with_row_filter(filter);
-                    }
-                    Ok(None) => {}
-                    Err(e) => {
-                        debug!(
-                            "Ignoring error building row filter for '{predicate:?}': {e}"
-                        );
-                    }
-                };
-            };
-            if force_filter_selections {
-                builder =
-                    builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
-            }
-
             // ------------------------------------------------------------
             // Step: prune row groups by range, predicate and bloom filter
             // ------------------------------------------------------------
 
             // Determine which row groups to actually read. The idea is to skip
             // as many row groups as possible based on the metadata and query
-            let file_metadata = Arc::clone(builder.metadata());
-            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+            let file_metadata = Arc::clone(reader_metadata.metadata());
+            let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref());
             let rg_metadata = file_metadata.row_groups();
             // track which row groups to actually read
             let access_plan =
@@ -506,13 +458,13 @@ impl FileOpener for ParquetOpener {
             }
 
             // If there is a predicate that can be evaluated against the metadata
-            if let Some(predicate) = predicate.as_ref() {
+            if let Some(pruning_pred) = pruning_pred.as_ref() {
                 if enable_row_group_stats_pruning {
                     row_groups.prune_by_statistics(
                         &physical_file_schema,
-                        builder.parquet_schema(),
+                        reader_metadata.parquet_schema(),
                         rg_metadata,
-                        predicate,
+                        pruning_pred,
                         &file_metrics,
                     );
                 } else {
@@ -524,11 +476,27 @@ impl FileOpener for ParquetOpener {
                 }
 
                 if enable_bloom_filter && !row_groups.is_empty() {
+                    // Use the existing reader for bloom filter I/O;
+                    // replace with a fresh reader for decoding below.
+                    let bf_reader = std::mem::replace(
+                        &mut async_file_reader,
+                        parquet_file_reader_factory.create_reader(
+                            partition_index,
+                            partitioned_file.clone(),
+                            metadata_size_hint,
+                            &metrics,
+                        )?,
+                    );
+                    let mut bf_builder =
+                        ParquetRecordBatchStreamBuilder::new_with_metadata(
+                            bf_reader,
+                            reader_metadata.clone(),
+                        );
                     row_groups
                         .prune_by_bloom_filters(
                             &physical_file_schema,
-                            &mut builder,
-                            predicate,
+                            &mut bf_builder,
+                            pruning_pred,
                             &file_metrics,
                         )
                         .await;
@@ -570,7 +538,7 @@ impl FileOpener for ParquetOpener {
                 access_plan = p.prune_plan_with_page_index(
                     access_plan,
                     &physical_file_schema,
-                    builder.parquet_schema(),
+                    reader_metadata.parquet_schema(),
                     file_metadata.as_ref(),
                     &file_metrics,
                 );
@@ -588,8 +556,59 @@ impl FileOpener for ParquetOpener {
                 prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
             }
 
+            if prepared_plan.row_group_indexes.is_empty() {
+                return Ok(futures::stream::empty().boxed());
+            }
+
+            // ---------------------------------------------------------
+            // Step: construct builder for the final RecordBatch stream
+            // ---------------------------------------------------------
+
+            let mut builder =
+                ParquetPushDecoderBuilder::new_with_metadata(reader_metadata.clone())
+                    .with_batch_size(batch_size);
+
+            // ---------------------------------------------------------------------
+            // Step: optionally add row filter to the builder
+            //
+            // Row filter is used for late materialization in parquet decoding, see
+            // `row_filter` for details.
+            // ---------------------------------------------------------------------
+
+            // Filter pushdown: evaluate predicates during scan
+            if let Some(predicate) =
+                pushdown_filters.then_some(predicate.as_ref()).flatten()
+            {
+                let row_filter = row_filter::build_row_filter(
+                    predicate,
+                    &physical_file_schema,
+                    file_metadata.as_ref(),
+                    reorder_predicates,
+                    &file_metrics,
+                );
+
+                match row_filter {
+                    Ok(Some(filter)) => {
+                        builder = builder.with_row_filter(filter);
+                    }
+                    Ok(None) => {}
+                    Err(e) => {
+                        debug!(
+                            "Ignoring error building row filter for '{predicate:?}': {e}"
+                        );
+                    }
+                };
+            };
+            if force_filter_selections {
+                builder =
+                    builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
+            }
+
             // Apply the prepared plan to the builder
-            builder = prepared_plan.apply_to_builder(builder);
+            if let Some(row_selection) = prepared_plan.row_selection {
+                builder = builder.with_row_selection(row_selection);
+            }
+            builder = builder.with_row_groups(prepared_plan.row_group_indexes);
 
             if let Some(limit) = limit {
                 builder = builder.with_limit(limit)
@@ -603,11 +622,11 @@ impl FileOpener for ParquetOpener {
             let arrow_reader_metrics = ArrowReaderMetrics::enabled();
 
             let indices = projection.column_indices();
-            let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
+            let mask =
+                ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone());
 
-            let stream = builder
+            let decoder = builder
                 .with_projection(mask)
-                .with_batch_size(batch_size)
                 .with_metrics(arrow_reader_metrics.clone())
                 .build()?;
 
@@ -617,57 +636,39 @@ impl FileOpener for ParquetOpener {
                 file_metrics.predicate_cache_inner_records.clone();
             let predicate_cache_records = file_metrics.predicate_cache_records.clone();
 
-            let stream_schema = Arc::clone(stream.schema());
-            // Check if we need to replace the schema to handle things like differing nullability or metadata.
-            // See note below about file vs. output schema.
-            let replace_schema = !stream_schema.eq(&output_schema);
-
             // Rebase column indices to match the narrowed stream schema.
             // The projection expressions have indices based on physical_file_schema,
             // but the stream only contains the columns selected by the ProjectionMask.
+            let stream_schema = Arc::new(physical_file_schema.project(&indices)?);
+            let replace_schema = stream_schema != output_schema;
             let projection = projection
                 .try_map_exprs(|expr| reassign_expr_columns(expr, 
&stream_schema))?;
-
             let projector = projection.make_projector(&stream_schema)?;
-
-            let stream = stream.map_err(DataFusionError::from).map(move |b| {
-                b.and_then(|mut b| {
-                    copy_arrow_reader_metrics(
-                        &arrow_reader_metrics,
-                        &predicate_cache_inner_records,
-                        &predicate_cache_records,
-                    );
-                    b = projector.project_batch(&b)?;
-                    if replace_schema {
-                        // Ensure the output batch has the expected schema.
-                        // This handles things like schema level and field level metadata, which may not be present
-                        // in the physical file schema.
-                        // It is also possible for nullability to differ; some writers create files with
-                        // OPTIONAL fields even when there are no nulls in the data.
-                        // In these cases it may make sense for the logical schema to be `NOT NULL`.
-                        // RecordBatch::try_new_with_options checks that if the schema is NOT NULL
-                        // the array cannot contain nulls, amongst other checks.
-                        let (_stream_schema, arrays, num_rows) = b.into_parts();
-                        let options =
-                            RecordBatchOptions::new().with_row_count(Some(num_rows));
-                        RecordBatch::try_new_with_options(
-                            Arc::clone(&output_schema),
-                            arrays,
-                            &options,
-                        )
-                        .map_err(Into::into)
-                    } else {
-                        Ok(b)
-                    }
-                })
-            });
+            let stream = futures::stream::unfold(
+                PushDecoderStreamState {
+                    decoder,
+                    reader: async_file_reader,
+                    projector,
+                    output_schema,
+                    replace_schema,
+                    arrow_reader_metrics,
+                    predicate_cache_inner_records,
+                    predicate_cache_records,
+                },
+                |mut state| async move {
+                    let result = state.transition().await;
+                    result.map(|r| (r, state))
+                },
+            )
+            .fuse();
 
             // ----------------------------------------------------------------------
             // Step: wrap the stream so a dynamic filter can stop the file scan early
             // ----------------------------------------------------------------------
             if let Some(file_pruner) = file_pruner {
+                let boxed_stream = stream.boxed();
                 Ok(EarlyStoppingStream::new(
-                    stream,
+                    boxed_stream,
                     file_pruner,
                     files_ranges_pruned_statistics,
                 )
@@ -679,19 +680,82 @@ impl FileOpener for ParquetOpener {
     }
 }
 
-/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
-/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
-fn copy_arrow_reader_metrics(
-    arrow_reader_metrics: &ArrowReaderMetrics,
-    predicate_cache_inner_records: &Gauge,
-    predicate_cache_records: &Gauge,
-) {
-    if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
-        predicate_cache_inner_records.set(v);
+/// State for a stream that decodes a single Parquet file using a push-based decoder.
+///
+/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests
+/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the
+/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is
+/// fully consumed.
+struct PushDecoderStreamState {
+    decoder: ParquetPushDecoder,
+    reader: Box<dyn AsyncFileReader>,
+    projector: Projector,
+    output_schema: Arc<Schema>,
+    replace_schema: bool,
+    arrow_reader_metrics: ArrowReaderMetrics,
+    predicate_cache_inner_records: Gauge,
+    predicate_cache_records: Gauge,
+}
+
+impl PushDecoderStreamState {
+    /// Advances the decoder state machine until the next [`RecordBatch`] is
+    /// produced, the file is fully consumed, or an error occurs.
+    ///
+    /// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]:
+    /// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are
+    ///   fetched from the [`AsyncFileReader`] and fed back into the decoder.
+    /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
+    /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
+    async fn transition(&mut self) -> Option<Result<RecordBatch>> {
+        loop {
+            match self.decoder.try_decode() {
+                Ok(DecodeResult::NeedsData(ranges)) => {
+                    let fetch = async {
+                        let data = self.reader.get_byte_ranges(ranges.clone()).await?;
+                        self.decoder.push_ranges(ranges, data)?;
+                        Ok::<_, ParquetError>(())
+                    };
+                    if let Err(e) = fetch.await {
+                        return Some(Err(DataFusionError::from(e)));
+                    }
+                }
+                Ok(DecodeResult::Data(batch)) => {
+                    self.copy_arrow_reader_metrics();
+                    return Some(self.project_batch(&batch));
+                }
+                Ok(DecodeResult::Finished) => {
+                    return None;
+                }
+                Err(e) => {
+                    return Some(Err(DataFusionError::from(e)));
+                }
+            }
+        }
     }
 
-    if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
-        predicate_cache_records.set(v);
+    /// Copies metrics from ArrowReaderMetrics (the metrics collected by the
+    /// arrow-rs parquet reader) to the parquet file metrics for DataFusion
+    fn copy_arrow_reader_metrics(&self) {
+        if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() {
+            self.predicate_cache_inner_records.set(v);
+        }
+        if let Some(v) = self.arrow_reader_metrics.records_read_from_cache() {
+            self.predicate_cache_records.set(v);
+        }
+    }
+
+    fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        let mut batch = self.projector.project_batch(batch)?;
+        if self.replace_schema {
+            let (_schema, arrays, num_rows) = batch.into_parts();
+            let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
+            batch = RecordBatch::try_new_with_options(
+                Arc::clone(&self.output_schema),
+                arrays,
+                &options,
+            )?;
+        }
+        Ok(batch)
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
