This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push: new ceff6cb44 Orthogonalize distribution and sort enforcement rules into `EnforceDistribution` and `EnforceSorting` (#4839) ceff6cb44 is described below commit ceff6cb44ad621f89c9c4e9c0bd34cb204246910 Author: Mustafa akur <106137913+mustafasr...@users.noreply.github.com> AuthorDate: Tue Jan 10 01:10:43 2023 +0300 Orthogonalize distribution and sort enforcement rules into `EnforceDistribution` and `EnforceSorting` (#4839) * Separate sort rule * Migrate to clearer file name, tidy up comments * Add a note about tests verifying EnforceDistribution/EnforceSorting jointly * Address review, fix the stale comment Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com> --- datafusion/core/src/execution/context.rs | 39 ++++++------ .../{enforcement.rs => dist_enforcement.rs} | 73 +++++++++------------- datafusion/core/src/physical_optimizer/mod.rs | 4 +- .../core/src/physical_optimizer/repartition.rs | 10 ++- .../{optimize_sorts.rs => sort_enforcement.rs} | 41 ++++++------ datafusion/core/src/physical_plan/limit.rs | 4 ++ datafusion/expr/src/logical_plan/builder.rs | 3 +- 7 files changed, 87 insertions(+), 87 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 2687902a3..98fe6ff79 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -68,7 +68,7 @@ use crate::physical_optimizer::repartition::Repartition; use crate::config::ConfigOptions; use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry}; -use crate::physical_optimizer::enforcement::BasicEnforcement; +use crate::physical_optimizer::dist_enforcement::EnforceDistribution; use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udaf::AggregateUDF; @@ -91,9 +91,9 @@ use 
crate::catalog::listing_schema::ListingSchemaProvider; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::memory_pool::MemoryPool; use crate::physical_optimizer::global_sort_selection::GlobalSortSelection; -use crate::physical_optimizer::optimize_sorts::OptimizeSorts; use crate::physical_optimizer::pipeline_checker::PipelineChecker; use crate::physical_optimizer::pipeline_fixer::PipelineFixer; +use crate::physical_optimizer::sort_enforcement::EnforceSorting; use datafusion_optimizer::OptimizerConfig; use datafusion_sql::planner::object_name_to_table_reference; use uuid::Uuid; @@ -1448,37 +1448,36 @@ impl SessionState { // output partitioning of some operators in the plan tree, which will influence // other rules. Therefore, it should run as soon as possible. It is optional because: // - It's not used for the distributed engine, Ballista. - // - It's conflicted with some parts of the BasicEnforcement, since it will - // introduce additional repartitioning while the BasicEnforcement aims at - // reducing unnecessary repartitioning. + // - It's conflicted with some parts of the EnforceDistribution, since it will + // introduce additional repartitioning while EnforceDistribution aims to + // reduce unnecessary repartitioning. Arc::new(Repartition::new()), // - Currently it will depend on the partition number to decide whether to change the // single node sort to parallel local sort and merge. Therefore, GlobalSortSelection // should run after the Repartition. // - Since it will change the output ordering of some operators, it should run - // before JoinSelection and BasicEnforcement, which may depend on that. + // before JoinSelection and EnforceSorting, which may depend on that. 
Arc::new(GlobalSortSelection::new()), // Statistics-based join selection will change the Auto mode to a real join implementation, - // like collect left, or hash join, or future sort merge join, which will - // influence the BasicEnforcement to decide whether to add additional repartition - // and local sort to meet the distribution and ordering requirements. - // Therefore, it should run before BasicEnforcement. + // like collect left, or hash join, or future sort merge join, which will influence the + // EnforceDistribution and EnforceSorting rules as they decide whether to add additional + // repartitioning and local sorting steps to meet distribution and ordering requirements. + // Therefore, it should run before EnforceDistribution and EnforceSorting. Arc::new(JoinSelection::new()), // If the query is processing infinite inputs, the PipelineFixer rule applies the // necessary transformations to make the query runnable (if it is not already runnable). // If the query can not be made runnable, the rule emits an error with a diagnostic message. // Since the transformations it applies may alter output partitioning properties of operators - // (e.g. by swapping hash join sides), this rule runs before BasicEnforcement. + // (e.g. by swapping hash join sides), this rule runs before EnforceDistribution. Arc::new(PipelineFixer::new()), - // BasicEnforcement is for adding essential repartition and local sorting operators - // to satisfy the required distribution and local sort requirements. - // Please make sure that the whole plan tree is determined. - Arc::new(BasicEnforcement::new()), - // The BasicEnforcement stage conservatively inserts sorts to satisfy ordering requirements. - // However, a deeper analysis may sometimes reveal that such a sort is actually unnecessary. - // These cases typically arise when we have reversible window expressions or deep subqueries. - // The rule below performs this analysis and removes unnecessary sorts. 
- Arc::new(OptimizeSorts::new()), + // The EnforceDistribution rule is for adding essential repartition to satisfy the required + // distribution. Please make sure that the whole plan tree is determined before this rule. + Arc::new(EnforceDistribution::new()), + // The EnforceSorting rule is for adding essential local sorting to satisfy the required + // ordering. Please make sure that the whole plan tree is determined before this rule. + // Note that one should always run this rule after running the EnforceDistribution rule + // as the latter may break local sorting requirements. + Arc::new(EnforceSorting::new()), // The CoalesceBatches rule will not influence the distribution and ordering of the // whole plan tree. Therefore, to avoid influencing other rules, it should run last. Arc::new(CoalesceBatches::new()), diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs similarity index 97% rename from datafusion/core/src/physical_optimizer/enforcement.rs rename to datafusion/core/src/physical_optimizer/dist_enforcement.rs index 0b8206a78..aa8b07569 100644 --- a/datafusion/core/src/physical_optimizer/enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. -//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering -//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]]. -//! +//! EnforceDistribution optimizer rule inspects the physical plan with respect +//! to distribution requirements and adds [RepartitionExec]s to satisfy them +//! when necessary. 
use crate::config::ConfigOptions; use crate::error::Result; -use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy}; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -46,8 +45,8 @@ use datafusion_physical_expr::{ use std::collections::HashMap; use std::sync::Arc; -/// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met -/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree +/// The EnforceDistribution rule ensures that distribution requirements are met +/// in the strictest way. It might add additional [RepartitionExec] to the plan tree /// and give a non-optimal plan, but it can avoid the possible data skew in joins. /// /// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by @@ -55,16 +54,16 @@ use std::sync::Arc; /// /// This rule only chooses the exactly match and satisfies the Distribution(a, b, c) by a HashPartition(a, b, c). #[derive(Default)] -pub struct BasicEnforcement {} +pub struct EnforceDistribution {} -impl BasicEnforcement { +impl EnforceDistribution { #[allow(missing_docs)] pub fn new() -> Self { Self {} } } -impl PhysicalOptimizerRule for BasicEnforcement { +impl PhysicalOptimizerRule for EnforceDistribution { fn optimize( &self, plan: Arc<dyn ExecutionPlan>, @@ -81,7 +80,7 @@ impl PhysicalOptimizerRule for BasicEnforcement { } else { plan }; - // Distribution and Ordering enforcement need to be applied bottom-up. + // Distribution enforcement needs to be applied bottom-up. 
new_plan.transform_up(&{ |plan| { let adjusted = if !top_down_join_key_reordering { @@ -89,16 +88,13 @@ impl PhysicalOptimizerRule for BasicEnforcement { } else { plan }; - Ok(Some(ensure_distribution_and_ordering( - adjusted, - target_partitions, - )?)) + Ok(Some(ensure_distribution(adjusted, target_partitions)?)) } }) } fn name(&self) -> &str { - "BasicEnforcement" + "EnforceDistribution" } fn schema_check(&self) -> bool { @@ -829,10 +825,11 @@ fn new_join_conditions( new_join_on } -/// Within this function, it checks whether we need to add additional plan operators -/// of data exchanging and data ordering to satisfy the required distribution and ordering. -/// And we should avoid to manually add plan operators of data exchanging and data ordering in other places -fn ensure_distribution_and_ordering( +/// This function checks whether we need to add additional data exchange +/// operators to satisfy distribution requirements. Since this function +/// takes care of such requirements, we should avoid manually adding data +/// exchange operators in other places. 
+fn ensure_distribution( plan: Arc<dyn crate::physical_plan::ExecutionPlan>, target_partitions: usize, ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> { @@ -841,13 +838,11 @@ fn ensure_distribution_and_ordering( } let required_input_distributions = plan.required_input_distribution(); - let required_input_orderings = plan.required_input_ordering(); let children: Vec<Arc<dyn ExecutionPlan>> = plan.children(); assert_eq!(children.len(), required_input_distributions.len()); - assert_eq!(children.len(), required_input_orderings.len()); // Add RepartitionExec to guarantee output partitioning - let children = children + let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children .into_iter() .zip(required_input_distributions.into_iter()) .map(|(child, required)| { @@ -870,24 +865,8 @@ fn ensure_distribution_and_ordering( }; new_child } - }); - - // Add local SortExec to guarantee output ordering within each partition - let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children - .zip(required_input_orderings.into_iter()) - .map(|(child_result, required)| { - let child = child_result?; - if ordering_satisfy(child.output_ordering(), required, || { - child.equivalence_properties() - }) { - Ok(child) - } else { - let sort_expr = required.unwrap().to_vec(); - add_sort_above_child(&child, sort_expr) - } }) .collect(); - with_new_children_if_necessary(plan, new_children?) 
} @@ -979,6 +958,7 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::physical_optimizer::sort_enforcement::EnforceSorting; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -1136,8 +1116,15 @@ mod tests { config.execution.target_partitions = 10; // run optimizer - let optimizer = BasicEnforcement {}; + let optimizer = EnforceDistribution {}; let optimized = optimizer.optimize($PLAN, &config)?; + // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade + // because they were written prior to the separation of `BasicEnforcement` into + // `EnforceSorting` and `EnforceDistribution`. + // TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create + // new tests for the cascade. + let optimizer = EnforceSorting {}; + let optimized = optimizer.optimize(optimized, &config)?; // Now format correctly let plan = displayable(optimized.as_ref()).indent().to_string(); @@ -1656,7 +1643,7 @@ mod tests { Column::new_with_schema("c1", &right.schema()).unwrap(), ), ]; - let bottom_left_join = ensure_distribution_and_ordering( + let bottom_left_join = ensure_distribution( hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner), 10, )?; @@ -1686,7 +1673,7 @@ mod tests { Column::new_with_schema("a1", &right.schema()).unwrap(), ), ]; - let bottom_right_join = ensure_distribution_and_ordering( + let bottom_right_join = ensure_distribution( hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner), 10, )?; @@ -1775,7 +1762,7 @@ mod tests { Column::new_with_schema("b1", &right.schema()).unwrap(), ), ]; - let bottom_left_join = ensure_distribution_and_ordering( + let bottom_left_join = ensure_distribution( hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner), 10, )?; @@ -1805,7 +1792,7 @@ mod tests { Column::new_with_schema("a1", &right.schema()).unwrap(), ), ]; - 
let bottom_right_join = ensure_distribution_and_ordering( + let bottom_right_join = ensure_distribution( hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner), 10, )?; diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index fb07d54b9..3958a546a 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -20,14 +20,14 @@ pub mod aggregate_statistics; pub mod coalesce_batches; -pub mod enforcement; +pub mod dist_enforcement; pub mod global_sort_selection; pub mod join_selection; -pub mod optimize_sorts; pub mod optimizer; pub mod pipeline_checker; pub mod pruning; pub mod repartition; +pub mod sort_enforcement; mod utils; pub mod pipeline_fixer; diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 2044b2aaa..98ca12a9e 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -241,7 +241,8 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::physical_optimizer::enforcement::BasicEnforcement; + use crate::physical_optimizer::dist_enforcement::EnforceDistribution; + use crate::physical_optimizer::sort_enforcement::EnforceSorting; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -370,9 +371,12 @@ mod tests { // run optimizer let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![ Arc::new(Repartition::new()), - // The `BasicEnforcement` is an essential rule to be applied. + // EnforceDistribution is an essential rule to be applied. // Otherwise, the correctness of the generated optimized plan cannot be guaranteed - Arc::new(BasicEnforcement::new()), + Arc::new(EnforceDistribution::new()), + // EnforceSorting is an essential rule to be applied. 
+ // Otherwise, the correctness of the generated optimized plan cannot be guaranteed + Arc::new(EnforceSorting::new()), ]; let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| { optimizer.optimize(plan, &config).unwrap() diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs similarity index 96% rename from datafusion/core/src/physical_optimizer/optimize_sorts.rs rename to datafusion/core/src/physical_optimizer/sort_enforcement.rs index 17b27bfa7..52463b4bd 100644 --- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -15,12 +15,16 @@ // specific language governing permissions and limitations // under the License. -//! OptimizeSorts optimizer rule inspects [SortExec]s in the given physical -//! plan and removes the ones it can prove unnecessary. The rule can work on -//! valid *and* invalid physical plans with respect to sorting requirements, -//! but always produces a valid physical plan in this sense. +//! EnforceSorting optimizer rule inspects the physical plan with respect +//! to local sorting requirements and does the following: +//! - Adds a [SortExec] when a requirement is not met, +//! - Removes an already-existing [SortExec] if it is possible to prove +//! that this sort is unnecessary +//! The rule can work on valid *and* invalid physical plans with respect to +//! sorting requirements, but always produces a valid physical plan in this sense. //! -//! A non-realistic but easy to follow example: Assume that we somehow get the fragment +//! A non-realistic but easy to follow example for sort removals: Assume that we +//! somehow get the fragment //! "SortExec: [nullable_col@0 ASC]", //! " SortExec: [non_nullable_col@1 ASC]", //! in the physical plan. 
The first sort is unnecessary since its result is overwritten @@ -46,16 +50,16 @@ use std::sync::Arc; /// This rule inspects SortExec's in the given physical plan and removes the /// ones it can prove unnecessary. #[derive(Default)] -pub struct OptimizeSorts {} +pub struct EnforceSorting {} -impl OptimizeSorts { +impl EnforceSorting { #[allow(missing_docs)] pub fn new() -> Self { Self {} } } -/// This is a "data class" we use within the [OptimizeSorts] rule that +/// This is a "data class" we use within the [EnforceSorting] rule that /// tracks the closest `SortExec` descendant for every child of a plan. #[derive(Debug, Clone)] struct PlanWithCorrespondingSort { @@ -119,7 +123,7 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { } } -impl PhysicalOptimizerRule for OptimizeSorts { +impl PhysicalOptimizerRule for EnforceSorting { fn optimize( &self, plan: Arc<dyn ExecutionPlan>, @@ -127,12 +131,12 @@ impl PhysicalOptimizerRule for OptimizeSorts { ) -> Result<Arc<dyn ExecutionPlan>> { // Execute a post-order traversal to adjust input key ordering: let plan_requirements = PlanWithCorrespondingSort::new(plan); - let adjusted = plan_requirements.transform_up(&optimize_sorts)?; + let adjusted = plan_requirements.transform_up(&ensure_sorting)?; Ok(adjusted.plan) } fn name(&self) -> &str { - "OptimizeSorts" + "EnforceSorting" } fn schema_check(&self) -> bool { @@ -140,7 +144,7 @@ impl PhysicalOptimizerRule for OptimizeSorts { } } -fn optimize_sorts( +fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result<Option<PlanWithCorrespondingSort>> { // Perform naive analysis at the beginning -- remove already-satisfied sorts: @@ -171,7 +175,8 @@ fn optimize_sorts( let sort_expr = required_ordering.to_vec(); *child = add_sort_above_child(child, sort_expr)?; sort_onwards.push((idx, child.clone())) - } else if let [first, ..] = sort_onwards.as_slice() { + } + if let [first, ..] 
= sort_onwards.as_slice() { // The ordering requirement is met, we can analyze if there is an unnecessary sort: let sort_any = first.1.clone(); let sort_exec = convert_to_sort_exec(&sort_any)?; @@ -618,7 +623,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -717,7 +722,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -761,7 +766,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -826,7 +831,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -886,7 +891,7 @@ mod tests { "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, state.config_options())?; + EnforceSorting::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); diff --git 
a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index 3a3a4a20b..776fefd8b 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -287,6 +287,10 @@ impl ExecutionPlan for LocalLimitExec { self.input.output_ordering() } + fn maintains_input_order(&self) -> bool { + true + } + fn equivalence_properties(&self) -> EquivalenceProperties { self.input.equivalence_properties() } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 428a31f14..d5e5257ea 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -273,7 +273,8 @@ impl LogicalPlanBuilder { }); for (_, exprs) in groups { let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>(); - // the partition and sort itself is done at physical level, see the BasicEnforcement rule + // Partitioning and sorting are done at the physical level, see the EnforceDistribution + // and EnforceSorting rules. plan = LogicalPlanBuilder::from(plan) .window(window_exprs)? .build()?;