mustafasrepo commented on code in PR #7090:
URL: https://github.com/apache/arrow-datafusion/pull/7090#discussion_r1274450719
##########
datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs:
##########
@@ -15,136 +15,256 @@
// specific language governing permissions and limitations
// under the License.
-//! Repartition optimizer that replaces `SortExec`s and their suitable
`RepartitionExec` children with `SortPreservingRepartitionExec`s.
+//! Optimizer rule that replaces executors that lose ordering with their
+//! order-preserving variants when it is helpful; either in terms of
+//! performance or to accommodate unbounded streams by fixing the pipeline.
+
use crate::error::Result;
-use crate::physical_optimizer::sort_enforcement::unbounded_output;
+use crate::physical_optimizer::sort_enforcement::{unbounded_output, ExecTree};
+use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort};
use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::ExecutionPlan;
+use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use super::utils::is_repartition;
-use datafusion_common::tree_node::Transformed;
+use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_physical_expr::utils::ordering_satisfy;
-use itertools::enumerate;
use std::sync::Arc;
-/// Creates a `SortPreservingRepartitionExec` from given `RepartitionExec`
-fn sort_preserving_repartition(
- repartition: &RepartitionExec,
-) -> Result<Arc<RepartitionExec>> {
- Ok(Arc::new(
- RepartitionExec::try_new(
- repartition.input().clone(),
- repartition.partitioning().clone(),
- )?
- .with_preserve_order(),
- ))
+/// For a given `plan`, this object carries the information one needs from its
+/// descendants to decide whether it is beneficial to replace order-losing (but
+/// somewhat faster) variants of certain operators with their order-preserving
+/// (but somewhat slower) cousins.
+#[derive(Debug, Clone)]
+pub(crate) struct OrderPreservationContext {
+ pub(crate) plan: Arc<dyn ExecutionPlan>,
+ ordering_onwards: Vec<Option<ExecTree>>,
}
-fn does_plan_maintain_input_order(plan: &Arc<dyn ExecutionPlan>) -> bool {
- plan.maintains_input_order().iter().any(|flag| *flag)
-}
+impl OrderPreservationContext {
+ /// Creates a "default" order-preservation context.
+ pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ let length = plan.children().len();
+ OrderPreservationContext {
+ plan,
+ ordering_onwards: vec![None; length],
+ }
+ }
-/// Check the children nodes of a `SortExec` until ordering is lost (e.g. until
-/// another `SortExec` or a `CoalescePartitionsExec` which doesn't maintain
ordering)
-/// and replace `RepartitionExec`s that do not maintain ordering (e.g. those
whose
-/// input partition counts are larger than unity) with
`SortPreservingRepartitionExec`s.
-/// Note that doing this may render the `SortExec` in question unneccessary,
which will
-/// be removed later on.
-///
-/// For example, we transform the plan below
-/// "FilterExec: c@2 > 3",
-/// " RepartitionExec: partitioning=Hash(\[b@0], 16), input_partitions=16",
-/// " RepartitionExec: partitioning=Hash(\[a@0], 16), input_partitions=1",
-/// " MemoryExec: partitions=1,
partition_sizes=\[(<depends_on_batch_size>)], output_ordering:
\[PhysicalSortExpr { expr: Column { name: \"a\", index: 0 }, options:
SortOptions { descending: false, nulls_first: false } }]",
-/// into
-/// "FilterExec: c@2 > 3",
-/// " SortPreservingRepartitionExec: partitioning=Hash(\[b@0], 16),
input_partitions=16",
-/// " RepartitionExec: partitioning=Hash(\[a@0], 16), input_partitions=1",
-/// " MemoryExec: partitions=1,
partition_sizes=\[<depends_on_batch_size>], output_ordering: \[PhysicalSortExpr
{ expr: Column { name: \"a\", index: 0 }, options: SortOptions { descending:
false, nulls_first: false } }]",
-/// where the `FilterExec` in the latter has output ordering `a ASC`. This
ordering will
-/// potentially remove a `SortExec` at the top of `FilterExec`. If this
doesn't help remove
-/// a `SortExec`, the old version is used.
-fn replace_sort_children(
- plan: &Arc<dyn ExecutionPlan>,
-) -> Result<Arc<dyn ExecutionPlan>> {
- if plan.children().is_empty() {
- return Ok(plan.clone());
+ /// Creates a new order-preservation context from those of children nodes.
+ pub fn new_from_children_nodes(
+ children_nodes: Vec<OrderPreservationContext>,
+ parent_plan: Arc<dyn ExecutionPlan>,
+ ) -> Result<Self> {
+ let children_plans = children_nodes
+ .iter()
+ .map(|item| item.plan.clone())
+ .collect();
+ let ordering_onwards = children_nodes
+ .into_iter()
+ .enumerate()
+ .map(|(idx, item)| {
+ // Leaves of the `coalesce_onwards` tree are
`CoalescePartitionsExec`
+ // operators. This tree collects all the intermediate
executors that
+ // maintain a single partition. If we just saw a
`CoalescePartitionsExec`
+ // operator, we reset the tree and start accumulating.
+ let plan = item.plan;
+ let ordering_onwards = item.ordering_onwards;
+ if plan.children().is_empty() {
+ // Plan has no children, there is nothing to propagate.
+ None
+ } else if ordering_onwards[0].is_none()
+ && ((is_repartition(&plan) &&
!plan.maintains_input_order()[0])
+ || (is_coalesce_partitions(&plan)
+ && plan.children()[0].output_ordering().is_some()))
+ {
+ Some(ExecTree::new(plan, idx, vec![]))
+ } else {
+ let children = ordering_onwards
+ .into_iter()
+ .flatten()
+ .filter(|item| {
+ // Only consider operators that maintains ordering
+ plan.maintains_input_order()[item.idx]
+ || is_coalesce_partitions(&plan)
+ || is_repartition(&plan)
+ })
+ .collect::<Vec<_>>();
+ if children.is_empty() {
+ None
+ } else {
+ Some(ExecTree::new(plan, idx, children))
+ }
+ }
+ })
+ .collect();
+ let plan = with_new_children_if_necessary(parent_plan,
children_plans)?.into();
+ Ok(OrderPreservationContext {
+ plan,
+ ordering_onwards,
+ })
}
- let mut children = plan.children();
- for (idx, child) in enumerate(plan.children()) {
- if !is_repartition(&child) && !does_plan_maintain_input_order(&child) {
- break;
- }
+ /// Computes order-preservation contexts for every child of the plan.
+ pub fn children(&self) -> Vec<OrderPreservationContext> {
+ self.plan
+ .children()
+ .into_iter()
+ .map(|child| OrderPreservationContext::new(child))
+ .collect()
+ }
+}
- if let Some(repartition) =
child.as_any().downcast_ref::<RepartitionExec>() {
- // Replace this `RepartitionExec` with a
`SortPreservingRepartitionExec`
- // if it doesn't preserve ordering and its input is unbounded.
Doing
- // so avoids breaking the pipeline.
- if !repartition.maintains_input_order()[0] &&
unbounded_output(&child) {
- let spr = sort_preserving_repartition(repartition)?
- .with_new_children(repartition.children())?;
- // Perform the replacement and recurse into this plan's
children:
- children[idx] = replace_sort_children(&spr)?;
- continue;
+impl TreeNode for OrderPreservationContext {
+ fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
+ where
+ F: FnMut(&Self) -> Result<VisitRecursion>,
+ {
+ for child in self.children() {
+ match op(&child)? {
+ VisitRecursion::Continue => {}
+ VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
+ VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
+ Ok(VisitRecursion::Continue)
+ }
- children[idx] = replace_sort_children(&child)?;
+ fn map_children<F>(self, transform: F) -> Result<Self>
+ where
+ F: FnMut(Self) -> Result<Self>,
+ {
+ let children = self.children();
+ if children.is_empty() {
+ Ok(self)
+ } else {
+ let children_nodes = children
+ .into_iter()
+ .map(transform)
+ .collect::<Result<Vec<_>>>()?;
+ OrderPreservationContext::new_from_children_nodes(children_nodes,
self.plan)
+ }
}
+}
- plan.clone().with_new_children(children)
+/// Calculates the updated plan by replacing executors that lose ordering
+/// inside the `ExecTree` with their order-preserving variants. This will
+/// generate an alternative plan, which will be accepted or rejected later on
+/// depending on whether it helps us remove a `SortExec`.
+fn get_updated_plan(
+ exec_tree: &ExecTree,
+ // Flag indicating that it is desirable to replace `RepartitionExec`s with
+ // `SortPreservingRepartitionExec`s:
+ is_spr_better: bool,
+ // Flag indicating that it is desirable to replace
`CoalescePartitionsExec`s
+ // with `SortPreservingMergeExec`s:
+ is_spm_better: bool,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ let plan = exec_tree.plan.clone();
+
+ let mut children = plan.children();
+ // Update children and their descendants in the given tree:
+ for item in &exec_tree.children {
+ children[item.idx] = get_updated_plan(item, is_spr_better,
is_spm_better)?;
+ }
+ // Construct the plan with updated children:
+ let mut plan = plan.with_new_children(children)?;
+
+ // When a `RepartitionExec` doesn't preserve ordering, replace it with
+ // a `SortPreservingRepartitionExec` if appropriate:
+ if is_repartition(&plan) && !plan.maintains_input_order()[0] &&
is_spr_better {
+ let child = plan.children()[0].clone();
+ plan = Arc::new(
+ RepartitionExec::try_new(child, plan.output_partitioning())?
+ .with_preserve_order(),
+ ) as _
+ }
+ // When the input of a `CoalescePartitionsExec` has an ordering, replace it
+ // with a `SortPreservingMergeExec` if appropriate:
+ if is_coalesce_partitions(&plan)
+ && plan.children()[0].output_ordering().is_some()
+ && is_spm_better
+ {
+ let child = plan.children()[0].clone();
+ plan = Arc::new(SortPreservingMergeExec::new(
+ child.output_ordering().unwrap_or(&[]).to_vec(),
+ child,
+ )) as _
+ }
+ Ok(plan)
}
-/// The `replace_repartition_execs` optimizer sub-rule searches for `SortExec`s
-/// and their `RepartitionExec` children with multiple input partitioning
having
-/// local (per-partition) ordering, so that it can replace the
`RepartitionExec`
-/// with a `SortPreservingRepartitionExec` and remove the pipeline-breaking
`SortExec`
-/// from the physical plan.
+/// The `replace_with_order_preserving_variants` optimizer sub-rule tries to
+/// remove `SortExec`s from the physical plan by replacing operators that do
+/// not preserve ordering with their order-preserving variants; i.e. by
replacing
+/// `RepartitionExec`s with `SortPreservingRepartitionExec`s or by replacing
+/// `CoalescePartitionsExec`s with `SortPreservingMergeExec`s.
+///
+/// If this replacement is helpful for removing a `SortExec`, it updates the
plan.
+/// Otherwise, it leaves the plan unchanged.
///
/// The algorithm flow is simply like this:
-/// 1. Visit nodes of the physical plan top-down and look for `SortExec` nodes.
-/// 2. If a `SortExec` is found, iterate over its children recursively until an
-/// executor that doesn't maintain ordering is encountered (or until a leaf
node).
-/// `RepartitionExec`s with multiple input partitions are considered as if
they
-/// maintain input ordering because they are potentially replaceable with
-/// `SortPreservingRepartitionExec`s which maintain ordering.
-/// 3_1. Replace the `RepartitionExec`s with multiple input partitions (which
doesn't
-/// maintain ordering) with a `SortPreservingRepartitionExec`.
-/// 3_2. Otherwise, keep the plan as is.
-/// 4. Check if the `SortExec` is still necessary in the updated plan by
comparing
+/// 1. Visit nodes of the physical plan bottom-up and look for `SortExec`
nodes.
+/// 1_1. During the traversal, build an `ExecTree` to keep track of operators
+/// that maintain ordering (or can maintain ordering when replaced by an
+/// order-preserving variant) until a `SortExec` is found.
+/// 2. When a `SortExec` is found, update the child of the `SortExec` by
replacing
+/// operators that do not preserve ordering in the `ExecTree` with their
order
+/// preserving variants.
+/// 3. Check if the `SortExec` is still necessary in the updated plan by
comparing
/// its input ordering with the output ordering it imposes. We do this
because
-/// replacing `RepartitionExec`s with `SortPreservingRepartitionExec`s
enables us
-/// to preserve the previously lost ordering during `RepartitionExec`s.
-/// 5_1. If the `SortExec` in question turns out to be unnecessary, remove it
and use
-/// updated plan. Otherwise, use the original plan.
-/// 6. Continue the top-down iteration until another `SortExec` is seen, or
the iterations finish.
-pub fn replace_repartition_execs(
- plan: Arc<dyn ExecutionPlan>,
-) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
- if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
- let changed_plan = replace_sort_children(&plan)?;
- // Since we have a `SortExec` here, it's guaranteed that it has a
single child.
- let input = &changed_plan.children()[0];
- // Check if any child is changed, if so remove the `SortExec`. If the
ordering
- // is being satisfied with the child, then it means `SortExec` is
unnecessary.
+/// replacing operators that lose ordering with their order-preserving
variants
+/// enables us to preserve the previously lost ordering at the input of
`SortExec`.
+/// 4. If the `SortExec` in question turns out to be unnecessary, remove it
and use
+/// updated plan. Otherwise, use the original plan.
+/// 5. Continue the bottom-up traversal until another `SortExec` is seen, or
the traversal
+/// is complete.
+pub(crate) fn replace_with_order_preserving_variants(
+ requirements: OrderPreservationContext,
+ // A flag indicating that replacing `RepartitionExec`s with
+ // `SortPreservingRepartitionExec`s is desirable when it helps
+ // to remove a `SortExec` from the plan. If this flag is `false`,
+ // this replacement should only be made to fix the pipeline (streaming).
+ is_spr_better: bool,
+ // A flag indicating that replacing `CoalescePartitionsExec`s with
+ // `SortPreservingMergeExec`s is desirable when it helps to remove
+ // a `SortExec` from the plan. If this flag is `false`, this replacement
+ // should only be made to fix the pipeline (streaming).
+ is_spm_better: bool,
+) -> Result<Transformed<OrderPreservationContext>> {
+ let plan = &requirements.plan;
+ let ordering_onwards = &requirements.ordering_onwards;
+ if is_sort(plan) {
+ let exec_tree = if let Some(exec_tree) = &ordering_onwards[0] {
+ exec_tree
+ } else {
+ return Ok(Transformed::No(requirements));
+ };
+ // For unbounded cases, replace with the order-preserving variant in
+ // any case, as doing so helps fix the pipeline.
+ let is_unbounded = unbounded_output(plan);
Review Comment:
Yes, the code behaves as @ozankabak described. With these semantics, we postpone
validating "whether a physical plan can run without breaking the pipeline" to
`PipelineChecker`. During optimization, we do not restrict ourselves with such
checks. What we care about is whether the relevant executor would receive
unbounded data or not. If, after all of the optimizations, the physical plan
still cannot be run without breaking the pipeline, then an error is raised
indicating that the physical plan breaks the pipeline.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]