This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cbda394c0d Update enforce_distribution.rs (#16913)
cbda394c0d is described below
commit cbda394c0dd1b9e20db45c1cc47cf2d2cc204983
Author: Berkay Şahin <[email protected]>
AuthorDate: Sun Jul 27 04:56:15 2025 +0300
Update enforce_distribution.rs (#16913)
---
.../physical-optimizer/src/enforce_distribution.rs | 22 ++++++++++++----------
1 file changed, 12 insertions(+), 10 deletions(-)
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 39eb557ea6..88dcd4c523 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -925,19 +925,20 @@ fn add_hash_on_top(
Ok(input)
}
-/// Adds a [`SortPreservingMergeExec`] operator on top of input executor
-/// to satisfy single distribution requirement.
+/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
+/// on top of the given plan node to satisfy a single partition requirement
+/// while preserving ordering constraints.
///
-/// # Arguments
+/// # Parameters
///
/// * `input`: Current node.
///
/// # Returns
///
-/// Updated node with an execution plan, where desired single
-/// distribution is satisfied by adding [`SortPreservingMergeExec`].
-fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
- // Add SortPreservingMerge only when partition count is larger than 1.
+/// Updated node with an execution plan, where the desired single distribution
+/// requirement is satisfied.
+fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
+ // Apply only when the partition count is larger than one.
if input.plan.output_partitioning().partition_count() > 1 {
// When there is an existing ordering, we preserve ordering
// when decreasing partitions. This will be un-done in the future
@@ -945,12 +946,13 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext {
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.prefer_existing_sort`)
- let new_plan = if let Some(ordering) = input.plan.output_ordering() {
+ let new_plan = if let Some(req) = input.plan.output_ordering() {
Arc::new(SortPreservingMergeExec::new(
- ordering.clone(),
+ req.clone(),
Arc::clone(&input.plan),
)) as _
} else {
+ // If there is no input order, we can simply coalesce partitions:
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
};
@@ -1259,7 +1261,7 @@ pub fn ensure_distribution(
// Satisfy the distribution requirement if it is unmet.
match &requirement {
Distribution::SinglePartition => {
- child = add_spm_on_top(child);
+ child = add_merge_on_top(child);
}
Distribution::HashPartitioned(exprs) => {
if add_roundrobin {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]