xudong963 commented on code in PR #14637: URL: https://github.com/apache/datafusion/pull/14637#discussion_r1958375906
########## datafusion/physical-optimizer/src/enforce_sorting/mod.rs: ########## @@ -126,29 +126,65 @@ fn update_sort_ctx_children( /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data /// attribute stores whether the plan is a `CoalescePartitionsExec` or is /// connected to a `CoalescePartitionsExec` via its children. +/// +/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce). +/// +/// This requires a bottom-up traversal was previously performed, updating the +/// children previously. pub type PlanWithCorrespondingCoalescePartitions = PlanContext<bool>; +/// Determines if the coalesce may be safely removed. +fn is_coalesce_to_remove( + node: &Arc<dyn ExecutionPlan>, + parent: &Arc<dyn ExecutionPlan>, +) -> bool { + node.as_any().downcast_ref::<CoalescePartitionsExec>() + .map(|_coalesce| { + // TODO(wiedld): find a more generalized approach that does not rely on + // pattern matching the structure of the DAG + // Note that the `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be used for cases of: + // * Repartition -> Coalesce -> Repartition + // * Coalesce -> AggregateExec(input=hash-partitioned) + + let parent_req_single_partition = matches!(parent.required_input_distribution()[0], Distribution::SinglePartition) + // handle aggregates with input=hashPartitioning with a single output partition + || (is_aggregate(parent) && parent.properties().output_partitioning().partition_count() <= 1); + + // node above does not require single distribution + !parent_req_single_partition + // it doesn't immediately repartition + || is_repartition(parent) + // any adjacent Coalesce->Sort can be replaced + || is_sort(parent) + }).unwrap_or(false) +} + fn update_coalesce_ctx_children( coalesce_context: &mut PlanWithCorrespondingCoalescePartitions, ) { - let children = &coalesce_context.children; - coalesce_context.data = if children.is_empty() { - // Plan has no children, it cannot be a `CoalescePartitionsExec`. - false - } else if is_coalesce_partitions(&coalesce_context.plan) { - // Initiate a connection: - true - } else { - children.iter().enumerate().any(|(idx, node)| { - // Only consider operators that don't require a single partition, - // and connected to some `CoalescePartitionsExec`: - node.data - && !matches!( - coalesce_context.plan.required_input_distribution()[idx], - Distribution::SinglePartition - ) - }) - }; + // perform lookahead(1) during bottom up traversal + // since we are checking distribution requirements after the coalesce occurs + let parent = &coalesce_context.plan; + + for child_context in coalesce_context.children.iter_mut() { + // determine if child, or it's descendents, are a coalesce to be removed + child_context.data = if child_context.children.is_empty() { + // Plan has no children, it cannot be a `CoalescePartitionsExec`. + false + } else if is_coalesce_to_remove(&child_context.plan, parent) { + // Initiate a connection: + true + } else if is_sort(&child_context.plan) { + // halt coalesce removals at the sort + false + } else { + // propogate Review Comment: `propagate`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org