xudong963 commented on code in PR #18817: URL: https://github.com/apache/datafusion/pull/18817#discussion_r2559080033
########## datafusion/physical-optimizer/src/pushdown_sort.rs: ########## @@ -0,0 +1,1235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort Pushdown Optimization +//! +//! This optimizer attempts to push sort requirements down to data sources that can +//! satisfy them natively, avoiding expensive sort operations. +//! +//! Currently supported optimizations: +//! - **Reverse scan**: If a data source naturally produces data in DESC order and +//! we need ASC (or vice versa), we can reverse the scan direction instead of +//! adding a SortExec node. +//! +//! Future optimizations could include: +//! - Reordering row groups in Parquet files +//! - Leveraging native indexes +//! - Reordering files in multi-file scans +use crate::PhysicalOptimizerRule; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::Result; +use datafusion_datasource::source::DataSourceExec; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use std::sync::Arc; + +/// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources +/// that can natively handle them (e.g., by reversing scan direction). +/// +/// This optimization: +/// 1. Detects SortExec nodes that require a specific ordering +/// 2. Checks if the input can satisfy the ordering by reversing its scan direction +/// 3. Pushes the sort requirement down to the data source when possible +/// 4. Removes unnecessary sort operations when the input already satisfies the requirement +#[derive(Debug, Clone, Default)] +pub struct PushdownSort; + +impl PushdownSort { + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for PushdownSort { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + // Check if sort pushdown optimization is enabled + let enable_sort_pushdown = config.execution.parquet.enable_sort_pushdown; + + // Return early if not enabled + if !enable_sort_pushdown { + return Ok(plan); + } + + // Search for any SortExec nodes and try to optimize them + plan.transform_up(&|plan: Arc<dyn ExecutionPlan>| { + // First check if this is a GlobalLimitExec -> SortExec pattern + if let Some(limit_exec) = plan.as_any().downcast_ref::<GlobalLimitExec>() { + if let Some(sort_exec) = + limit_exec.input().as_any().downcast_ref::<SortExec>() + { + return optimize_limit_sort(limit_exec, sort_exec); + } + } + + // Otherwise, check if this is just a SortExec + let sort_exec = match plan.as_any().downcast_ref::<SortExec>() { + Some(sort_exec) => sort_exec, + None => return Ok(Transformed::no(plan)), + }; + + optimize_sort(sort_exec) + }) + .data() + } + + fn name(&self) -> &str { + "PushdownSort" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// Optimize a SortExec by potentially pushing the sort down to the data source +fn optimize_sort(sort_exec: &SortExec) -> Result<Transformed<Arc<dyn ExecutionPlan>>> { + let sort_input = Arc::clone(sort_exec.input()); + let required_ordering = sort_exec.expr(); + + // First, check if the sort is already satisfied by input ordering + if let Some(_input_ordering) = sort_input.output_ordering() { + let input_eq_properties = sort_input.equivalence_properties(); + + if input_eq_properties.ordering_satisfy(required_ordering.clone())? { + return remove_unnecessary_sort(sort_exec, sort_input); + } + } + + // Try to push the sort requirement down to the data source + if let Some(optimized_input) = try_pushdown_sort(&sort_input, required_ordering)? { + // Verify that the optimized input satisfies the required ordering + if optimized_input + .equivalence_properties() + .ordering_satisfy(required_ordering.clone())? + { + return remove_unnecessary_sort(sort_exec, optimized_input); + } + + // If not fully satisfied, keep the sort but with optimized input + return Ok(Transformed::yes(Arc::new( + SortExec::new(required_ordering.clone(), optimized_input) + .with_fetch(sort_exec.fetch()) + .with_preserve_partitioning(sort_exec.preserve_partitioning()), + ))); + } + + Ok(Transformed::no(Arc::new(sort_exec.clone()))) +} + +/// Handle the GlobalLimitExec -> SortExec pattern +fn optimize_limit_sort( + limit_exec: &GlobalLimitExec, + sort_exec: &SortExec, +) -> Result<Transformed<Arc<dyn ExecutionPlan>>> { + let sort_input = Arc::clone(sort_exec.input()); + let required_ordering = sort_exec.expr(); + + // Check if input is already sorted + if let Some(_input_ordering) = sort_input.output_ordering() { + let input_eq_properties = sort_input.equivalence_properties(); + if input_eq_properties.ordering_satisfy(required_ordering.clone())? { + // Input is already sorted correctly, remove sort and keep limit + return Ok(Transformed::yes(Arc::new(GlobalLimitExec::new( + sort_input, + limit_exec.skip(), + limit_exec.fetch(), + )))); + } + } + + // Try to push down the sort requirement + if let Some(optimized_input) = try_pushdown_sort(&sort_input, required_ordering)? { + if optimized_input + .equivalence_properties() + .ordering_satisfy(required_ordering.clone())? + { + // Successfully pushed down sort, now handle the limit + let total_fetch = limit_exec.skip() + limit_exec.fetch().unwrap_or(0); + + // Try to push limit down as well if the source supports it Review Comment: I think the current `limit_pushdown` physical optimizer rule can do this. So do we still need to distinguish the sort and limit + sort pattern? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
