This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cbda394c0d Update enforce_distribution.rs (#16913)
cbda394c0d is described below

commit cbda394c0dd1b9e20db45c1cc47cf2d2cc204983
Author: Berkay Şahin <[email protected]>
AuthorDate: Sun Jul 27 04:56:15 2025 +0300

    Update enforce_distribution.rs (#16913)
---
 .../physical-optimizer/src/enforce_distribution.rs | 22 ++++++++++++----------
 1 file changed, 12 insertions(+), 10 deletions(-)

diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 39eb557ea6..88dcd4c523 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -925,19 +925,20 @@ fn add_hash_on_top(
     Ok(input)
 }
 
-/// Adds a [`SortPreservingMergeExec`] operator on top of input executor
-/// to satisfy single distribution requirement.
+/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
+/// on top of the given plan node to satisfy a single partition requirement
+/// while preserving ordering constraints.
 ///
-/// # Arguments
+/// # Parameters
 ///
 /// * `input`: Current node.
 ///
 /// # Returns
 ///
-/// Updated node with an execution plan, where desired single
-/// distribution is satisfied by adding [`SortPreservingMergeExec`].
-fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
-    // Add SortPreservingMerge only when partition count is larger than 1.
+/// Updated node with an execution plan, where the desired single distribution
+/// requirement is satisfied.
+fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
+    // Apply only when the partition count is larger than one.
     if input.plan.output_partitioning().partition_count() > 1 {
         // When there is an existing ordering, we preserve ordering
         // when decreasing partitions. This will be un-done in the future
@@ -945,12 +946,13 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
         // - Preserving ordering is not helpful in terms of satisfying ordering requirements
         // - Usage of order preserving variants is not desirable
         // (determined by flag `config.optimizer.prefer_existing_sort`)
-        let new_plan = if let Some(ordering) = input.plan.output_ordering() {
+        let new_plan = if let Some(req) = input.plan.output_ordering() {
             Arc::new(SortPreservingMergeExec::new(
-                ordering.clone(),
+                req.clone(),
                 Arc::clone(&input.plan),
             )) as _
         } else {
+            // If there is no input order, we can simply coalesce partitions:
             Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
         };
 
@@ -1259,7 +1261,7 @@ pub fn ensure_distribution(
             // Satisfy the distribution requirement if it is unmet.
             match &requirement {
                 Distribution::SinglePartition => {
-                    child = add_spm_on_top(child);
+                    child = add_merge_on_top(child);
                 }
                 Distribution::HashPartitioned(exprs) => {
                     if add_roundrobin {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to