ozankabak commented on code in PR #7090:
URL: https://github.com/apache/arrow-datafusion/pull/7090#discussion_r1274360977


##########
datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs:
##########
@@ -15,136 +15,256 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Repartition optimizer that replaces `SortExec`s and their suitable 
`RepartitionExec` children with `SortPreservingRepartitionExec`s.
+//! Optimizer rule that replaces executors that lose ordering with their
+//! order-preserving variants when it is helpful; either in terms of
+//! performance or to accommodate unbounded streams by fixing the pipeline.
+
 use crate::error::Result;
-use crate::physical_optimizer::sort_enforcement::unbounded_output;
+use crate::physical_optimizer::sort_enforcement::{unbounded_output, ExecTree};
+use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort};
 use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 
 use super::utils::is_repartition;
 
-use datafusion_common::tree_node::Transformed;
+use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_physical_expr::utils::ordering_satisfy;
 
-use itertools::enumerate;
 use std::sync::Arc;
 
-/// Creates a `SortPreservingRepartitionExec` from given `RepartitionExec`
-fn sort_preserving_repartition(
-    repartition: &RepartitionExec,
-) -> Result<Arc<RepartitionExec>> {
-    Ok(Arc::new(
-        RepartitionExec::try_new(
-            repartition.input().clone(),
-            repartition.partitioning().clone(),
-        )?
-        .with_preserve_order(),
-    ))
+/// For a given `plan`, this object carries the information one needs from its
+/// descendants to decide whether it is beneficial to replace order-losing (but
+/// somewhat faster) variants of certain operators with their order-preserving
+/// (but somewhat slower) cousins.
+#[derive(Debug, Clone)]
+pub(crate) struct OrderPreservationContext {
+    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    ordering_onwards: Vec<Option<ExecTree>>,
 }
 
-fn does_plan_maintain_input_order(plan: &Arc<dyn ExecutionPlan>) -> bool {
-    plan.maintains_input_order().iter().any(|flag| *flag)
-}
+impl OrderPreservationContext {
+    /// Creates a "default" order-preservation context.
+    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+        let length = plan.children().len();
+        OrderPreservationContext {
+            plan,
+            ordering_onwards: vec![None; length],
+        }
+    }
 
-/// Check the children nodes of a `SortExec` until ordering is lost (e.g. until
-/// another `SortExec` or a `CoalescePartitionsExec` which doesn't maintain 
ordering)
-/// and replace `RepartitionExec`s that do not maintain ordering (e.g. those 
whose
-/// input partition counts are larger than unity) with 
`SortPreservingRepartitionExec`s.
-/// Note that doing this may render the `SortExec` in question unneccessary, 
which will
-/// be removed later on.
-///
-/// For example, we transform the plan below
-/// "FilterExec: c@2 > 3",
-/// "  RepartitionExec: partitioning=Hash(\[b@0], 16), input_partitions=16",
-/// "    RepartitionExec: partitioning=Hash(\[a@0], 16), input_partitions=1",
-/// "      MemoryExec: partitions=1, 
partition_sizes=\[(<depends_on_batch_size>)], output_ordering: 
\[PhysicalSortExpr { expr: Column { name: \"a\", index: 0 }, options: 
SortOptions { descending: false, nulls_first: false } }]",
-/// into
-/// "FilterExec: c@2 > 3",
-/// "  SortPreservingRepartitionExec: partitioning=Hash(\[b@0], 16), 
input_partitions=16",
-/// "    RepartitionExec: partitioning=Hash(\[a@0], 16), input_partitions=1",
-/// "      MemoryExec: partitions=1, 
partition_sizes=\[<depends_on_batch_size>], output_ordering: \[PhysicalSortExpr 
{ expr: Column { name: \"a\", index: 0 }, options: SortOptions { descending: 
false, nulls_first: false } }]",
-/// where the `FilterExec` in the latter has output ordering `a ASC`. This 
ordering will
-/// potentially remove a `SortExec` at the top of `FilterExec`. If this 
doesn't help remove
-/// a `SortExec`, the old version is used.
-fn replace_sort_children(
-    plan: &Arc<dyn ExecutionPlan>,
-) -> Result<Arc<dyn ExecutionPlan>> {
-    if plan.children().is_empty() {
-        return Ok(plan.clone());
+    /// Creates a new order-preservation context from those of children nodes.
+    pub fn new_from_children_nodes(
+        children_nodes: Vec<OrderPreservationContext>,
+        parent_plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<Self> {
+        let children_plans = children_nodes
+            .iter()
+            .map(|item| item.plan.clone())
+            .collect();
+        let ordering_onwards = children_nodes
+            .into_iter()
+            .enumerate()
+            .map(|(idx, item)| {
+                // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec`
+                // operators. This tree collects all the intermediate executors that
+                // maintain a single partition. If we just saw a `CoalescePartitionsExec`
+                // operator, we reset the tree and start accumulating.
+                let plan = item.plan;
+                let ordering_onwards = item.ordering_onwards;
+                if plan.children().is_empty() {
+                    // Plan has no children, there is nothing to propagate.
+                    None
+                } else if ordering_onwards[0].is_none()
+                    && ((is_repartition(&plan) && !plan.maintains_input_order()[0])
+                        || (is_coalesce_partitions(&plan)
+                            && plan.children()[0].output_ordering().is_some()))
+                {
+                    Some(ExecTree::new(plan, idx, vec![]))
+                } else {
+                    let children = ordering_onwards
+                        .into_iter()
+                        .flatten()
+                        .filter(|item| {
+                            // Only consider operators that maintains ordering
+                            plan.maintains_input_order()[item.idx]
+                                || is_coalesce_partitions(&plan)
+                                || is_repartition(&plan)
+                        })
+                        .collect::<Vec<_>>();
+                    if children.is_empty() {
+                        None
+                    } else {
+                        Some(ExecTree::new(plan, idx, children))
+                    }
+                }
+            })
+            .collect();
+        let plan = with_new_children_if_necessary(parent_plan, children_plans)?.into();
+        Ok(OrderPreservationContext {
+            plan,
+            ordering_onwards,
+        })
     }
 
-    let mut children = plan.children();
-    for (idx, child) in enumerate(plan.children()) {
-        if !is_repartition(&child) && !does_plan_maintain_input_order(&child) {
-            break;
-        }
+    /// Computes order-preservation contexts for every child of the plan.
+    pub fn children(&self) -> Vec<OrderPreservationContext> {
+        self.plan
+            .children()
+            .into_iter()
+            .map(|child| OrderPreservationContext::new(child))
+            .collect()
+    }
+}
 
-        if let Some(repartition) = 
child.as_any().downcast_ref::<RepartitionExec>() {
-            // Replace this `RepartitionExec` with a 
`SortPreservingRepartitionExec`
-            // if it doesn't preserve ordering and its input is unbounded. 
Doing
-            // so avoids breaking the pipeline.
-            if !repartition.maintains_input_order()[0] && 
unbounded_output(&child) {
-                let spr = sort_preserving_repartition(repartition)?
-                    .with_new_children(repartition.children())?;
-                // Perform the replacement and recurse into this plan's 
children:
-                children[idx] = replace_sort_children(&spr)?;
-                continue;
+impl TreeNode for OrderPreservationContext {
+    fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
+    where
+        F: FnMut(&Self) -> Result<VisitRecursion>,
+    {
+        for child in self.children() {
+            match op(&child)? {
+                VisitRecursion::Continue => {}
+                VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
+                VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
             }
         }
+        Ok(VisitRecursion::Continue)
+    }
 
-        children[idx] = replace_sort_children(&child)?;
+    fn map_children<F>(self, transform: F) -> Result<Self>
+    where
+        F: FnMut(Self) -> Result<Self>,
+    {
+        let children = self.children();
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            let children_nodes = children
+                .into_iter()
+                .map(transform)
+                .collect::<Result<Vec<_>>>()?;
+            OrderPreservationContext::new_from_children_nodes(children_nodes, self.plan)
+        }
     }
+}
 
-    plan.clone().with_new_children(children)
+/// Calculates the updated plan by replacing executors that lose ordering
+/// inside the `ExecTree` with their order-preserving variants. This will
+/// generate an alternative plan, which will be accepted or rejected later on
+/// depending on whether it helps us remove a `SortExec`.
+fn get_updated_plan(
+    exec_tree: &ExecTree,
+    // Flag indicating that it is desirable to replace `RepartitionExec`s with
+    // `SortPreservingRepartitionExec`s:
+    is_spr_better: bool,
+    // Flag indicating that it is desirable to replace `CoalescePartitionsExec`s
+    // with `SortPreservingMergeExec`s:
+    is_spm_better: bool,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let plan = exec_tree.plan.clone();
+
+    let mut children = plan.children();
+    // Update children and their descendants in the given tree:
+    for item in &exec_tree.children {
+        children[item.idx] = get_updated_plan(item, is_spr_better, is_spm_better)?;
+    }
+    // Construct the plan with updated children:
+    let mut plan = plan.with_new_children(children)?;
+
+    // When a `RepartitionExec` doesn't preserve ordering, replace it with
+    // a `SortPreservingRepartitionExec` if appropriate:
+    if is_repartition(&plan) && !plan.maintains_input_order()[0] && is_spr_better {
+        let child = plan.children()[0].clone();
+        plan = Arc::new(
+            RepartitionExec::try_new(child, plan.output_partitioning())?
+                .with_preserve_order(),
+        ) as _
+    }
+    // When the input of a `CoalescePartitionsExec` has an ordering, replace it
+    // with a `SortPreservingMergeExec` if appropriate:
+    if is_coalesce_partitions(&plan)
+        && plan.children()[0].output_ordering().is_some()
+        && is_spm_better
+    {
+        let child = plan.children()[0].clone();
+        plan = Arc::new(SortPreservingMergeExec::new(
+            child.output_ordering().unwrap_or(&[]).to_vec(),
+            child,
+        )) as _
+    }
+    Ok(plan)
 }
 
-/// The `replace_repartition_execs` optimizer sub-rule searches for `SortExec`s
-/// and their `RepartitionExec` children with multiple input partitioning 
having
-/// local (per-partition) ordering, so that it can replace the 
`RepartitionExec`
-/// with a `SortPreservingRepartitionExec` and remove the pipeline-breaking 
`SortExec`
-/// from the physical plan.
+/// The `replace_with_order_preserving_variants` optimizer sub-rule tries to
+/// remove `SortExec`s from the physical plan by replacing operators that do
+/// not preserve ordering with their order-preserving variants; i.e. by replacing
+/// `RepartitionExec`s with `SortPreservingRepartitionExec`s or by replacing
+/// `CoalescePartitionsExec`s with `SortPreservingMergeExec`s.
+///
+/// If this replacement is helpful for removing a `SortExec`, it updates the plan.
+/// Otherwise, it leaves the plan unchanged.
 ///
 /// The algorithm flow is simply like this:
-/// 1. Visit nodes of the physical plan top-down and look for `SortExec` nodes.
-/// 2. If a `SortExec` is found, iterate over its children recursively until an
-///    executor that doesn't maintain ordering is encountered (or until a leaf 
node).
-///    `RepartitionExec`s with multiple input partitions are considered as if 
they
-///    maintain input ordering because they are potentially replaceable with
-///    `SortPreservingRepartitionExec`s which maintain ordering.
-/// 3_1. Replace the `RepartitionExec`s with multiple input partitions (which 
doesn't
-///      maintain ordering) with a `SortPreservingRepartitionExec`.
-/// 3_2. Otherwise, keep the plan as is.
-/// 4. Check if the `SortExec` is still necessary in the updated plan by 
comparing
+/// 1. Visit nodes of the physical plan bottom-up and look for `SortExec` nodes.
+/// 1_1. During the traversal, build an `ExecTree` to keep track of operators
+///      that maintain ordering (or can maintain ordering when replaced by an
+///      order-preserving variant) until a `SortExec` is found.
+/// 2. When a `SortExec` is found, update the child of the `SortExec` by replacing
+///    operators that do not preserve ordering in the `ExecTree` with their order
+///    preserving variants.
+/// 3. Check if the `SortExec` is still necessary in the updated plan by comparing
 ///    its input ordering with the output ordering it imposes. We do this because
-///     replacing `RepartitionExec`s with `SortPreservingRepartitionExec`s 
enables us
-///     to preserve the previously lost ordering during `RepartitionExec`s.
-/// 5_1. If the `SortExec` in question turns out to be unnecessary, remove it 
and use
-///      updated plan. Otherwise, use the original plan.
-/// 6. Continue the top-down iteration until another `SortExec` is seen, or 
the iterations finish.
-pub fn replace_repartition_execs(
-    plan: Arc<dyn ExecutionPlan>,
-) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
-    if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
-        let changed_plan = replace_sort_children(&plan)?;
-        // Since we have a `SortExec` here, it's guaranteed that it has a 
single child.
-        let input = &changed_plan.children()[0];
-        // Check if any child is changed, if so remove the `SortExec`. If the 
ordering
-        // is being satisfied with the child, then it means `SortExec` is 
unnecessary.
+///    replacing operators that lose ordering with their order-preserving variants
+///    enables us to preserve the previously lost ordering at the input of `SortExec`.
+/// 4. If the `SortExec` in question turns out to be unnecessary, remove it and use
+///     updated plan. Otherwise, use the original plan.
+/// 5. Continue the bottom-up traversal until another `SortExec` is seen, or the traversal
+///    is complete.
+pub(crate) fn replace_with_order_preserving_variants(
+    requirements: OrderPreservationContext,
+    // A flag indicating that replacing `RepartitionExec`s with
+    // `SortPreservingRepartitionExec`s is desirable when it helps
+    // to remove a `SortExec` from the plan. If this flag is `false`,
+    // this replacement should only be made to fix the pipeline (streaming).
+    is_spr_better: bool,
+    // A flag indicating that replacing `CoalescePartitionsExec`s with
+    // `SortPreservingMergeExec`s is desirable when it helps to remove
+    // a `SortExec` from the plan. If this flag is `false`, this replacement
+    // should only be made to fix the pipeline (streaming).
+    is_spm_better: bool,
+) -> Result<Transformed<OrderPreservationContext>> {
+    let plan = &requirements.plan;
+    let ordering_onwards = &requirements.ordering_onwards;
+    if is_sort(plan) {
+        let exec_tree = if let Some(exec_tree) = &ordering_onwards[0] {
+            exec_tree
+        } else {
+            return Ok(Transformed::No(requirements));
+        };
+        // For unbounded cases, replace with the order-preserving variant in
+        // any case, as doing so helps fix the pipeline.
+        let is_unbounded = unbounded_output(plan);
+        let updated_sort_input = get_updated_plan(
+            exec_tree,
+            is_spr_better | is_unbounded,
+            is_spm_better | is_unbounded,

Review Comment:
   ```suggestion
               is_spr_better || is_unbounded,
               is_spm_better || is_unbounded,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to