xudong963 commented on code in PR #14637:
URL: https://github.com/apache/datafusion/pull/14637#discussion_r1958375906


##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -126,29 +126,65 @@ fn update_sort_ctx_children(
 /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The 
data
 /// attribute stores whether the plan is a `CoalescePartitionsExec` or is
 /// connected to a `CoalescePartitionsExec` via its children.
+///
+/// The tracker halts at each [`SortExec`] (where the SPM will act to replace 
the coalesce).
+///
+/// This requires that a bottom-up traversal has already been performed, so that
+/// the children have already been updated.
 pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>;
 
+/// Determines if the coalesce may be safely removed.
+fn is_coalesce_to_remove(
+    node: &Arc<dyn ExecutionPlan>,
+    parent: &Arc<dyn ExecutionPlan>,
+) -> bool {
+    node.as_any().downcast_ref::<CoalescePartitionsExec>()
+        .map(|_coalesce| {
+            // TODO(wiedld): find a more generalized approach that does not 
rely on
+            // pattern matching the structure of the DAG
+            // Note that the `Partitioning::satisfy()` (parent vs. 
coalesce.child) cannot be used for cases of:
+            //      * Repartition -> Coalesce -> Repartition
+            //      * Coalesce -> AggregateExec(input=hash-partitioned)
+
+            let parent_req_single_partition = 
matches!(parent.required_input_distribution()[0], Distribution::SinglePartition)
+                // handle aggregates with input=hashPartitioning with a single 
output partition
+                || (is_aggregate(parent) && 
parent.properties().output_partitioning().partition_count() <= 1);
+
+            // node above does not require single distribution
+            !parent_req_single_partition
+            // it doesn't immediately repartition
+            || is_repartition(parent)
+            // any adjacent Coalesce->Sort can be replaced
+            || is_sort(parent)
+        }).unwrap_or(false)
+}
+
 fn update_coalesce_ctx_children(
     coalesce_context: &mut PlanWithCorrespondingCoalescePartitions,
 ) {
-    let children = &coalesce_context.children;
-    coalesce_context.data = if children.is_empty() {
-        // Plan has no children, it cannot be a `CoalescePartitionsExec`.
-        false
-    } else if is_coalesce_partitions(&coalesce_context.plan) {
-        // Initiate a connection:
-        true
-    } else {
-        children.iter().enumerate().any(|(idx, node)| {
-            // Only consider operators that don't require a single partition,
-            // and connected to some `CoalescePartitionsExec`:
-            node.data
-                && !matches!(
-                    coalesce_context.plan.required_input_distribution()[idx],
-                    Distribution::SinglePartition
-                )
-        })
-    };
+    // perform lookahead(1) during bottom up traversal
+    // since we are checking distribution requirements after the coalesce 
occurs
+    let parent = &coalesce_context.plan;
+
+    for child_context in coalesce_context.children.iter_mut() {
+        // determine if child, or its descendants, are a coalesce to be 
removed
+        child_context.data = if child_context.children.is_empty() {
+            // Plan has no children, it cannot be a `CoalescePartitionsExec`.
+            false
+        } else if is_coalesce_to_remove(&child_context.plan, parent) {
+            // Initiate a connection:
+            true
+        } else if is_sort(&child_context.plan) {
+            // halt coalesce removals at the sort
+            false
+        } else {
+            // propogate

Review Comment:
   Typo: `propogate` should be `propagate`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to