This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fa265154bd Encapsulate early pruning in its own stream (#17293)
fa265154bd is described below

commit fa265154bd6826b15e7dea4ac30d39ac369cae91
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Aug 24 03:51:03 2025 -0700

    Encapsulate early pruning in its own stream (#17293)
---
 datafusion/datasource-parquet/src/opener.rs | 153 ++++++++++++++++++----------
 1 file changed, 97 insertions(+), 56 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index 694b14f6a5..42b5776abe 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -17,17 +17,19 @@
 
 //! [`ParquetOpener`] for opening Parquet files
 
-use std::sync::Arc;
-
 use crate::page_filter::PagePruningAccessPlanFilter;
 use crate::row_group_filter::RowGroupAccessPlanFilter;
 use crate::{
     apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
     ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
 };
+use arrow::array::RecordBatch;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
@@ -47,7 +49,7 @@ use datafusion_pruning::{build_pruning_predicate, FilePruner, 
PruningPredicate};
 use datafusion_common::config::EncryptionFactoryOptions;
 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
-use futures::{StreamExt, TryStreamExt};
+use futures::{ready, Stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use log::debug;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
@@ -409,64 +411,103 @@ impl FileOpener for ParquetOpener {
                 .with_row_groups(row_group_indexes)
                 .build()?;
 
-            // Create a stateful stream that can check pruning after each batch
-            let adapted = {
-                use futures::stream;
-                let schema_mapping = Some(schema_mapping);
-                let file_pruner = file_pruner;
-                let stream = stream.map_err(|e| 
ArrowError::ExternalError(Box::new(e)));
-                let files_ranges_pruned_statistics =
-                    file_metrics.files_ranges_pruned_statistics.clone();
-
-                stream::try_unfold(
-                    (
-                        stream,
-                        schema_mapping,
-                        file_pruner,
-                        files_ranges_pruned_statistics,
-                    ),
-                    move |(
-                        mut stream,
-                        schema_mapping_opt,
-                        mut file_pruner,
-                        files_ranges_pruned_statistics,
-                    )| async move {
-                        match stream.try_next().await? {
-                            Some(batch) => {
-                                let schema_mapping = 
schema_mapping_opt.as_ref().unwrap();
-                                let mapped_batch = 
schema_mapping.map_batch(batch)?;
-
-                                // Check if we can prune the file now
-                                if let Some(ref mut pruner) = file_pruner {
-                                    if pruner.should_prune()? {
-                                        // File can now be pruned based on 
updated dynamic filters
-                                        files_ranges_pruned_statistics.add(1);
-                                        // Terminate the stream early
-                                        return Ok(None);
-                                    }
-                                }
-
-                                Ok(Some((
-                                    mapped_batch,
-                                    (
-                                        stream,
-                                        schema_mapping_opt,
-                                        file_pruner,
-                                        files_ranges_pruned_statistics,
-                                    ),
-                                )))
-                            }
-                            None => Ok(None),
-                        }
-                    },
-                )
-            };
+            let stream = stream
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
+                .map(move |b| b.and_then(|b| 
Ok(schema_mapping.map_batch(b)?)));
 
-            Ok(adapted.boxed())
+            if let Some(file_pruner) = file_pruner {
+                Ok(EarlyStoppingStream::new(
+                    stream,
+                    file_pruner,
+                    file_metrics.files_ranges_pruned_statistics.clone(),
+                )
+                .boxed())
+            } else {
+                Ok(stream.boxed())
+            }
         }))
     }
 }
 
+/// Wraps an inner RecordBatchStream and a [`FilePruner`]
+///
+/// This can terminate the scan early when a dynamic filter is updated after
+/// the scan starts, and we discover that the file can be
+/// pruned (i.e. it cannot contain any matching rows).
+struct EarlyStoppingStream<S> {
+    /// Has the stream finished processing? All subsequent polls will return
+    /// None
+    done: bool,
+    file_pruner: FilePruner,
+    files_ranges_pruned_statistics: Count,
+    /// The inner stream
+    inner: S,
+}
+
+impl<S> EarlyStoppingStream<S> {
+    pub fn new(
+        stream: S,
+        file_pruner: FilePruner,
+        files_ranges_pruned_statistics: Count,
+    ) -> Self {
+        Self {
+            done: false,
+            inner: stream,
+            file_pruner,
+            files_ranges_pruned_statistics,
+        }
+    }
+}
+impl<S> EarlyStoppingStream<S>
+where
+    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+{
+    fn check_prune(
+        &mut self,
+        input: Result<RecordBatch, ArrowError>,
+    ) -> Result<Option<RecordBatch>, ArrowError> {
+        let batch = input?;
+
+        // Since dynamic filters may have been updated, see if we can stop
+        // reading this stream entirely.
+        if self.file_pruner.should_prune()? {
+            self.files_ranges_pruned_statistics.add(1);
+            self.done = true;
+            Ok(None)
+        } else {
+            // Return the adapted batch
+            Ok(Some(batch))
+        }
+    }
+}
+
+impl<S> Stream for EarlyStoppingStream<S>
+where
+    S: Stream<Item = Result<RecordBatch, ArrowError>> + Unpin,
+{
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.done {
+            return Poll::Ready(None);
+        }
+        match ready!(self.inner.poll_next_unpin(cx)) {
+            None => {
+                // input done
+                self.done = true;
+                Poll::Ready(None)
+            }
+            Some(input_batch) => {
+                let output = self.check_prune(input_batch);
+                Poll::Ready(output.transpose())
+            }
+        }
+    }
+}
+
 #[cfg(feature = "parquet_encryption")]
 impl ParquetOpener {
     fn get_file_decryption_properties(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to