adriangb commented on code in PR #21956:
URL: https://github.com/apache/datafusion/pull/21956#discussion_r3244796568


##########
datafusion/datasource-parquet/src/access_plan_optimizer.rs:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`AccessPlanOptimizer`] trait and implementations for optimizing
+//! row group access order during parquet scans.
+//!
+//! Applied after row group pruning but before building the decoder,
+//! these optimizers reorder (or reverse) the row groups to improve
+//! query performance — e.g., placing the "best" row groups first
+//! so TopK's dynamic filter threshold tightens quickly.
+
+use crate::access_plan::PreparedAccessPlan;
+use arrow::datatypes::Schema;
+use datafusion_common::Result;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use parquet::file::metadata::ParquetMetaData;
+use std::fmt::Debug;
+
+/// Optimizes the row group access order for a prepared access plan.
+///
+/// Implementations can reorder, reverse, or otherwise transform the
+/// row group read order to improve scan performance. The optimizer
+/// is applied once per file, after all pruning passes are complete.
+///
+/// # Examples
+///
+/// - [`ReverseRowGroups`]: simple O(n) reversal for DESC on ASC-sorted data
+/// - [`ReorderByStatistics`]: sort row groups by min/max statistics
+///   so TopK queries find optimal values first
+pub(crate) trait AccessPlanOptimizer: Send + Sync + Debug {
+    /// Transform the prepared access plan.
+    ///
+    /// Implementations should return the plan unchanged if they cannot
+    /// apply their optimization (e.g., missing statistics).
+    fn optimize(
+        &self,
+        plan: PreparedAccessPlan,
+        file_metadata: &ParquetMetaData,
+        arrow_schema: &Schema,
+    ) -> Result<PreparedAccessPlan>;
+}
+
+/// Reverse the row group order — simple O(n) reversal.
+///
+/// Used as a fallback when the sort column has no statistics available.
+/// For ASC-sorted files with a DESC query, reversing row groups places
+/// the highest-value row groups first.
+#[derive(Debug)]
+pub(crate) struct ReverseRowGroups;
+
+impl AccessPlanOptimizer for ReverseRowGroups {
+    fn optimize(
+        &self,
+        plan: PreparedAccessPlan,
+        file_metadata: &ParquetMetaData,
+        _arrow_schema: &Schema,
+    ) -> Result<PreparedAccessPlan> {
+        plan.reverse(file_metadata)

Review Comment:
   I like the idea, but is it really worth having a trait just to hide one line
   behind it?



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1123,13 +1129,107 @@ impl RowGroupsPrunedParquetOpen {
             );
         }
 
-        // Prepare the access plan (extract row groups and row selection)
-        let mut prepared_plan = access_plan.prepare(rg_metadata)?;
+        // Row group ordering optimization (two composable steps):
+        //
+        // 1. reorder_by_statistics: sort RGs by min values (ASC) to align
+        //    with the file's declared output ordering. This fixes out-of-order

Review Comment:
   This comment doesn't make sense to me. A file either declares an output 
ordering or not. If it does the row groups should already be sorted by it. 
Should this say the _query's desired_ output ordering?
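
   For reference, the two steps the quoted code comment describes can be sketched on toy data. This is only an illustration: `ToyRowGroup` and `read_order` are hypothetical names, not part of the PR. If the row groups were already in the file's declared order, step 1 would be a no-op, so the reorder only matters for a target ordering such as the query's.

```rust
/// Toy stand-in for a row group: its index in the file and the min
/// statistic of the sort column (hypothetical, not a DataFusion type).
#[derive(Debug, Clone, Copy, PartialEq)]
struct ToyRowGroup {
    index: usize,
    min: i64,
}

/// Step 1: order row groups by ascending min statistic.
/// Step 2: reverse the result when the query wants DESC.
fn read_order(mut groups: Vec<ToyRowGroup>, descending: bool) -> Vec<usize> {
    // Stable sort, so row groups with equal mins keep their file order.
    groups.sort_by_key(|g| g.min);
    if descending {
        groups.reverse();
    }
    groups.iter().map(|g| g.index).collect()
}
```

   With mins of 30, 10 and 20 for row groups 0, 1 and 2, the ASC read order is 1, 2, 0 and the DESC read order is 0, 2, 1.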



##########
datafusion/datasource-parquet/src/access_plan_optimizer.rs:
##########
@@ -0,0 +1,107 @@
+
+//! [`AccessPlanOptimizer`] trait and implementations for optimizing
+//! row group access order during parquet scans.
+//!
+//! Applied after row group pruning but before building the decoder,
+//! these optimizers reorder (or reverse) the row groups to improve
+//! query performance — e.g., placing the "best" row groups first
+//! so TopK's dynamic filter threshold tightens quickly.
+
+use crate::access_plan::PreparedAccessPlan;
+use arrow::datatypes::Schema;
+use datafusion_common::Result;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use parquet::file::metadata::ParquetMetaData;
+use std::fmt::Debug;
+
+/// Optimizes the row group access order for a prepared access plan.
+///
+/// Implementations can reorder, reverse, or otherwise transform the
+/// row group read order to improve scan performance. The optimizer
+/// is applied once per file, after all pruning passes are complete.
+///
+/// # Examples
+///
+/// - [`ReverseRowGroups`]: simple O(n) reversal for DESC on ASC-sorted data
+/// - [`ReorderByStatistics`]: sort row groups by min/max statistics
+///   so TopK queries find optimal values first
+pub(crate) trait AccessPlanOptimizer: Send + Sync + Debug {
+    /// Transform the prepared access plan.
+    ///
+    /// Implementations should return the plan unchanged if they cannot
+    /// apply their optimization (e.g., missing statistics).
+    fn optimize(
+        &self,
+        plan: PreparedAccessPlan,
+        file_metadata: &ParquetMetaData,
+        arrow_schema: &Schema,
+    ) -> Result<PreparedAccessPlan>;
+}
+
+/// Reverse the row group order — simple O(n) reversal.
+///
+/// Used as a fallback when the sort column has no statistics available.
+/// For ASC-sorted files with a DESC query, reversing row groups places
+/// the highest-value row groups first.
+#[derive(Debug)]
+pub(crate) struct ReverseRowGroups;
+
+impl AccessPlanOptimizer for ReverseRowGroups {
+    fn optimize(
+        &self,
+        plan: PreparedAccessPlan,
+        file_metadata: &ParquetMetaData,
+        _arrow_schema: &Schema,

Review Comment:
   It's also a code smell that, with only two implementations, the arguments
   already don't line up: `ReverseRowGroups` ignores `arrow_schema`.



##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +395,120 @@ impl PreparedAccessPlan {
         })
     }
 
+    /// Reorder row groups by their min statistics for the given sort order.
+    ///
+    /// This helps TopK queries find optimal values first. Row groups are
+    /// always sorted by min values in ASC order — direction (DESC) is
+    /// handled separately by `reverse()` which is applied after reorder.
+    ///
+    /// Gracefully skips reordering when:
+    /// - There is a row_selection (too complex to remap)
+    /// - 0 or 1 row groups (nothing to reorder)
+    /// - Sort expression is not a simple column reference
+    /// - Statistics are unavailable
+    pub(crate) fn reorder_by_statistics(
+        mut self,
+        sort_order: &LexOrdering,
+        file_metadata: &ParquetMetaData,
+        arrow_schema: &Schema,
+    ) -> Result<Self> {
+        // Skip if row_selection present (too complex to remap)
+        if self.row_selection.is_some() {
+            debug!("Skipping RG reorder: row_selection present");
+            return Ok(self);
+        }
+
+        // Nothing to reorder
+        if self.row_group_indexes.len() <= 1 {
+            return Ok(self);
+        }
+
+        let first_sort_expr = sort_order.first();
+
+        // Extract column name from sort expression
+        let column: &Column = match first_sort_expr.expr.downcast_ref::<Column>() {
+            Some(col) => col,
+            None => {
+                debug!("Skipping RG reorder: sort expr is not a simple column");
+                return Ok(self);
+            }
+        };
+
+        // Build statistics converter for this column
+        let converter = match StatisticsConverter::try_new(
+            column.name(),
+            arrow_schema,
+            file_metadata.file_metadata().schema_descr(),
+        ) {
+            Ok(c) => c,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot create stats converter: {e}");
+                return Ok(self);
+            }
+        };
+
+        // Always sort ASC by min values — direction is handled by reverse
+        let rg_metadata: Vec<&RowGroupMetaData> = self
+            .row_group_indexes
+            .iter()
+            .map(|&idx| file_metadata.row_group(idx))
+            .collect();
+
+        let stat_mins = match converter.row_group_mins(rg_metadata.iter().copied()) {
+            Ok(vals) => vals,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot get min values: {e}");

Review Comment:
   Same note as
   https://github.com/apache/datafusion/pull/21956/changes#r3244789303.

   If this failure is expected, a docstring explaining why would be great. If it
   is unexpected, then we should at least error in CI.



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -208,9 +215,38 @@ impl DynamicFilterPhysicalExpr {
             state_watch,
             data_type: Arc::new(RwLock::new(None)),
             nullable: Arc::new(RwLock::new(None)),
+            sort_options: None,
+            fetch: None,
         }
     }
 
+    /// Create a new [`DynamicFilterPhysicalExpr`] with sort options.
+    ///
+    /// Sort options indicate the sort direction for each child expression,
+    /// enabling downstream consumers (e.g., parquet readers) to reorder
+    /// row groups by statistics for TopK queries.

Review Comment:
   Again, I'm not a fan of this way of passing information around.



##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +395,120 @@ impl PreparedAccessPlan {
         })
     }
 
+    /// Reorder row groups by their min statistics for the given sort order.
+    ///
+    /// This helps TopK queries find optimal values first. Row groups are
+    /// always sorted by min values in ASC order — direction (DESC) is
+    /// handled separately by `reverse()` which is applied after reorder.
+    ///
+    /// Gracefully skips reordering when:
+    /// - There is a row_selection (too complex to remap)
+    /// - 0 or 1 row groups (nothing to reorder)
+    /// - Sort expression is not a simple column reference
+    /// - Statistics are unavailable
+    pub(crate) fn reorder_by_statistics(
+        mut self,
+        sort_order: &LexOrdering,
+        file_metadata: &ParquetMetaData,
+        arrow_schema: &Schema,
+    ) -> Result<Self> {
+        // Skip if row_selection present (too complex to remap)
+        if self.row_selection.is_some() {
+            debug!("Skipping RG reorder: row_selection present");
+            return Ok(self);
+        }
+
+        // Nothing to reorder
+        if self.row_group_indexes.len() <= 1 {
+            return Ok(self);
+        }
+
+        let first_sort_expr = sort_order.first();
+
+        // Extract column name from sort expression
+        let column: &Column = match first_sort_expr.expr.downcast_ref::<Column>() {
+            Some(col) => col,
+            None => {
+                debug!("Skipping RG reorder: sort expr is not a simple column");
+                return Ok(self);
+            }
+        };
+
+        // Build statistics converter for this column
+        let converter = match StatisticsConverter::try_new(
+            column.name(),
+            arrow_schema,
+            file_metadata.file_metadata().schema_descr(),
+        ) {
+            Ok(c) => c,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot create stats converter: {e}");
+                return Ok(self);
+            }
+        };
+
+        // Always sort ASC by min values — direction is handled by reverse
+        let rg_metadata: Vec<&RowGroupMetaData> = self
+            .row_group_indexes
+            .iter()
+            .map(|&idx| file_metadata.row_group(idx))
+            .collect();
+
+        let stat_mins = match converter.row_group_mins(rg_metadata.iter().copied()) {
+            Ok(vals) => vals,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot get min values: {e}");
+                return Ok(self);
+            }
+        };
+        let stat_maxes = match converter.row_group_maxes(rg_metadata.iter().copied()) {
+            Ok(vals) => vals,
+            Err(e) => {
+                debug!("Skipping RG reorder: cannot get max values: {e}");
+                return Ok(self);
+            }
+        };
+
+        let sort_options = arrow::compute::SortOptions {
+            descending: false,
+            nulls_first: first_sort_expr.options.nulls_first,
+        };
+        let sorted_indices =
+            match arrow::compute::sort_to_indices(&stat_mins, Some(sort_options), None) {
+                Ok(indices) => indices,
+                Err(e) => {
+                    debug!("Skipping RG reorder: sort failed: {e}");

Review Comment:
   Is this expected to happen normally? If not, maybe this should be an `info`
   or a `warn`. Maybe we could even add a debug assertion so it fails in our CI?
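
   A minimal sketch of the debug-assertion idea, assuming the failure really is unexpected. `reorder_or_keep` is a hypothetical helper, not code from the PR: it takes the original order plus the result of some fallible sort, panics in debug builds (which typically includes CI test runs), and falls back to the original order in release builds.

```rust
use std::fmt::Display;

/// Return the sorted order, or fall back to the original order when
/// the sort step failed. The failure trips a `debug_assert!`, so it
/// panics in debug builds and is tolerated in release builds.
fn reorder_or_keep<E: Display>(
    original: Vec<usize>,
    sorted: Result<Vec<usize>, E>,
) -> Vec<usize> {
    match sorted {
        Ok(order) => order,
        Err(e) => {
            // Compiled out unless debug assertions are enabled.
            debug_assert!(false, "unexpected row group sort failure: {e}");
            original
        }
    }
}
```

   Whether a panic in debug builds is acceptable depends on whether any test legitimately exercises the failure path. If one does, a `warn` log is the safer choice.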



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1123,13 +1129,107 @@ impl RowGroupsPrunedParquetOpen {
             );
         }
 
-        // Prepare the access plan (extract row groups and row selection)
-        let mut prepared_plan = access_plan.prepare(rg_metadata)?;
+        // Row group ordering optimization (two composable steps):
+        //
+        // 1. reorder_by_statistics: sort RGs by min values (ASC) to align
+        //    with the file's declared output ordering. This fixes out-of-order
+        //    RGs (e.g., from append-heavy workloads) without changing direction.
+        //    Skipped gracefully when statistics are unavailable.
+        //
+        // 2. reverse: flip the order for DESC queries. Applied AFTER reorder
+        //    so the reversed order is correct whether or not reorder changed
+        //    anything. Also handles row_selection remapping.
+        //
+        // For sorted data: reorder is a no-op, reverse gives perfect DESC.
+        // For unsorted data: reorder fixes the order, reverse flips for DESC.
+        let reorder_optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if let Some(sort_order) = &prepared.sort_order_for_reorder {
+            Some(
+                Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+                    sort_order.clone(),
+                ))
+                    as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+            )
+        } else if let Some(predicate) = &prepared.predicate
+            && let Some(df) = find_dynamic_filter(predicate)
+            && let Some(sort_options) = df.sort_options()
+            && !sort_options.is_empty()
+        {
+            // Build a sort order from DynamicFilter for non-sort-pushdown TopK.
+            // Quick bail: check if the sort column exists in file schema.

Review Comment:
   Why is this needed? It seems hacky, like a side channel, when we already
   have sort pushdown APIs. What is "non-sort-pushdown TopK"?



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1614,6 +1714,39 @@ async fn load_page_index<T: AsyncFileReader>(
     }
 }
 
+/// Find a `DynamicFilterPhysicalExpr` in the expression tree.
+fn find_dynamic_filter(

Review Comment:
   Is it safe to recurse blindly like this? What if the expression is
   something like `NOT (dynamic_filter)`?
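
   To make the concern concrete, here is a sketch of a more conservative walk on a toy expression tree. `ToyExpr` and `find_top_level_dynamic_filter` are hypothetical and do not use DataFusion's `PhysicalExpr` API. The walk descends only through `AND` nodes, so a dynamic filter under `NOT` is never picked up as an ordering hint.

```rust
/// Toy expression tree (hypothetical; not DataFusion's `PhysicalExpr`).
#[derive(Debug)]
enum ToyExpr {
    Column(String),
    DynamicFilter { descending: bool },
    And(Box<ToyExpr>, Box<ToyExpr>),
    Not(Box<ToyExpr>),
}

/// Return the first dynamic filter reachable through `And` nodes only.
fn find_top_level_dynamic_filter(expr: &ToyExpr) -> Option<&ToyExpr> {
    match expr {
        ToyExpr::DynamicFilter { .. } => Some(expr),
        // A conjunct of the predicate still applies to every output row.
        ToyExpr::And(left, right) => find_top_level_dynamic_filter(left)
            .or_else(|| find_top_level_dynamic_filter(right)),
        // Do not look inside NOT or leaf nodes: a filter found there
        // does not describe the rows the scan should prioritize.
        ToyExpr::Not(_) | ToyExpr::Column(_) => None,
    }
}
```

   In this sketch `a AND dynamic_filter` yields the filter, while `NOT (dynamic_filter)` yields `None`.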



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1123,13 +1129,107 @@ impl RowGroupsPrunedParquetOpen {
             );
         }
 
-        // Prepare the access plan (extract row groups and row selection)
-        let mut prepared_plan = access_plan.prepare(rg_metadata)?;
+        // Row group ordering optimization (two composable steps):
+        //
+        // 1. reorder_by_statistics: sort RGs by min values (ASC) to align
+        //    with the file's declared output ordering. This fixes out-of-order
+        //    RGs (e.g., from append-heavy workloads) without changing direction.
+        //    Skipped gracefully when statistics are unavailable.
+        //
+        // 2. reverse: flip the order for DESC queries. Applied AFTER reorder
+        //    so the reversed order is correct whether or not reorder changed
+        //    anything. Also handles row_selection remapping.
+        //
+        // For sorted data: reorder is a no-op, reverse gives perfect DESC.
+        // For unsorted data: reorder fixes the order, reverse flips for DESC.
+        let reorder_optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if let Some(sort_order) = &prepared.sort_order_for_reorder {
+            Some(
+                Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+                    sort_order.clone(),
+                ))
+                    as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+            )
+        } else if let Some(predicate) = &prepared.predicate
+            && let Some(df) = find_dynamic_filter(predicate)
+            && let Some(sort_options) = df.sort_options()
+            && !sort_options.is_empty()
+        {
+            // Build a sort order from DynamicFilter for non-sort-pushdown TopK.
+            // Quick bail: check if the sort column exists in file schema.
+            let children = df.children();
+            if !children.is_empty() {
+                let col = find_column_in_expr(children[0]);
+                if let Some(c) = col
+                    && prepared
+                        .physical_file_schema
+                        .field_with_name(c.name())
+                        .is_ok()
+                {
+                    // Use the unwrapped Column (not the original expr which
+                    // may be wrapped in Cast etc.) so reorder_by_statistics
+                    // can extract the column name for StatisticsConverter.
+                    let sort_expr =
+                        datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
+                            expr: Arc::new(c.clone()),
+                            options: arrow::compute::SortOptions {
+                                descending: false,
+                                nulls_first: sort_options[0].nulls_first,
+                            },
+                        };
+                    LexOrdering::new(vec![sort_expr]).map(|order| {
+                        Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+                            order,
+                        ))
+                            as Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>
+                    })
+                } else {
+                    None
+                }
+            } else {
+                None
+            }
+        } else {
+            None
+        };
 
-        // Potentially reverse the access plan for performance.
-        // See `ParquetSource::try_pushdown_sort` for the rationale.
-        if prepared.reverse_row_groups {
-            prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
+        // Reverse for DESC queries. Only when reorder is active (the sort
+        // column exists in parquet stats). Without reorder, reversing RGs
+        // randomly changes I/O patterns with no benefit.
+        let is_descending = prepared.reverse_row_groups
+            || (reorder_optimizer.is_some()
+                && prepared
+                    .predicate
+                    .as_ref()
+                    .and_then(find_dynamic_filter)
+                    .and_then(|df| df.sort_options().map(|opts| opts[0].descending))
+                    .unwrap_or(false));
+        let reverse_optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if is_descending {
+            Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
+        } else {
+            None
+        };

Review Comment:
   I'm not convinced by this abstraction. The implementations are trivial, it
   adds code overall, and it obscures the execution flow.
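
   As a rough illustration of the alternative, the same two steps can be written as plain method calls with no trait object. `ToyPlan`, `reorder_by_mins` and `order_row_groups` are hypothetical simplifications: the real `PreparedAccessPlan` methods take file metadata and return `Result`, and the PR's condition for reversing is more involved than a single flag.

```rust
/// Toy stand-in for `PreparedAccessPlan` (hypothetical, simplified).
#[derive(Debug, Clone, PartialEq)]
struct ToyPlan {
    row_group_indexes: Vec<usize>,
}

impl ToyPlan {
    /// Order row groups by ascending min statistic.
    /// `mins[i]` is the min of row group `i`; panics if `i` is out of range.
    fn reorder_by_mins(mut self, mins: &[i64]) -> Self {
        self.row_group_indexes.sort_by_key(|&i| mins[i]);
        self
    }

    /// Flip the read order.
    fn reverse(mut self) -> Self {
        self.row_group_indexes.reverse();
        self
    }
}

/// Call site without dynamic dispatch: both steps are visible in order.
fn order_row_groups(plan: ToyPlan, mins: Option<&[i64]>, descending: bool) -> ToyPlan {
    // Step 1: reorder only when statistics are available.
    let plan = match mins {
        Some(mins) => plan.reorder_by_mins(mins),
        None => plan,
    };
    // Step 2: reverse for DESC queries.
    if descending { plan.reverse() } else { plan }
}
```

   Written this way, the control flow reads top to bottom at the call site, which is the property the comment above is asking for.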



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]