This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fa265154bd Encapsulate early pruning in its own stream (#17293)
fa265154bd is described below
commit fa265154bd6826b15e7dea4ac30d39ac369cae91
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Aug 24 03:51:03 2025 -0700
Encapsulate early pruning in its own stream (#17293)
---
datafusion/datasource-parquet/src/opener.rs | 153 ++++++++++++++++++----------
1 file changed, 97 insertions(+), 56 deletions(-)
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 694b14f6a5..42b5776abe 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -17,17 +17,19 @@
//! [`ParquetOpener`] for opening Parquet files
-use std::sync::Arc;
-
use crate::page_filter::PagePruningAccessPlanFilter;
use crate::row_group_filter::RowGroupAccessPlanFilter;
use crate::{
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
};
+use arrow::array::RecordBatch;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
@@ -47,7 +49,7 @@ use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};
use datafusion_common::config::EncryptionFactoryOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
-use futures::{StreamExt, TryStreamExt};
+use futures::{ready, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
@@ -409,64 +411,103 @@ impl FileOpener for ParquetOpener {
.with_row_groups(row_group_indexes)
.build()?;
- // Create a stateful stream that can check pruning after each batch
- let adapted = {
- use futures::stream;
- let schema_mapping = Some(schema_mapping);
- let file_pruner = file_pruner;
- let stream = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
- let files_ranges_pruned_statistics =
- file_metrics.files_ranges_pruned_statistics.clone();
-
- stream::try_unfold(
- (
- stream,
- schema_mapping,
- file_pruner,
- files_ranges_pruned_statistics,
- ),
- move |(
- mut stream,
- schema_mapping_opt,
- mut file_pruner,
- files_ranges_pruned_statistics,
- )| async move {
- match stream.try_next().await? {
- Some(batch) => {
- let schema_mapping = schema_mapping_opt.as_ref().unwrap();
- let mapped_batch = schema_mapping.map_batch(batch)?;
-
- // Check if we can prune the file now
- if let Some(ref mut pruner) = file_pruner {
- if pruner.should_prune()? {
- // File can now be pruned based on updated dynamic filters
- files_ranges_pruned_statistics.add(1);
- // Terminate the stream early
- return Ok(None);
- }
- }
-
- Ok(Some((
- mapped_batch,
- (
- stream,
- schema_mapping_opt,
- file_pruner,
- files_ranges_pruned_statistics,
- ),
- )))
- }
- None => Ok(None),
- }
- },
- )
- };
+ let stream = stream
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+ .map(move |b| b.and_then(|b| Ok(schema_mapping.map_batch(b)?)));
- Ok(adapted.boxed())
+ if let Some(file_pruner) = file_pruner {
+ Ok(EarlyStoppingStream::new(
+ stream,
+ file_pruner,
+ file_metrics.files_ranges_pruned_statistics.clone(),
+ )
+ .boxed())
+ } else {
+ Ok(stream.boxed())
+ }
}))
}
}
+/// Wraps an inner RecordBatchStream and a [`FilePruner`]
+///
+/// This can terminate the scan early when some dynamic filters is updated after
+/// the scan starts, so we discover after the scan starts that the file can be
+/// pruned (can't have matching rows).
+struct EarlyStoppingStream<S> {
+ /// Has the stream finished processing? All subsequent polls will return
+ /// None
+ done: bool,
+ file_pruner: FilePruner,
+ files_ranges_pruned_statistics: Count,
+ /// The inner stream
+ inner: S,
+}
+
+impl<S> EarlyStoppingStream<S> {
+ pub fn new(
+ stream: S,
+ file_pruner: FilePruner,
+ files_ranges_pruned_statistics: Count,
+ ) -> Self {
+ Self {
+ done: false,
+ inner: stream,
+ file_pruner,
+ files_ranges_pruned_statistics,
+ }
+ }
+}
+impl<S> EarlyStoppingStream<S>
+where
+ S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+{
+ fn check_prune(
+ &mut self,
+ input: Result<RecordBatch, ArrowError>,
+ ) -> Result<Option<RecordBatch>, ArrowError> {
+ let batch = input?;
+
+ // Since dynamic filters may have been updated, see if we can stop
+ // reading this stream entirely.
+ if self.file_pruner.should_prune()? {
+ self.files_ranges_pruned_statistics.add(1);
+ self.done = true;
+ Ok(None)
+ } else {
+ // Return the adapted batch
+ Ok(Some(batch))
+ }
+ }
+}
+
+impl<S> Stream for EarlyStoppingStream<S>
+where
+ S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+{
+ type Item = Result<RecordBatch, ArrowError>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ if self.done {
+ return Poll::Ready(None);
+ }
+ match ready!(self.inner.poll_next_unpin(cx)) {
+ None => {
+ // input done
+ self.done = true;
+ Poll::Ready(None)
+ }
+ Some(input_batch) => {
+ let output = self.check_prune(input_batch);
+ Poll::Ready(output.transpose())
+ }
+ }
+ }
+}
+
#[cfg(feature = "parquet_encryption")]
impl ParquetOpener {
fn get_file_decryption_properties(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]