xudong963 commented on code in PR #21580:
URL: https://github.com/apache/datafusion/pull/21580#discussion_r3123373198
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -952,14 +962,16 @@ impl SortExec {
if fetch.is_some() && is_pipeline_friendly {
cache = cache.with_boundedness(Boundedness::Bounded);
}
- let filter = fetch.is_some().then(|| {
- // If we already have a filter, keep it. Otherwise, create a new
one.
- self.filter.clone().unwrap_or_else(|| self.create_filter())
- });
let mut new_sort = self.cloned();
new_sort.fetch = fetch;
new_sort.cache = cache.into();
- new_sort.filter = filter;
+ new_sort.filter = fetch.is_some().then(|| {
+ // If we already have a filter, keep it. Otherwise, create a new
one.
+ // Must be called after setting fetch so DynamicFilter gets the K
value.
+ self.filter
Review Comment:
`SortExec::with_fetch` can leave the embedded dynamic filter’s fetch stale.
The new parquet optimizations read `df.fetch()` for threshold init and cumulative
pruning, so if any later optimizer rewrites fetch through `with_fetch`, parquet may
prune using the old K.
##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +381,124 @@ impl PreparedAccessPlan {
})
}
+ /// Return a reference to the row group indexes.
+ pub(crate) fn row_group_indexes(&self) -> &[usize] {
+ &self.row_group_indexes
+ }
+
+ /// Keep only the first `count` row groups, dropping the rest.
+ /// Used for TopK cumulative pruning after reorder + reverse.
+ pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self {
+ self.row_group_indexes.truncate(count);
+ // Clear row_selection since it's tied to the original RG set
+ if self.row_selection.is_some() {
+ self.row_selection = None;
+ }
+ self
+ }
+
+ /// Reorder row groups by their min statistics for the given sort order.
+ ///
+ /// This helps TopK queries find optimal values first. For ASC sort,
+ /// row groups with the smallest min values come first. For DESC sort,
+ /// row groups with the largest min values come first.
+ ///
+ /// Gracefully skips reordering when:
+ /// - There is a row_selection (too complex to remap)
+ /// - 0 or 1 row groups (nothing to reorder)
+ /// - Sort expression is not a simple column reference
+ /// - Statistics are unavailable
+ pub(crate) fn reorder_by_statistics(
+ mut self,
+ sort_order: &LexOrdering,
+ file_metadata: &ParquetMetaData,
+ arrow_schema: &Schema,
+ ) -> Result<Self> {
+ // Skip if row_selection present (too complex to remap)
+ if self.row_selection.is_some() {
+ debug!("Skipping RG reorder: row_selection present");
+ return Ok(self);
+ }
+
+ // Nothing to reorder
+ if self.row_group_indexes.len() <= 1 {
+ return Ok(self);
+ }
+
+ // Get the first sort expression
+ // LexOrdering is guaranteed non-empty, so first() returns
&PhysicalSortExpr
+ let first_sort_expr = sort_order.first();
Review Comment:
What about a multi-key ORDER BY? Only the first sort expression is used here.
##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -377,6 +381,124 @@ impl PreparedAccessPlan {
})
}
+ /// Return a reference to the row group indexes.
+ pub(crate) fn row_group_indexes(&self) -> &[usize] {
+ &self.row_group_indexes
+ }
+
+ /// Keep only the first `count` row groups, dropping the rest.
+ /// Used for TopK cumulative pruning after reorder + reverse.
+ pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self {
Review Comment:
This drops any existing `row_selection` entirely after truncation, which can widen
the scan back to full row groups or discard exact page-level pruning state.
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1123,13 +1166,157 @@ impl RowGroupsPrunedParquetOpen {
);
}
- // Prepare the access plan (extract row groups and row selection)
+ // Row group ordering optimization (two composable steps):
+ //
+ // 1. reorder_by_statistics: sort RGs by min values (ASC) to align
+ // with the file's declared output ordering. This fixes out-of-order
+ // RGs (e.g., from append-heavy workloads) without changing
direction.
+ // Skipped gracefully when statistics are unavailable.
+ //
+ // 2. reverse: flip the order for DESC queries. Applied AFTER reorder
+ // so the reversed order is correct whether or not reorder changed
+ // anything. Also handles row_selection remapping.
+ //
+ // For sorted data: reorder is a no-op, reverse gives perfect DESC.
+ // For unsorted data: reorder fixes the order, reverse flips for DESC.
+ // Build reorder optimizer from sort_order_for_reorder (Inexact path)
+ // or from DynamicFilterPhysicalExpr sort_options (any TopK query).
+ // Fuzz test uses tiebreaker columns so reorder is safe for all TopK.
+ let reorder_optimizer: Option<
+ Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+ > = if let Some(sort_order) = &prepared.sort_order_for_reorder {
+ Some(
+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+ sort_order.clone(),
+ ))
+ as Box<dyn
crate::access_plan_optimizer::AccessPlanOptimizer>,
+ )
+ } else if let Some(predicate) = &prepared.predicate
+ && let Some(df) = find_dynamic_filter(predicate)
+ && let Some(sort_options) = df.sort_options()
+ && sort_options.len() == 1
+ {
+ // Build a sort order from DynamicFilter for non-sort-pushdown
TopK.
+ // Quick bail: check if the sort column exists in file schema.
+ // For GROUP BY + ORDER BY, the sort column is an aggregate output
+ // (not in parquet) — skip to avoid wasted StatisticsConverter
work.
+ let children = df.children();
+ if !children.is_empty() {
+ let col = find_column_in_expr(children[0]);
+ if let Some(ref c) = col
+ && prepared
+ .physical_file_schema
+ .field_with_name(c.name())
+ .is_ok()
+ {
+ let sort_expr =
+
datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
+ expr: Arc::clone(children[0]),
+ options: arrow::compute::SortOptions {
+ descending: false,
+ nulls_first: sort_options[0].nulls_first,
+ },
+ };
+ LexOrdering::new(vec![sort_expr]).map(|order| {
+
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+ order,
+ ))
+ as Box<dyn
crate::access_plan_optimizer::AccessPlanOptimizer>
+ })
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ // Reverse for DESC queries. Only when reorder is active (the sort
+ // column exists in parquet stats). Without reorder, reversing RGs
+ // randomly changes I/O patterns with no benefit.
+ let is_descending = prepared.reverse_row_groups
+ || (reorder_optimizer.is_some()
+ && prepared
+ .predicate
+ .as_ref()
+ .and_then(find_dynamic_filter)
+ .and_then(|df| df.sort_options().map(|opts|
opts[0].descending))
+ .unwrap_or(false));
+ let reverse_optimizer: Option<
+ Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+ > = if is_descending {
+ Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
+ } else {
+ None
+ };
+
+ // Prepare the access plan and apply optimizers in order:
+ // 1. reorder (fix out-of-order RGs to match declared ordering)
+ // 2. reverse (flip for DESC queries)
let mut prepared_plan = access_plan.prepare(rg_metadata)?;
+ if let Some(opt) = &reorder_optimizer {
+ prepared_plan = opt.optimize(
+ prepared_plan,
+ file_metadata.as_ref(),
+ &prepared.physical_file_schema,
+ )?;
+ }
+ if let Some(opt) = &reverse_optimizer {
+ prepared_plan = opt.optimize(
+ prepared_plan,
+ file_metadata.as_ref(),
+ &prepared.physical_file_schema,
+ )?;
+ }
- // Potentially reverse the access plan for performance.
- // See `ParquetSource::try_pushdown_sort` for the rationale.
- if prepared.reverse_row_groups {
- prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
+ // TopK cumulative pruning: after reorder + reverse, the RGs are in
+ // optimal order. Accumulate rows from the front until >= K, prune
rest.
+ //
+ // Only safe when predicate is DynamicFilter-only (no WHERE clause).
+ // With WHERE, raw num_rows overestimates qualifying rows — cumulative
+ // prune may keep too few RGs, returning fewer than K results.
+ //
+ // Additionally requires either sort pushdown (guaranteed
non-overlapping)
+ // or verified non-overlap from statistics.
+ let is_pure_dynamic_filter =
prepared.predicate.as_ref().is_some_and(|p| {
+ let any_ref: &dyn std::any::Any = p.as_ref();
+ any_ref
+ .downcast_ref::<DynamicFilterPhysicalExpr>()
+ .is_some()
+ });
+ let has_sort_pushdown = prepared.sort_order_for_reorder.is_some();
+ if is_pure_dynamic_filter
+ && let Some(predicate) = &prepared.predicate
+ && let Some(df) = find_dynamic_filter(predicate)
+ && let Some(fetch) = df.fetch()
+ && (has_sort_pushdown
+ || rgs_are_non_overlapping(
+ &prepared_plan,
+ file_metadata.as_ref(),
+ &prepared.physical_file_schema,
+ &df,
+ ))
+ {
+ let rg_indexes = prepared_plan.row_group_indexes();
+ let mut cumulative = 0usize;
+ let mut keep_count = 0;
+ for &idx in rg_indexes {
+ cumulative += file_metadata.row_group(idx).num_rows() as usize;
Review Comment:
The parquet opener first turns the dynamic filter into a pushed-down
row_filter / row_selection, but later the cumulative cutoff still sums raw
`row_group.num_rows()` and truncates once that raw count reaches fetch. That is
unsafe when early RGs contain many rows but only a few of them survive the filter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]