ozankabak commented on code in PR #7090:
URL: https://github.com/apache/arrow-datafusion/pull/7090#discussion_r1273941523
##########
datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs:
##########
@@ -15,136 +15,256 @@
// specific language governing permissions and limitations
// under the License.
-//! Repartition optimizer that replaces `SortExec`s and their suitable `RepartitionExec` children with `SortPreservingRepartitionExec`s.
+//! Optimizer rule that replaces executors that lose ordering with their
+//! order-preserving variants when it is helpful; either in terms of
+//! performance or to accommodate unbounded streams by fixing the pipeline.
+
use crate::error::Result;
-use crate::physical_optimizer::sort_enforcement::unbounded_output;
+use crate::physical_optimizer::sort_enforcement::{unbounded_output, ExecTree};
+use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort};
use crate::physical_plan::repartition::RepartitionExec;
-use crate::physical_plan::sorts::sort::SortExec;
-use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use super::utils::is_repartition;
-use datafusion_common::tree_node::Transformed;
+use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_physical_expr::utils::ordering_satisfy;
-use itertools::enumerate;
use std::sync::Arc;
-/// Creates a `SortPreservingRepartitionExec` from given `RepartitionExec`
-fn sort_preserving_repartition(
- repartition: &RepartitionExec,
-) -> Result<Arc<RepartitionExec>> {
- Ok(Arc::new(
- RepartitionExec::try_new(
- repartition.input().clone(),
- repartition.partitioning().clone(),
- )?
- .with_preserve_order(),
- ))
+/// For a given `plan`, this object carries the information one needs from its
+/// descendants to decide whether it is beneficial to replace order-losing (but
+/// somewhat faster) variants of certain operators with their order-preserving
+/// (but somewhat slower) cousins.
+#[derive(Debug, Clone)]
+pub(crate) struct OrderPreservationContext {
+ pub(crate) plan: Arc<dyn ExecutionPlan>,
+ ordering_onwards: Vec<Option<ExecTree>>,
}
-fn does_plan_maintain_input_order(plan: &Arc<dyn ExecutionPlan>) -> bool {
- plan.maintains_input_order().iter().any(|flag| *flag)
-}
+impl OrderPreservationContext {
+ /// Creates a "default" order-preservation context.
+ pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
+ let length = plan.children().len();
+ OrderPreservationContext {
+ plan,
+ ordering_onwards: vec![None; length],
+ }
+ }
-/// Check the children nodes of a `SortExec` until ordering is lost (e.g. until
-/// another `SortExec` or a `CoalescePartitionsExec` which doesn't maintain ordering)
-/// and replace `RepartitionExec`s that do not maintain ordering (e.g. those whose
-/// input partition counts are larger than unity) with `SortPreservingRepartitionExec`s.
-/// Note that doing this may render the `SortExec` in question unneccessary, which will
-/// be removed later on.
-///
-/// For example, we transform the plan below
-/// "FilterExec: c@2 > 3",
-/// " RepartitionExec: partitioning=Hash(\[b@0], 16), input_partitions=16",
-/// " RepartitionExec: partitioning=Hash(\[a@0], 16), input_partitions=1",
-/// " MemoryExec: partitions=1, partition_sizes=\[(<depends_on_batch_size>)], output_ordering: \[PhysicalSortExpr { expr: Column { name: \"a\", index: 0 }, options: SortOptions { descending: false, nulls_first: false } }]",
-/// into
-/// "FilterExec: c@2 > 3",
-/// " SortPreservingRepartitionExec: partitioning=Hash(\[b@0], 16), input_partitions=16",
-/// " RepartitionExec: partitioning=Hash(\[a@0], 16), input_partitions=1",
-/// " MemoryExec: partitions=1, partition_sizes=\[<depends_on_batch_size>], output_ordering: \[PhysicalSortExpr { expr: Column { name: \"a\", index: 0 }, options: SortOptions { descending: false, nulls_first: false } }]",
-/// where the `FilterExec` in the latter has output ordering `a ASC`. This ordering will
-/// potentially remove a `SortExec` at the top of `FilterExec`. If this doesn't help remove
-/// a `SortExec`, the old version is used.
-fn replace_sort_children(
- plan: &Arc<dyn ExecutionPlan>,
-) -> Result<Arc<dyn ExecutionPlan>> {
- if plan.children().is_empty() {
- return Ok(plan.clone());
+ /// Creates a new order-preservation context from those of children nodes.
+ pub fn new_from_children_nodes(
+ children_nodes: Vec<OrderPreservationContext>,
+ parent_plan: Arc<dyn ExecutionPlan>,
+ ) -> Result<Self> {
+ let children_plans = children_nodes
+ .iter()
+ .map(|item| item.plan.clone())
+ .collect();
+ let ordering_onwards = children_nodes
+ .into_iter()
+ .enumerate()
+ .map(|(idx, item)| {
+ // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec`
+ // operators. This tree collects all the intermediate executors that
+ // maintain a single partition. If we just saw a `CoalescePartitionsExec`
+ // operator, we reset the tree and start accumulating.
Review Comment:
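This comment is outdated and confusing: it describes the `coalesce_onwards` tree and its `CoalescePartitionsExec` leaves, but this struct tracks `ordering_onwards`. Let's remove it and replace it with one that explains what `ordering_onwards` records for each child.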
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]