Copilot commented on code in PR #21976:
URL: https://github.com/apache/datafusion/pull/21976#discussion_r3192516930


##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::Result;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};

Review Comment:
   `TransformedResult` is imported but never used in this module. The
compiler's `unused_imports` lint will flag it, and if CI runs
`cargo clippy -- -D warnings` that warning becomes a build failure. Please
remove the import, or use it if it was intended.
   



##########
datafusion/physical-optimizer/src/optimizer.rs:
##########
@@ -170,18 +169,12 @@ impl PhysicalOptimizer {
             // those are handled by the later `FilterPushdown` rule.
             // See `FilterPushdownPhase` for more details.
             Arc::new(FilterPushdown::new()),
-            // The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
-            // requirements. Please make sure that the whole plan tree is determined before this rule.
-            // This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
-            // least one of the operators in the plan benefits from increased parallelism.
-            Arc::new(EnforceDistribution::new()),
-            // The CombinePartialFinalAggregate rule should be applied after the EnforceDistribution rule
+            // EnsureRequirements: merged EnforceDistribution + EnforceSorting into a
+            // single idempotent rule with distribution-aware pushdown_sorts.
+            // See https://github.com/apache/datafusion/issues/21973
+            Arc::new(EnsureRequirements::new()),
+            // The CombinePartialFinalAggregate rule should be applied after distribution enforcement
             Arc::new(CombinePartialFinalAggregate::new()),

Review Comment:
   With this change to the default optimizer chain, `EnsureRequirements` now
enforces sorting before `CombinePartialFinalAggregate` runs.
`CombinePartialFinalAggregate` only matches directly adjacent Partial→Final
aggregates, but `ensure_sorting` can insert a `SortExec` between them (e.g.
when the Final aggregate has a `required_input_ordering`), which prevents the
combine optimization. Consider running `CombinePartialFinalAggregate` before
the sorting phase (e.g. between distribution and sorting inside
`EnsureRequirements`), or otherwise preserving the previous ordering of this
combine rule relative to sort enforcement.
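
The adjacency concern can be illustrated with a minimal, self-contained toy
model. The `Plan` enum and the `combine_partial_final` function below are
hypothetical; they are not DataFusion's types or API. They only mimic a rule
that fires when a Final aggregate's direct child is a Partial aggregate.

```rust
// Toy model only: hypothetical types, not DataFusion's ExecutionPlan API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Sort(Box<Plan>),
    PartialAgg(Box<Plan>),
    FinalAgg(Box<Plan>),
    SingleAgg(Box<Plan>),
}

/// Collapse Final(Partial(x)) into Single(x). The rule only fires when the
/// Partial aggregate is the *direct* child of the Final aggregate.
fn combine_partial_final(plan: Plan) -> Plan {
    match plan {
        Plan::FinalAgg(child) => match *child {
            // Directly adjacent Partial -> Final: combine them.
            Plan::PartialAgg(input) => {
                Plan::SingleAgg(Box::new(combine_partial_final(*input)))
            }
            // Anything else in between (e.g. a Sort) blocks the match.
            other => Plan::FinalAgg(Box::new(combine_partial_final(other))),
        },
        Plan::Sort(c) => Plan::Sort(Box::new(combine_partial_final(*c))),
        Plan::PartialAgg(c) => Plan::PartialAgg(Box::new(combine_partial_final(*c))),
        Plan::SingleAgg(c) => Plan::SingleAgg(Box::new(combine_partial_final(*c))),
        Plan::Scan => Plan::Scan,
    }
}

/// Final aggregate directly on top of a Partial aggregate.
fn adjacent_plan() -> Plan {
    Plan::FinalAgg(Box::new(Plan::PartialAgg(Box::new(Plan::Scan))))
}

/// Same plan after a sort has been inserted between the two aggregates.
fn sort_between_plan() -> Plan {
    Plan::FinalAgg(Box::new(Plan::Sort(Box::new(Plan::PartialAgg(Box::new(
        Plan::Scan,
    ))))))
}
```

In this sketch, `combine_partial_final(adjacent_plan())` collapses to a single
aggregate, while `combine_partial_final(sort_between_plan())` returns the plan
unchanged. Whether the real rule behaves this way for every plan shape should
be confirmed against the actual `CombinePartialFinalAggregate` implementation.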
   



##########
datafusion/physical-optimizer/src/ensure_requirements/mod.rs:
##########
@@ -0,0 +1,1804 @@
+//! [`EnsureRequirements`] optimizer rule that enforces both distribution and
+//! sorting requirements in a **single bottom-up pass**.
+//!
+//! This rule replaces the separate `EnforceDistribution` + `EnforceSorting`
+//! rules with a unified approach inspired by Apache Spark's `EnsureRequirements`
+//! and Presto/Trino's `AddExchanges`.
+//!
+//! # Motivation
+//!
+//! The previous two-rule design (`EnforceDistribution` then `EnforceSorting`)
+//! suffers from non-idempotent composition: `EnforceSorting`'s `pushdown_sorts`
+//! can break distribution invariants established by `EnforceDistribution`,
+//! because `SortExec.preserve_partitioning` couples sorting and distribution
+//! decisions. See <https://github.com/apache/datafusion/issues/21973> for details.
+//!
+//! # Architecture
+//!
+//! ```text
+//! EnsureRequirements::optimize(plan)
+//! │
+//! ├─ Phase 1 (optional): reorder_join_keys (top-down)
+//! │   └─ Same as existing adjust_input_keys_ordering
+//! │
+//! └─ Phase 2: ensure_requirements (single bottom-up pass)
+//!     └─ For each node (bottom-up), for each child:
+//!         Step 1: Ensure distribution requirement
+//!           └─ Add RepartitionExec / CoalescePartitionsExec / SortPreservingMergeExec
+//!         Step 2: Ensure ordering requirement (distribution-aware)
+//!           └─ Add SortExec with correct preserve_partitioning + SPM if needed
+//! ```
+//!
+//! # Key Properties
+//!
+//! - **Idempotent**: Running the rule twice produces the same plan.
+//! - **Distribution before sorting**: For each child, distribution is resolved
+//!   before ordering, so sorting decisions always have full distribution context.
+//! - **No separate `pushdown_sorts`**: Sort pushdown is implicit — the bottom-up
+//!   pass only adds `SortExec` where the child doesn't already satisfy the
+//!   ordering requirement, naturally placing sorts at the deepest valid position.

Review Comment:
   The module docs claim that `EnsureRequirements` performs a single bottom-up
pass and has "No separate `pushdown_sorts`", but the implementation still runs
`pushdown_sorts` in Phase 3c. It also runs several sequential passes:
distribution enforcement, sorting enforcement, `parallelize_sorts`,
order-preserving variants, `pushdown_sorts`, and partial sort. Please update
the module-level documentation to match the actual algorithm, or adjust the
implementation to match the documented design.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

