alamb commented on code in PR #21976: URL: https://github.com/apache/datafusion/pull/21976#discussion_r3214790498
########## datafusion/physical-optimizer/src/ensure_requirements/new_tests.rs: ########## @@ -0,0 +1,782 @@ +// Licensed to the Apache Software Foundation (ASF) under one Review Comment: Why is this a separate module? If we want tests in a separate module, I think putting it in an integration test might be easier to find (datafusion/physical-optimizer/tests/ensure_requirements.rs perhaps?) ########## datafusion/physical-optimizer/src/ensure_requirements/mod.rs: ########## @@ -0,0 +1,1804 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`EnsureRequirements`] optimizer rule that enforces both distribution and +//! sorting requirements in a **single bottom-up pass**. +//! +//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting` +//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements` +//! and Presto/Trino's `AddExchanges`. +//! +//! # Motivation +//! +//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`) +//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts` +//! can break distribution invariants established by `EnforceDistribution`, +//! 
because `SortExec.preserve_partitioning` couples sorting and distribution +//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details. +//! +//! # Architecture +//! +//! ```text +//! EnsureRequirements::optimize(plan) +//! │ +//! ├─ Phase 1 (optional): reorder_join_keys (top-down) +//! │ └─ Same as existing adjust_input_keys_ordering +//! │ +//! └─ Phase 2: ensure_requirements (single bottom-up pass) +//! └─ For each node (bottom-up), for each child: +//! Step 1: Ensure distribution requirement +//! └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec +//! Step 2: Ensure ordering requirement (distribution-aware) +//! └─ Add SortExec with correct preserve_partitioning + SPM if needed +//! ``` +//! +//! # Key Properties +//! +//! - **Idempotent**: Running the rule twice produces the same plan. +//! - **Distribution before sorting**: For each child, distribution is resolved +//! before ordering, so sorting decisions always have full distribution context. +//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up +//! pass only adds `SortExec` where the child doesn't already satisfy the +//! ordering requirement, naturally placing sorts at the deepest valid position. + +use std::sync::Arc; + +use crate::PhysicalOptimizerRule; + +use datafusion_common::Result; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_plan::ExecutionPlan; + +// Internal functions used directly instead of calling EnforceDistribution/EnforceSorting +// as opaque boxes. This gives us control over the pass ordering and enables +// future merging into a true single-pass architecture. 
+ +// For the no-pushdown variant (Phase 3) +use crate::enforce_sorting::replace_with_order_preserving_variants::{ + OrderPreservationContext, replace_with_order_preserving_variants, +}; +use crate::enforce_sorting::{ + PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, ensure_sorting, + parallelize_sorts, replace_with_partial_sort, +}; + +/// Optimizer rule that enforces both distribution and sorting requirements. +/// +/// This rule combines the functionality of `EnforceDistribution` and +/// `EnforceSorting` into a coordinated sequence where distribution is +/// always settled before sorting for each operator, preventing the +/// non-idempotent interactions between the two separate rules. +/// +/// See [module level documentation](self) for more details. +#[derive(Default, Debug)] +pub struct EnsureRequirements {} + +impl EnsureRequirements { + /// Create a new `EnsureRequirements` optimizer rule. + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for EnsureRequirements { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + // Phase 1: Join key reordering (top-down, from EnforceDistribution) + use crate::enforce_distribution::{ + PlanWithKeyRequirements, adjust_input_keys_ordering, + }; + let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering; + let plan = if top_down_join_key_reordering { + let ctx = PlanWithKeyRequirements::new_default(plan); + ctx.transform_down(adjust_input_keys_ordering).data()?.plan + } else { + use crate::enforce_distribution::reorder_join_keys_to_inputs; + plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?))) + .data()? + }; + + // Phase 2: Combined distribution + sorting enforcement (single bottom-up pass) + // For each node: distribution first, then sorting. 
+ use crate::enforce_distribution::{DistributionContext, ensure_distribution}; + use crate::enforce_sorting::{PlanWithCorrespondingSort, ensure_sorting}; + + // Step 2a: Distribution enforcement (bottom-up) + let dist_ctx = DistributionContext::new_default(plan); + let dist_ctx = dist_ctx + .transform_up(|ctx| ensure_distribution(ctx, config)) + .data()?; + + // Step 2b: Sorting enforcement (bottom-up) — runs on distribution-fixed plan + let sort_ctx = PlanWithCorrespondingSort::new_default(dist_ctx.plan); + let sort_ctx = sort_ctx.transform_up(ensure_sorting)?.data; + + // Phase 3: Optimization passes + // 3a: Parallelize sorts (Coalesce+Sort → SPM+Sort) + use crate::enforce_sorting::{ + PlanWithCorrespondingCoalescePartitions, parallelize_sorts, + replace_with_partial_sort, + }; + let plan = if config.optimizer.repartition_sorts { + let ctx = PlanWithCorrespondingCoalescePartitions::new_default(sort_ctx.plan); + ctx.transform_up(parallelize_sorts).data()?.plan + } else { + sort_ctx.plan + }; + + // 3b: Order-preserving variants + use crate::enforce_sorting::replace_with_order_preserving_variants::{ + OrderPreservationContext, replace_with_order_preserving_variants, + }; + let ctx = OrderPreservationContext::new_default(plan); + let plan = ctx + .transform_up(|c| { + replace_with_order_preserving_variants(c, false, true, config) + }) + .data()? 
+ .plan; + + // 3c: Sort pushdown (distribution-aware) + use crate::enforce_sorting::sort_pushdown::{ + SortPushDown, assign_initial_requirements, pushdown_sorts, + }; + let mut sort_pushdown = SortPushDown::new_default(plan); + assign_initial_requirements(&mut sort_pushdown); + let adjusted = pushdown_sorts(sort_pushdown)?; + + // 3d: Partial sort + adjusted + .plan + .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?))) + .data() + } + + fn name(&self) -> &str { + "EnsureRequirements" + } + + fn schema_check(&self) -> bool { + true + } +} + +/// Phase 3 variant: no `pushdown_sorts`, sort placement handled entirely +/// by bottom-up passes. Currently experimental — some plan shapes differ +/// from the `pushdown_sorts` variant (less optimal but still correct). +#[derive(Default, Debug)] +pub struct EnsureRequirementsNoPushdown {} + +impl EnsureRequirementsNoPushdown { + /// Create a new rule. + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for EnsureRequirementsNoPushdown { + fn optimize( + &self, + plan: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> Result<Arc<dyn ExecutionPlan>> { + // Step 1: Distribution enforcement + use crate::enforce_distribution::{ + DistributionContext as DistCtx, PlanWithKeyRequirements as KeyReqs, + adjust_input_keys_ordering as adj_keys, ensure_distribution as ensure_dist, + }; + let top_down = config.optimizer.top_down_join_key_reordering; + let plan = if top_down { + KeyReqs::new_default(plan) + .transform_down(adj_keys) + .data()? + .plan + } else { + use crate::enforce_distribution::reorder_join_keys_to_inputs; + plan.transform_up(|p| Ok(Transformed::yes(reorder_join_keys_to_inputs(p)?))) + .data()? + }; + let dist_ctx = DistCtx::new_default(plan); + let plan = dist_ctx + .transform_up(|ctx| ensure_dist(ctx, config)) + .data()? 
+ .plan; + + // Step 2: ensure_sorting (bottom-up, NO pushdown_sorts) + let plan_requirements = PlanWithCorrespondingSort::new_default(plan); + let adjusted = plan_requirements.transform_up(ensure_sorting)?.data; + + // Step 3: parallelize_sorts (optional) + let plan = if config.optimizer.repartition_sorts { + let ctx = PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan); + ctx.transform_up(parallelize_sorts).data()?.plan + } else { + adjusted.plan + }; + + // Step 4: order-preserving variants + let ctx = OrderPreservationContext::new_default(plan); + let plan = ctx + .transform_up(|c| { + replace_with_order_preserving_variants(c, false, true, config) + }) + .data()? + .plan; + + // Step 5: partial sort + let plan = plan + .transform_up(|p| Ok(Transformed::yes(replace_with_partial_sort(p)?))) + .data()?; + + // NO pushdown_sorts — sort placement is purely bottom-up. + // Step 6: Final distribution enforcement + let dist_ctx2 = DistCtx::new_default(plan); + let plan = dist_ctx2 + .transform_up(|ctx| ensure_dist(ctx, config)) + .data()? + .plan; + + // Step 7: Fix any sorting violations the final distribution pass introduced. 
+ let sort_ctx2 = PlanWithCorrespondingSort::new_default(plan); + let adjusted2 = sort_ctx2.transform_up(ensure_sorting)?.data; + + Ok(adjusted2.plan) + } + + fn name(&self) -> &str { + "EnsureRequirementsNoPushdown" + } + + fn schema_check(&self) -> bool { + true + } +} + +#[cfg(test)] +mod new_tests; + +#[cfg(test)] +mod tests { + use super::*; + + use std::sync::Arc; + + use arrow::compute::SortOptions; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::Result; + use datafusion_common::tree_node::TreeNodeRecursion; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr::{ + EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalSortExpr, + }; + use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; + use datafusion_physical_plan::limit::GlobalLimitExec; + use datafusion_physical_plan::sorts::sort::SortExec; + use datafusion_physical_plan::union::UnionExec; + use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, + Partitioning, PlanProperties, SendableRecordBatchStream, + }; + + use crate::enforce_distribution::EnforceDistribution; + use crate::output_requirements::OutputRequirementExec; + use crate::sanity_checker::SanityCheckPlan; + + use datafusion_common::{JoinType, NullEquality}; + use datafusion_physical_expr::Distribution; + use datafusion_physical_expr_common::sort_expr::OrderingRequirements; + use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; + use datafusion_physical_plan::joins::{ + HashJoinExec, PartitionMode, SortMergeJoinExec, + }; + use datafusion_physical_plan::projection::ProjectionExec; + use datafusion_physical_plan::repartition::RepartitionExec; + use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; + + /// Mock ExecutionPlan with configurable partition count and output ordering. 
+ #[derive(Debug)] + struct MockMultiPartitionExec { + properties: Arc<PlanProperties>, + } + + impl MockMultiPartitionExec { + fn new(partition_count: usize) -> Self { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int64, false), + ])); + let mut eq = EquivalenceProperties::new(Arc::clone(&schema)); + if let Some(ordering) = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions { + descending: false, + nulls_first: false, + }, + )]) { + eq.add_orderings(vec![ordering.into_iter().collect::<Vec<_>>()]); + } + let properties = PlanProperties::new( + eq, + Partitioning::UnknownPartitioning(partition_count), + EmissionType::Incremental, + Boundedness::Bounded, + ); + Self { + properties: Arc::new(properties), + } + } + } + + impl DisplayAs for MockMultiPartitionExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "MockMultiPartitionExec") + } + } + + impl ExecutionPlan for MockMultiPartitionExec { + fn name(&self) -> &str { + "MockMultiPartitionExec" + } + fn properties(&self) -> &Arc<PlanProperties> { + &self.properties + } + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + fn with_new_children( + self: Arc<Self>, + _children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(self) + } + fn apply_expressions( + &self, + _f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>, + ) -> Result<TreeNodeRecursion> { + Ok(TreeNodeRecursion::Continue) + } + fn execute( + &self, + _partition: usize, + _context: Arc<datafusion_execution::TaskContext>, + ) -> Result<SendableRecordBatchStream> { + unimplemented!() + } + } + + /// Helper: run EnsureRequirements and verify SanityCheckPlan passes + fn optimize_and_sanity_check( + plan: Arc<dyn ExecutionPlan>, + ) -> Result<Arc<dyn ExecutionPlan>> { + let config = ConfigOptions::default(); + let optimized = 
EnsureRequirements::new().optimize(plan, &config)?; + // SanityCheckPlan must pass + SanityCheckPlan::new().optimize(Arc::clone(&optimized), &config)?; + Ok(optimized) + } + + /// Helper: verify idempotency — running twice produces the same plan + fn assert_idempotent(plan: Arc<dyn ExecutionPlan>) { + let config = ConfigOptions::default(); + let p1 = EnsureRequirements::new() + .optimize(plan, &config) + .expect("first optimize failed"); + let p2 = EnsureRequirements::new() + .optimize(Arc::clone(&p1), &config) + .expect("second optimize failed"); + + let s1 = datafusion_physical_plan::displayable(p1.as_ref()) + .indent(true) + .to_string(); + let s2 = datafusion_physical_plan::displayable(p2.as_ref()) + .indent(true) + .to_string(); + assert_eq!( + s1, s2, + "EnsureRequirements is NOT idempotent!\nFirst:\n{s1}\nSecond:\n{s2}" + ); + + // Both must pass SanityCheckPlan + SanityCheckPlan::new() + .optimize(p1, &config) + .expect("SanityCheckPlan failed on first pass"); + SanityCheckPlan::new() + .optimize(p2, &config) + .expect("SanityCheckPlan failed on second pass"); + } + + /// Multi-partition sort + GlobalLimitExec must produce valid plan. + /// Regression test for UXX0/HRC 502s (April 2026). + #[test] + fn test_multi_partition_sort_limit_sanity_check() { + let source = Arc::new(MockMultiPartitionExec::new(32)); + + let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions { + descending: true, + nulls_first: true, + }, + )]) + .unwrap(); + + let sort = Arc::new(SortExec::new(sort_expr, source)); + let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21))); + + let result = optimize_and_sanity_check(limit); + assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err()); + } + + /// Union with mixed partition counts + sort + limit. 
+ #[test] + fn test_union_mixed_partitions_sort_limit() { + let live = Arc::new(MockMultiPartitionExec::new(32)); + let historical = Arc::new(MockMultiPartitionExec::new(1)); + + let union = UnionExec::try_new(vec![live as _, historical as _]).unwrap(); + + let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions { + descending: true, + nulls_first: true, + }, + )]) + .unwrap(); + + let sort = Arc::new(SortExec::new(sort_expr, union)); + let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(21))); + + let result = optimize_and_sanity_check(limit); + assert!(result.is_ok(), "SanityCheckPlan failed: {:?}", result.err()); + } + + /// Idempotency: multi-partition sort + limit + #[test] + fn test_idempotent_multi_partition_sort_limit() { + let source = Arc::new(MockMultiPartitionExec::new(16)); + + let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions { + descending: true, + nulls_first: true, + }, + )]) + .unwrap(); + + let sort = Arc::new(SortExec::new(sort_expr, source)); + let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(10))); + + assert_idempotent(limit); + } + + /// Idempotency: union with mixed partitions + #[test] + fn test_idempotent_union_mixed_partitions() { + let live = Arc::new(MockMultiPartitionExec::new(8)); + let hist = Arc::new(MockMultiPartitionExec::new(1)); + let union = UnionExec::try_new(vec![live as _, hist as _]).unwrap(); + + let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions { + descending: true, + nulls_first: true, + }, + )]) + .unwrap(); + + let sort = Arc::new(SortExec::new(sort_expr, union)); + let limit = Arc::new(GlobalLimitExec::new(sort, 0, Some(5))); + + assert_idempotent(limit); + } + + // ======================================================================== + // Projection + multi-partition tests (pushdown_sorts trigger path) + // 
======================================================================== + + /// ProjectionExec over multi-partition + sort DESC + limit. + /// This is the topology where pushdown_sorts pushes sort through projection + /// onto the multi-partition source. The optimizer must still produce a valid plan. + #[test] + fn test_projection_over_multi_partition_sort_limit() { + let source = Arc::new(MockMultiPartitionExec::new(16)); + // Identity projection + let proj_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![ + (Arc::new(Column::new("a", 0)), "a".to_string()), + (Arc::new(Column::new("b", 1)), "b".to_string()), + ]; + let projection = + Arc::new(ProjectionExec::try_new(proj_exprs, source as _).unwrap()); + + let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions { + descending: true, + nulls_first: true, + }, + )]) + .unwrap(); + + let sort = Arc::new(SortExec::new(sort_expr, projection)); + let limit: Arc<dyn ExecutionPlan> = + Arc::new(GlobalLimitExec::new(sort, 0, Some(21))); + + let result = optimize_and_sanity_check(Arc::clone(&limit)); + assert!( + result.is_ok(), + "SanityCheckPlan failed for projection over multi-partition: {:?}", + result.err() + ); + assert_idempotent(limit); + } + + // ======================================================================== + // Single partition tests (no unnecessary operators) + // ======================================================================== + + /// Single partition source + sort + limit should NOT add SortPreservingMergeExec. 
+ #[test] + fn test_single_partition_no_unnecessary_spm() { + let source = Arc::new(MockMultiPartitionExec::new(1)); + + let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions { + descending: true, + nulls_first: true, + }, + )]) + .unwrap(); + + let sort = Arc::new(SortExec::new(sort_expr, source)); + let limit: Arc<dyn ExecutionPlan> = + Arc::new(GlobalLimitExec::new(sort, 0, Some(10))); + + let optimized = optimize_and_sanity_check(limit).unwrap(); + let plan_str = datafusion_physical_plan::displayable(optimized.as_ref()) + .indent(true) + .to_string(); + + // Single partition should not have SortPreservingMergeExec + assert!( + !plan_str.contains("SortPreservingMergeExec"), + "Unnecessary SortPreservingMergeExec for single partition:\n{plan_str}" + ); + } + + /// Source already has correct ordering → should not add SortExec. + #[test] + fn test_sort_already_satisfied_no_extra_sort() { + let source = Arc::new(MockMultiPartitionExec::new(1)); + + // Sort ASC matches MockMultiPartitionExec's output ordering (a ASC) + let sort_expr = LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new("a", 0)), + SortOptions { + descending: false, + nulls_first: false, + }, + )]) + .unwrap(); + + let sort = Arc::new(SortExec::new(sort_expr, source)); + let limit: Arc<dyn ExecutionPlan> = + Arc::new(GlobalLimitExec::new(sort, 0, Some(10))); + + let optimized = optimize_and_sanity_check(limit).unwrap(); + let plan_str = datafusion_physical_plan::displayable(optimized.as_ref()) + .indent(true) + .to_string(); + + // Sort should be eliminated since source already satisfies ordering + // The plan should just be limit + source (or limit + local limit + source) + assert!( + !plan_str.contains("SortExec: expr=[a@0 ASC"), + "Unnecessary SortExec when ordering already satisfied:\n{plan_str}" + ); + } + + // ======================================================================== Review Comment: many of these tests look 
similar to, or the same as, what is in enforce distribution, etc., but it is hard to tell what changed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
