alamb commented on code in PR #22191:
URL: https://github.com/apache/datafusion/pull/22191#discussion_r3248847301


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1280,6 +1230,113 @@ impl RowGroupsPrunedParquetOpen {
     }
 }
 
+/// Builds row filters for decoder runs.
+///
+/// A [`RowFilter`] must be owned by a decoder, so scans split across multiple
+/// decoder runs need a fresh filter for each run that evaluates row 
predicates.
+struct RowFilterGenerator<'a> {
+    predicate: Option<&'a Arc<dyn PhysicalExpr>>,
+    physical_file_schema: &'a SchemaRef,
+    file_metadata: &'a ParquetMetaData,
+    reorder_predicates: bool,
+    file_metrics: &'a ParquetFileMetrics,
+    first_row_filter: Option<RowFilter>,
+}
+
+impl<'a> RowFilterGenerator<'a> {
+    fn new(
+        prepared: &'a PreparedParquetOpen,
+        file_metadata: &'a ParquetMetaData,
+    ) -> Self {
+        let predicate = prepared
+            .pushdown_filters
+            .then_some(prepared.predicate.as_ref())
+            .flatten();
+
+        let mut generator = Self {
+            predicate,
+            physical_file_schema: &prepared.physical_file_schema,
+            file_metadata,
+            reorder_predicates: prepared.reorder_predicates,
+            file_metrics: &prepared.file_metrics,
+            first_row_filter: None,
+        };
+        generator.first_row_filter = generator.build_row_filter();
+        generator
+    }
+
+    fn has_row_filter(&self) -> bool {
+        self.first_row_filter.is_some()
+    }
+
+    fn next_filter(&mut self) -> Option<RowFilter> {
+        self.first_row_filter
+            .take()
+            .or_else(|| self.build_row_filter())
+    }
+
+    fn build_row_filter(&self) -> Option<RowFilter> {
+        let predicate = self.predicate?;
+        match row_filter::build_row_filter(
+            predicate,
+            self.physical_file_schema,
+            self.file_metadata,
+            self.reorder_predicates,
+            self.file_metrics,
+        ) {
+            Ok(Some(filter)) => Some(filter),
+            Ok(None) => None,
+            Err(e) => {
+                debug!("Ignoring error building row filter for 
'{predicate:?}': {e}");
+                None
+            }
+        }
+    }
+}
+
+fn prepare_access_plan(
+    plan: ParquetAccessPlan,
+    rg_metadata: &[RowGroupMetaData],
+    file_metadata: &ParquetMetaData,
+    reverse_row_groups: bool,
+) -> Result<PreparedAccessPlan> {
+    let mut prepared_access_plan = plan.prepare(rg_metadata)?;
+    if reverse_row_groups {
+        prepared_access_plan = prepared_access_plan.reverse(file_metadata)?;
+    }
+    Ok(prepared_access_plan)
+}
+
+struct DecoderBuilderConfig<'a> {

Review Comment:
   It would be nice to add some comments here explaining what this represents 
(namely the state needed to build the ParquetPushDecoder)



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1280,6 +1230,113 @@ impl RowGroupsPrunedParquetOpen {
     }
 }
 
+/// Builds row filters for decoder runs.
+///
+/// A [`RowFilter`] must be owned by a decoder, so scans split across multiple
+/// decoder runs need a fresh filter for each run that evaluates row 
predicates.
+struct RowFilterGenerator<'a> {
+    predicate: Option<&'a Arc<dyn PhysicalExpr>>,
+    physical_file_schema: &'a SchemaRef,
+    file_metadata: &'a ParquetMetaData,
+    reorder_predicates: bool,
+    file_metrics: &'a ParquetFileMetrics,
+    first_row_filter: Option<RowFilter>,
+}
+
+impl<'a> RowFilterGenerator<'a> {
+    fn new(
+        prepared: &'a PreparedParquetOpen,
+        file_metadata: &'a ParquetMetaData,
+    ) -> Self {
+        let predicate = prepared
+            .pushdown_filters
+            .then_some(prepared.predicate.as_ref())
+            .flatten();
+
+        let mut generator = Self {
+            predicate,
+            physical_file_schema: &prepared.physical_file_schema,
+            file_metadata,
+            reorder_predicates: prepared.reorder_predicates,
+            file_metrics: &prepared.file_metrics,
+            first_row_filter: None,
+        };
+        generator.first_row_filter = generator.build_row_filter();
+        generator
+    }
+
+    fn has_row_filter(&self) -> bool {
+        self.first_row_filter.is_some()
+    }
+
+    fn next_filter(&mut self) -> Option<RowFilter> {
+        self.first_row_filter
+            .take()
+            .or_else(|| self.build_row_filter())
+    }
+
+    fn build_row_filter(&self) -> Option<RowFilter> {
+        let predicate = self.predicate?;
+        match row_filter::build_row_filter(
+            predicate,
+            self.physical_file_schema,
+            self.file_metadata,
+            self.reorder_predicates,
+            self.file_metrics,
+        ) {
+            Ok(Some(filter)) => Some(filter),
+            Ok(None) => None,
+            Err(e) => {
+                debug!("Ignoring error building row filter for 
'{predicate:?}': {e}");
+                None
+            }
+        }
+    }
+}
+
+fn prepare_access_plan(
+    plan: ParquetAccessPlan,
+    rg_metadata: &[RowGroupMetaData],
+    file_metadata: &ParquetMetaData,
+    reverse_row_groups: bool,
+) -> Result<PreparedAccessPlan> {
+    let mut prepared_access_plan = plan.prepare(rg_metadata)?;
+    if reverse_row_groups {
+        prepared_access_plan = prepared_access_plan.reverse(file_metadata)?;
+    }
+    Ok(prepared_access_plan)
+}
+
+struct DecoderBuilderConfig<'a> {
+    read_plan: &'a ParquetReadPlan,
+    batch_size: usize,
+    arrow_reader_metrics: &'a ArrowReaderMetrics,
+    force_filter_selections: bool,
+    decoder_limit: Option<usize>,
+}
+
+fn build_decoder_builder(

Review Comment:
   Is there a reason this isn't a method on DecoderBuilderConfig? That seems 
like a natural way to encapsulate the functionality
   
   ```rust
   impl DecoderBuilderConfig {
     fn build(self, prepared_access_plan, metadata) -> ParquetPushDecoderBuilder
   ```
   
   



##########
datafusion/datasource-parquet/src/page_filter.rs:
##########
@@ -115,6 +115,26 @@ pub struct PagePruningAccessPlanFilter {
     predicates: Vec<PruningPredicate>,
 }
 
+/// Result of applying page-index pruning to a [`ParquetAccessPlan`].
+pub(crate) struct PagePruningResult {
+    pub(crate) access_plan: ParquetAccessPlan,
+    /// Pages skipped because the containing row group was fully matched by

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to