This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 48199b9da3 Use `ParquetPushDecoder` in `ParquetOpener` (#20839)
48199b9da3 is described below
commit 48199b9da3341100eb754cd569c008936b3f32a7
Author: Daniël Heres <[email protected]>
AuthorDate: Wed Mar 11 07:15:06 2026 +0100
Use `ParquetPushDecoder` in `ParquetOpener` (#20839)
## Which issue does this PR close?
- Closes #20841
## Rationale for this change
We want to split IO and CPU work to allow for more (NUMA-aware) parallelism
and to utilize IO and CPU better.
This allows for, e.g., more coalescing, prefetching, parallel IO, and more
parallel / incremental decoding.
It also allows morsels to be handled purely at the CPU level, without
repeating IO for each morsel.
## What changes are included in this PR?
Just a refactor of `ParquetOpener` to use `ParquetPushDecoder`. I used Claude
to rewrite it and to keep the changes small.
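For context, the push decoder is driven by a fetch/decode loop like the one added in this PR's `transition` method. The sketch below is a minimal, illustrative version of that loop (the free-standing `next_batch` helper is hypothetical and only meant to show the shape of the API):

```rust
use arrow::array::RecordBatch;
use datafusion_common::Result;
use parquet::DecodeResult;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::push_decoder::ParquetPushDecoder;

/// Illustrative helper: drive the push decoder until it yields the next batch.
/// IO only happens when the decoder reports `NeedsData`; decoding is pure CPU.
async fn next_batch(
    decoder: &mut ParquetPushDecoder,
    reader: &mut Box<dyn AsyncFileReader>,
) -> Option<Result<RecordBatch>> {
    loop {
        match decoder.try_decode() {
            // Decoder needs more bytes: fetch the requested ranges and push them in.
            Ok(DecodeResult::NeedsData(ranges)) => {
                match reader.get_byte_ranges(ranges.clone()).await {
                    Ok(data) => {
                        if let Err(e) = decoder.push_ranges(ranges, data) {
                            return Some(Err(e.into()));
                        }
                    }
                    Err(e) => return Some(Err(e.into())),
                }
            }
            // A batch was decoded entirely on the CPU side.
            Ok(DecodeResult::Data(batch)) => return Some(Ok(batch)),
            // The selected row groups are fully consumed.
            Ok(DecodeResult::Finished) => return None,
            Err(e) => return Some(Err(e.into())),
        }
    }
}
```

In the actual change this loop lives inside a `futures::stream::unfold` state machine (`PushDecoderStreamState`), so the opener still exposes the same `RecordBatch` stream to callers.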
## Are these changes tested?
Existing tests. Nothing should change; the arrow-rs code also uses
`ParquetPushDecoder`.
## Are there any user-facing changes?
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
datafusion/datasource-parquet/src/opener.rs | 302 +++++++++++++++++-----------
1 file changed, 183 insertions(+), 119 deletions(-)
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 108e8c5752..0d8e825a89 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -24,11 +24,12 @@ use crate::{
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
};
use arrow::array::{RecordBatch, RecordBatchOptions};
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Schema};
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr_adapter::replace_columns_with_literals;
+use parquet::errors::ParquetError;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
@@ -56,13 +57,15 @@ use crate::sort::reverse_row_selection;
use datafusion_common::config::EncryptionFactoryOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
-use futures::{Stream, StreamExt, TryStreamExt, ready};
+use futures::{Stream, StreamExt, ready};
use log::debug;
+use parquet::DecodeResult;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
};
use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
@@ -167,17 +170,6 @@ impl PreparedAccessPlan {
Ok(self)
}
-
- /// Apply this access plan to a ParquetRecordBatchStreamBuilder
- fn apply_to_builder(
- self,
- mut builder: ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>>,
- ) -> ParquetRecordBatchStreamBuilder<Box<dyn AsyncFileReader>> {
- if let Some(row_selection) = self.row_selection {
- builder = builder.with_row_selection(row_selection);
- }
- builder.with_row_groups(self.row_group_indexes)
- }
}
impl FileOpener for ParquetOpener {
@@ -267,6 +259,9 @@ impl FileOpener for ParquetOpener {
let enable_bloom_filter = self.enable_bloom_filter;
let enable_row_group_stats_pruning =
self.enable_row_group_stats_pruning;
let limit = self.limit;
+ let parquet_file_reader_factory = Arc::clone(&self.parquet_file_reader_factory);
+ let partition_index = self.partition_index;
+ let metrics = self.metrics.clone();
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
@@ -444,57 +439,14 @@ impl FileOpener for ParquetOpener {
metadata_timer.stop();
- // ---------------------------------------------------------
- // Step: construct builder for the final RecordBatch stream
- // ---------------------------------------------------------
-
- let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
- async_file_reader,
- reader_metadata,
- );
-
- // ---------------------------------------------------------------------
- // Step: optionally add row filter to the builder
- //
- // Row filter is used for late materialization in parquet decoding, see
- // `row_filter` for details.
- // ---------------------------------------------------------------------
-
- // Filter pushdown: evaluate predicates during scan
- if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
- let row_filter = row_filter::build_row_filter(
- &predicate,
- &physical_file_schema,
- builder.metadata(),
- reorder_predicates,
- &file_metrics,
- );
-
- match row_filter {
- Ok(Some(filter)) => {
- builder = builder.with_row_filter(filter);
- }
- Ok(None) => {}
- Err(e) => {
- debug!(
- "Ignoring error building row filter for
'{predicate:?}': {e}"
- );
- }
- };
- };
- if force_filter_selections {
- builder =
- builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
- }
-
// ------------------------------------------------------------
// Step: prune row groups by range, predicate and bloom filter
// ------------------------------------------------------------
// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
- let file_metadata = Arc::clone(builder.metadata());
- let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+ let file_metadata = Arc::clone(reader_metadata.metadata());
+ let pruning_pred = pruning_predicate.as_ref().map(|p| p.as_ref());
let rg_metadata = file_metadata.row_groups();
// track which row groups to actually read
let access_plan =
@@ -506,13 +458,13 @@ impl FileOpener for ParquetOpener {
}
// If there is a predicate that can be evaluated against the metadata
- if let Some(predicate) = predicate.as_ref() {
+ if let Some(pruning_pred) = pruning_pred.as_ref() {
if enable_row_group_stats_pruning {
row_groups.prune_by_statistics(
&physical_file_schema,
- builder.parquet_schema(),
+ reader_metadata.parquet_schema(),
rg_metadata,
- predicate,
+ pruning_pred,
&file_metrics,
);
} else {
@@ -524,11 +476,27 @@ impl FileOpener for ParquetOpener {
}
if enable_bloom_filter && !row_groups.is_empty() {
+ // Use the existing reader for bloom filter I/O;
+ // replace with a fresh reader for decoding below.
+ let bf_reader = std::mem::replace(
+ &mut async_file_reader,
+ parquet_file_reader_factory.create_reader(
+ partition_index,
+ partitioned_file.clone(),
+ metadata_size_hint,
+ &metrics,
+ )?,
+ );
+ let mut bf_builder =
+ ParquetRecordBatchStreamBuilder::new_with_metadata(
+ bf_reader,
+ reader_metadata.clone(),
+ );
row_groups
.prune_by_bloom_filters(
&physical_file_schema,
- &mut builder,
- predicate,
+ &mut bf_builder,
+ pruning_pred,
&file_metrics,
)
.await;
@@ -570,7 +538,7 @@ impl FileOpener for ParquetOpener {
access_plan = p.prune_plan_with_page_index(
access_plan,
&physical_file_schema,
- builder.parquet_schema(),
+ reader_metadata.parquet_schema(),
file_metadata.as_ref(),
&file_metrics,
);
@@ -588,8 +556,59 @@ impl FileOpener for ParquetOpener {
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
}
+ if prepared_plan.row_group_indexes.is_empty() {
+ return Ok(futures::stream::empty().boxed());
+ }
+
+ // ---------------------------------------------------------
+ // Step: construct builder for the final RecordBatch stream
+ // ---------------------------------------------------------
+
+ let mut builder =
+ ParquetPushDecoderBuilder::new_with_metadata(reader_metadata.clone())
+ .with_batch_size(batch_size);
+
+ // ---------------------------------------------------------------------
+ // Step: optionally add row filter to the builder
+ //
+ // Row filter is used for late materialization in parquet decoding, see
+ // `row_filter` for details.
+ // ---------------------------------------------------------------------
+
+ // Filter pushdown: evaluate predicates during scan
+ if let Some(predicate) =
+ pushdown_filters.then_some(predicate.as_ref()).flatten()
+ {
+ let row_filter = row_filter::build_row_filter(
+ predicate,
+ &physical_file_schema,
+ file_metadata.as_ref(),
+ reorder_predicates,
+ &file_metrics,
+ );
+
+ match row_filter {
+ Ok(Some(filter)) => {
+ builder = builder.with_row_filter(filter);
+ }
+ Ok(None) => {}
+ Err(e) => {
+ debug!(
+ "Ignoring error building row filter for
'{predicate:?}': {e}"
+ );
+ }
+ };
+ };
+ if force_filter_selections {
+ builder =
+ builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
+ }
+
// Apply the prepared plan to the builder
- builder = prepared_plan.apply_to_builder(builder);
+ if let Some(row_selection) = prepared_plan.row_selection {
+ builder = builder.with_row_selection(row_selection);
+ }
+ builder = builder.with_row_groups(prepared_plan.row_group_indexes);
if let Some(limit) = limit {
builder = builder.with_limit(limit)
@@ -603,11 +622,11 @@ impl FileOpener for ParquetOpener {
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
let indices = projection.column_indices();
- let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
+ let mask =
+ ProjectionMask::roots(reader_metadata.parquet_schema(), indices.clone());
- let stream = builder
+ let decoder = builder
.with_projection(mask)
- .with_batch_size(batch_size)
.with_metrics(arrow_reader_metrics.clone())
.build()?;
@@ -617,57 +636,39 @@ impl FileOpener for ParquetOpener {
file_metrics.predicate_cache_inner_records.clone();
let predicate_cache_records =
file_metrics.predicate_cache_records.clone();
- let stream_schema = Arc::clone(stream.schema());
- // Check if we need to replace the schema to handle things like differing nullability or metadata.
- // See note below about file vs. output schema.
- let replace_schema = !stream_schema.eq(&output_schema);
-
// Rebase column indices to match the narrowed stream schema.
// The projection expressions have indices based on physical_file_schema,
// but the stream only contains the columns selected by the ProjectionMask.
+ let stream_schema = Arc::new(physical_file_schema.project(&indices)?);
+ let replace_schema = stream_schema != output_schema;
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
-
let projector = projection.make_projector(&stream_schema)?;
-
- let stream = stream.map_err(DataFusionError::from).map(move |b| {
- b.and_then(|mut b| {
- copy_arrow_reader_metrics(
- &arrow_reader_metrics,
- &predicate_cache_inner_records,
- &predicate_cache_records,
- );
- b = projector.project_batch(&b)?;
- if replace_schema {
- // Ensure the output batch has the expected schema.
- // This handles things like schema level and field level metadata, which may not be present
- // in the physical file schema.
- // It is also possible for nullability to differ; some writers create files with
- // OPTIONAL fields even when there are no nulls in the data.
- // In these cases it may make sense for the logical schema to be `NOT NULL`.
- // RecordBatch::try_new_with_options checks that if the schema is NOT NULL
- // the array cannot contain nulls, amongst other checks.
- let (_stream_schema, arrays, num_rows) = b.into_parts();
- let options =
- RecordBatchOptions::new().with_row_count(Some(num_rows));
- RecordBatch::try_new_with_options(
- Arc::clone(&output_schema),
- arrays,
- &options,
- )
- .map_err(Into::into)
- } else {
- Ok(b)
- }
- })
- });
+ let stream = futures::stream::unfold(
+ PushDecoderStreamState {
+ decoder,
+ reader: async_file_reader,
+ projector,
+ output_schema,
+ replace_schema,
+ arrow_reader_metrics,
+ predicate_cache_inner_records,
+ predicate_cache_records,
+ },
+ |mut state| async move {
+ let result = state.transition().await;
+ result.map(|r| (r, state))
+ },
+ )
+ .fuse();
// ----------------------------------------------------------------------
// Step: wrap the stream so a dynamic filter can stop the file scan early
// ----------------------------------------------------------------------
if let Some(file_pruner) = file_pruner {
+ let boxed_stream = stream.boxed();
Ok(EarlyStoppingStream::new(
- stream,
+ boxed_stream,
file_pruner,
files_ranges_pruned_statistics,
)
@@ -679,19 +680,82 @@ impl FileOpener for ParquetOpener {
}
}
-/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
-/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
-fn copy_arrow_reader_metrics(
- arrow_reader_metrics: &ArrowReaderMetrics,
- predicate_cache_inner_records: &Gauge,
- predicate_cache_records: &Gauge,
-) {
- if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
- predicate_cache_inner_records.set(v);
+/// State for a stream that decodes a single Parquet file using a push-based decoder.
+///
+/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests
+/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the
+/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is
+/// fully consumed.
+struct PushDecoderStreamState {
+ decoder: ParquetPushDecoder,
+ reader: Box<dyn AsyncFileReader>,
+ projector: Projector,
+ output_schema: Arc<Schema>,
+ replace_schema: bool,
+ arrow_reader_metrics: ArrowReaderMetrics,
+ predicate_cache_inner_records: Gauge,
+ predicate_cache_records: Gauge,
+}
+
+impl PushDecoderStreamState {
+ /// Advances the decoder state machine until the next [`RecordBatch`] is
+ /// produced, the file is fully consumed, or an error occurs.
+ ///
+ /// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]:
+ /// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are
+ /// fetched from the [`AsyncFileReader`] and fed back into the decoder.
+ /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
+ /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
+ async fn transition(&mut self) -> Option<Result<RecordBatch>> {
+ loop {
+ match self.decoder.try_decode() {
+ Ok(DecodeResult::NeedsData(ranges)) => {
+ let fetch = async {
+ let data = self.reader.get_byte_ranges(ranges.clone()).await?;
+ self.decoder.push_ranges(ranges, data)?;
+ Ok::<_, ParquetError>(())
+ };
+ if let Err(e) = fetch.await {
+ return Some(Err(DataFusionError::from(e)));
+ }
+ }
+ Ok(DecodeResult::Data(batch)) => {
+ self.copy_arrow_reader_metrics();
+ return Some(self.project_batch(&batch));
+ }
+ Ok(DecodeResult::Finished) => {
+ return None;
+ }
+ Err(e) => {
+ return Some(Err(DataFusionError::from(e)));
+ }
+ }
+ }
}
- if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
- predicate_cache_records.set(v);
+ /// Copies metrics from ArrowReaderMetrics (the metrics collected by the
+ /// arrow-rs parquet reader) to the parquet file metrics for DataFusion
+ fn copy_arrow_reader_metrics(&self) {
+ if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() {
+ self.predicate_cache_inner_records.set(v);
+ }
+ if let Some(v) = self.arrow_reader_metrics.records_read_from_cache() {
+ self.predicate_cache_records.set(v);
+ }
+ }
+
+ fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+ let mut batch = self.projector.project_batch(batch)?;
+ if self.replace_schema {
+ let (_schema, arrays, num_rows) = batch.into_parts();
+ let options = RecordBatchOptions::new().with_row_count(Some(num_rows));
+ batch = RecordBatch::try_new_with_options(
+ Arc::clone(&self.output_schema),
+ arrays,
+ &options,
+ )?;
+ }
+ Ok(batch)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]