Copilot commented on code in PR #21976:
URL: https://github.com/apache/datafusion/pull/21976#discussion_r3192516930
##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//!   │
+//!   ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//!   │    └─ Same as existing adjust_input_keys_ordering
+//!   │
+//!   └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!        └─ For each node (bottom-up), for each child:
+//!             Step 1: Ensure distribution requirement
+//!               └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!             Step 2: Ensure ordering requirement (distribution-aware)
+//!               └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};

Review Comment:
   `TransformedResult` is imported but not used in this module. If CI runs `clippy -D warnings`, this will fail the build; please remove the unused import (or use it if intended).

##########
datafusion/physical-optimizer/src/optimizer.rs:
##########
@@ -170,18 +169,12 @@ impl PhysicalOptimizer {
             // those are handled by the later `FilterPushdown` rule.
             // See `FilterPushdownPhase` for more details.
             Arc::new(FilterPushdown::new()),
-            // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
-            // requirements. Please make sure that the whole plan tree is determined before this rule.
-            // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
-            // least one of the operators in the plan benefits from increased parallelism.
-            Arc::new(EnforceDistribution::new()),
-            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
+            // EnsureRequirements: merged EnforceDistribution + EnforceSorting into a
+            // single idempotent rule with distribution-aware pushdown_sorts.
+            // See https://github.com/apache/datafusion/issues/21973
+            Arc::new(EnsureRequirements::new()),
+            // The CombinePartialFinalAggregate rule should be applied after distribution enforcement
             Arc::new(CombinePartialFinalAggregate::new()),

Review Comment:
   With the default optimizer chain change, EnsureRequirements now enforces sorting before `CombinePartialFinalAggregate` runs. `CombinePartialFinalAggregate` only matches directly-adjacent Partial→Final aggregates, but `ensure_sorting` can insert `SortExec` between them (e.g. when the Final aggregate has `required_input_ordering`), preventing the combine optimization. Consider moving `CombinePartialFinalAggregate` before the sorting phase (e.g. keep it between distribution and sorting inside EnsureRequirements, or otherwise preserve the previous relative ordering of this combine rule vs sort enforcement).

##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//!   │
+//!   ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//!   │    └─ Same as existing adjust_input_keys_ordering
+//!   │
+//!   └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!        └─ For each node (bottom-up), for each child:
+//!             Step 1: Ensure distribution requirement
+//!               └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!             Step 2: Ensure ordering requirement (distribution-aware)
+//!               └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
Review Comment:
   Module docs claim EnsureRequirements performs a single bottom-up pass and has "No separate `pushdown_sorts`", but the implementation still runs `pushdown_sorts` in Phase 3c (and also runs multiple sequential passes: distribution enforcement, sorting enforcement, parallelize_sorts, order-preserving variants, pushdown_sorts, partial sort). Please update the module-level documentation to match the actual algorithm, or adjust the implementation to align with the documented design.
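For reference, the module docs describe a design in which each child is handled in one recursive, bottom-up pass: distribution is fixed first and ordering second. That design can be sketched on a toy plan tree. This is a minimal sketch under stated assumptions, not DataFusion's implementation:

- `Plan`, `ensure_child` and `ensure_requirements` are hypothetical names.
- `Coalesce`, `Merge` and `Sort` only stand in for `CoalescePartitionsExec`, `SortPreservingMergeExec` and `SortExec`.
- `RepartitionExec` is not modeled.
- There is a single implicit sort key.

```rust
// Hypothetical toy model of the documented EnsureRequirements design: one
// bottom-up pass that, for each child, fixes distribution and then ordering.
// None of these types or functions are DataFusion APIs.

#[derive(Clone, Debug, PartialEq)]
enum Plan {
    /// Leaf: `sorted` means every partition is sorted on one implicit key.
    Scan { partitions: usize, sorted: bool },
    /// Stands in for `CoalescePartitionsExec`: one partition, order lost.
    Coalesce(Box<Plan>),
    /// Stands in for `SortPreservingMergeExec`: one partition, order kept.
    Merge(Box<Plan>),
    /// Stands in for `SortExec`; the flag mirrors `preserve_partitioning`.
    Sort { preserve_partitioning: bool, input: Box<Plan> },
    /// A consumer that may require one partition and/or sorted input.
    Op { single_partition: bool, sorted: bool, input: Box<Plan> },
}

impl Plan {
    fn partitions(&self) -> usize {
        match self {
            Plan::Scan { partitions, .. } => *partitions,
            Plan::Coalesce(_) | Plan::Merge(_) => 1,
            Plan::Sort { preserve_partitioning: true, input } => input.partitions(),
            Plan::Sort { preserve_partitioning: false, .. } => 1,
            Plan::Op { input, .. } => input.partitions(),
        }
    }

    fn is_sorted(&self) -> bool {
        match self {
            Plan::Scan { sorted, .. } => *sorted,
            Plan::Coalesce(_) => false,
            Plan::Merge(input) | Plan::Op { input, .. } => input.is_sorted(),
            Plan::Sort { .. } => true,
        }
    }
}

/// Fixes one child edge: distribution first, then ordering.
fn ensure_child(mut child: Plan, single_partition: bool, sorted: bool) -> Plan {
    // Step 1: distribution. Use an order-preserving merge when the parent
    // needs order and the child's partitions are already sorted.
    if single_partition && child.partitions() > 1 {
        child = if sorted && child.is_sorted() {
            Plan::Merge(Box::new(child))
        } else {
            Plan::Coalesce(Box::new(child))
        };
    }
    // Step 2: ordering. The partition count is final here, so
    // `preserve_partitioning` is chosen with full distribution context.
    if sorted && !child.is_sorted() {
        let preserve_partitioning = child.partitions() > 1;
        child = Plan::Sort { preserve_partitioning, input: Box::new(child) };
    }
    child
}

/// Single bottom-up pass: every subtree is processed before its parent edge.
fn ensure_requirements(plan: Plan) -> Plan {
    match plan {
        Plan::Scan { partitions, sorted } => Plan::Scan { partitions, sorted },
        Plan::Coalesce(input) => Plan::Coalesce(Box::new(ensure_requirements(*input))),
        Plan::Merge(input) => Plan::Merge(Box::new(ensure_requirements(*input))),
        Plan::Sort { preserve_partitioning, input } => Plan::Sort {
            preserve_partitioning,
            input: Box::new(ensure_requirements(*input)),
        },
        Plan::Op { single_partition, sorted, input } => {
            let child = ensure_requirements(*input);
            Plan::Op {
                single_partition,
                sorted,
                input: Box::new(ensure_child(child, single_partition, sorted)),
            }
        }
    }
}

fn main() {
    // An unsorted 4-partition scan under an operator that needs a single,
    // sorted input partition.
    let plan = Plan::Op {
        single_partition: true,
        sorted: true,
        input: Box::new(Plan::Scan { partitions: 4, sorted: false }),
    };
    let once = ensure_requirements(plan);
    let twice = ensure_requirements(once.clone());
    println!("{once:?}");
    // Re-running the pass on its own output changes nothing.
    assert_eq!(once, twice);
}
```

In this toy, a second run over the output finds every requirement already satisfied and returns the same plan. That is the idempotence property the module docs claim for the single-pass design. The toy handles an unsorted multi-partition child under a single-partition, sorted requirement by coalescing and then sorting. It does not model the per-partition sort followed by a merge that the docs' "SPM if needed" hints at.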
