This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 10eabd6822 [MINOR]: Simplify code, change requirement from
PhysicalSortExpr to PhysicalSortRequirement (#7913)
10eabd6822 is described below
commit 10eabd6822821d537e1148d63e44919607d46b87
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Oct 24 23:14:57 2023 +0300
[MINOR]: Simplify code, change requirement from PhysicalSortExpr to
PhysicalSortRequirement (#7913)
* Simplify code; change the requirement accepted by `add_sort_above` from
PhysicalSortExpr to PhysicalSortRequirement
* Remove the unnecessary `Result` return type from `add_sort_above`
---
.../src/physical_optimizer/enforce_distribution.rs | 11 +++-----
.../core/src/physical_optimizer/enforce_sorting.rs | 16 +++++------
.../replace_with_order_preserving_variants.rs | 18 ++++++-------
.../core/src/physical_optimizer/sort_pushdown.rs | 31 ++++++++++------------
datafusion/core/src/physical_optimizer/utils.rs | 15 +++++------
5 files changed, 41 insertions(+), 50 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 89036e9f8c..9cd7eff472 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -55,9 +55,7 @@ use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::{
map_columns_before_projection, ordering_satisfy_requirement_concrete,
};
-use datafusion_physical_expr::{
- expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement,
-};
+use datafusion_physical_expr::{expr_list_eq_strict_order, PhysicalExpr};
use datafusion_physical_plan::unbounded_output;
use datafusion_physical_plan::windows::{get_best_fitting_window,
BoundedWindowAggExec};
@@ -1374,10 +1372,7 @@ fn ensure_distribution(
// make sure ordering requirements are still satisfied
after.
if ordering_satisfied {
// Make sure to satisfy ordering requirement:
- let sort_expr = PhysicalSortRequirement::to_sort_exprs(
- required_input_ordering.clone(),
- );
- add_sort_above(&mut child, sort_expr, None)?;
+ add_sort_above(&mut child, required_input_ordering,
None);
}
}
// Stop tracking distribution changing operators
@@ -1715,7 +1710,7 @@ mod tests {
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions, expressions::binary, expressions::lit,
expressions::Column,
- LexOrdering, PhysicalExpr, PhysicalSortExpr,
+ LexOrdering, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};
/// Models operators like BoundedWindowExec that require an input
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 92db3bbd05..913dae07fa 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -399,7 +399,11 @@ fn parallelize_sorts(
let mut prev_layer = plan.clone();
update_child_to_remove_coalesce(&mut prev_layer, &mut
coalesce_onwards[0])?;
let (sort_exprs, fetch) = get_sort_exprs(&plan)?;
- add_sort_above(&mut prev_layer, sort_exprs.to_vec(), fetch)?;
+ add_sort_above(
+ &mut prev_layer,
+ &PhysicalSortRequirement::from_sort_exprs(sort_exprs),
+ fetch,
+ );
let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer)
.with_fetch(fetch);
return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
@@ -456,9 +460,7 @@ fn ensure_sorting(
) {
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child,
sort_onwards, &plan)?;
- let sort_expr =
-
PhysicalSortRequirement::to_sort_exprs(required_ordering);
- add_sort_above(child, sort_expr, None)?;
+ add_sort_above(child, &required_ordering, None);
if is_sort(child) {
*sort_onwards = Some(ExecTree::new(child.clone(), idx,
vec![]));
} else {
@@ -468,8 +470,7 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec`
to the plan.
- let sort_expr =
PhysicalSortRequirement::to_sort_exprs(required);
- add_sort_above(child, sort_expr, None)?;
+ add_sort_above(child, &required, None);
*sort_onwards = Some(ExecTree::new(child.clone(), idx,
vec![]));
}
(None, Some(_)) => {
@@ -603,9 +604,8 @@ fn analyze_window_sort_removal(
.required_input_ordering()
.swap_remove(0)
.unwrap_or_default();
- let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
// Satisfy the ordering requirement so that the window can run:
- add_sort_above(&mut window_child, sort_expr, None)?;
+ add_sort_above(&mut window_child, &reqs, None);
let uses_bounded_memory = window_expr.iter().all(|e|
e.uses_bounded_memory());
let new_window = if uses_bounded_memory {
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index fb1a50e18d..fb75c083a7 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -71,14 +71,15 @@ impl OrderPreservationContext {
// ordering, (or that can maintain ordering with the
replacement of
// its variant)
let plan = item.plan;
+ let children = plan.children();
let ordering_onwards = item.ordering_onwards;
- if plan.children().is_empty() {
+ if children.is_empty() {
// Plan has no children, there is nothing to propagate.
None
} else if ordering_onwards[0].is_none()
&& ((is_repartition(&plan) &&
!plan.maintains_input_order()[0])
|| (is_coalesce_partitions(&plan)
- && plan.children()[0].output_ordering().is_some()))
+ && children[0].output_ordering().is_some()))
{
Some(ExecTree::new(plan, idx, vec![]))
} else {
@@ -175,19 +176,18 @@ fn get_updated_plan(
// When a `RepartitionExec` doesn't preserve ordering, replace it with
// a `SortPreservingRepartitionExec` if appropriate:
if is_repartition(&plan) && !plan.maintains_input_order()[0] &&
is_spr_better {
- let child = plan.children()[0].clone();
- plan = Arc::new(
- RepartitionExec::try_new(child, plan.output_partitioning())?
- .with_preserve_order(true),
- ) as _
+ let child = plan.children().swap_remove(0);
+ let repartition = RepartitionExec::try_new(child,
plan.output_partitioning())?;
+ plan = Arc::new(repartition.with_preserve_order(true)) as _
}
// When the input of a `CoalescePartitionsExec` has an ordering, replace it
// with a `SortPreservingMergeExec` if appropriate:
+ let mut children = plan.children();
if is_coalesce_partitions(&plan)
- && plan.children()[0].output_ordering().is_some()
+ && children[0].output_ordering().is_some()
&& is_spm_better
{
- let child = plan.children()[0].clone();
+ let child = children.swap_remove(0);
plan = Arc::new(SortPreservingMergeExec::new(
child.output_ordering().unwrap_or(&[]).to_vec(),
child,
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 9b81ad3efb..808c4a3dad 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -139,23 +139,21 @@ pub(crate) fn pushdown_sorts(
|| plan.ordering_equivalence_properties(),
) {
// If the current plan is a SortExec, modify it to satisfy parent
requirements:
- let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
- parent_required.ok_or_else(err)?.iter().cloned(),
- );
+ let parent_required_expr = parent_required.ok_or_else(err)?;
new_plan = sort_exec.input().clone();
- add_sort_above(&mut new_plan, parent_required_expr,
sort_exec.fetch())?;
+ add_sort_above(&mut new_plan, parent_required_expr,
sort_exec.fetch());
};
let required_ordering = new_plan
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs);
// Since new_plan is a SortExec, we can safely get the 0th index.
- let child = &new_plan.children()[0];
+ let child = new_plan.children().swap_remove(0);
if let Some(adjusted) =
- pushdown_requirement_to_children(child,
required_ordering.as_deref())?
+ pushdown_requirement_to_children(&child,
required_ordering.as_deref())?
{
// Can push down requirements
Ok(Transformed::Yes(SortPushDown {
- plan: child.clone(),
+ plan: child,
required_ordering: None,
adjusted_request_ordering: adjusted,
}))
@@ -180,17 +178,15 @@ pub(crate) fn pushdown_sorts(
// Can not satisfy the parent requirements, check whether the
requirements can be pushed down:
if let Some(adjusted) = pushdown_requirement_to_children(plan,
parent_required)? {
Ok(Transformed::Yes(SortPushDown {
- plan: plan.clone(),
+ plan: requirements.plan,
required_ordering: None,
adjusted_request_ordering: adjusted,
}))
} else {
// Can not push down requirements, add new SortExec:
- let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
- parent_required.ok_or_else(err)?.iter().cloned(),
- );
- let mut new_plan = plan.clone();
- add_sort_above(&mut new_plan, parent_required_expr, None)?;
+ let parent_required_expr = parent_required.ok_or_else(err)?;
+ let mut new_plan = requirements.plan;
+ add_sort_above(&mut new_plan, parent_required_expr, None);
Ok(Transformed::Yes(SortPushDown::init(new_plan)))
}
}
@@ -206,7 +202,7 @@ fn pushdown_requirement_to_children(
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
let request_child = required_input_ordering[0].as_deref();
- let child_plan = plan.children()[0].clone();
+ let child_plan = plan.children().swap_remove(0);
match determine_children_requirement(parent_required, request_child,
child_plan) {
RequirementsCompatibility::Satisfy => {
Ok(Some(vec![request_child.map(|r| r.to_vec())]))
@@ -355,16 +351,17 @@ fn try_pushdown_requirements_to_join(
|| smj.ordering_equivalence_properties(),
)
.then(|| {
- let required_input_ordering = smj.required_input_ordering();
+ let mut required_input_ordering = smj.required_input_ordering();
let new_req =
Some(PhysicalSortRequirement::from_sort_exprs(&sort_expr));
match push_side {
JoinSide::Left => {
- vec![new_req, required_input_ordering[1].clone()]
+ required_input_ordering[0] = new_req;
}
JoinSide::Right => {
- vec![required_input_ordering[0].clone(), new_req]
+ required_input_ordering[1] = new_req;
}
}
+ required_input_ordering
}))
}
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index 0d6c85f9f2..403af4b16e 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -21,7 +21,6 @@ use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
-use crate::error::Result;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::repartition::RepartitionExec;
@@ -31,8 +30,8 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{displayable, ExecutionPlan};
-use datafusion_physical_expr::utils::ordering_satisfy;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::utils::ordering_satisfy_requirement;
+use datafusion_physical_expr::PhysicalSortRequirement;
/// This object implements a tree that we use while keeping track of paths
/// leading to [`SortExec`]s.
@@ -101,16 +100,17 @@ pub(crate) fn get_children_exectrees(
/// given ordering requirements while preserving the original partitioning.
pub fn add_sort_above(
node: &mut Arc<dyn ExecutionPlan>,
- sort_expr: Vec<PhysicalSortExpr>,
+ sort_requirement: &[PhysicalSortRequirement],
fetch: Option<usize>,
-) -> Result<()> {
+) {
// If the ordering requirement is already satisfied, do not add a sort.
- if !ordering_satisfy(
+ if !ordering_satisfy_requirement(
node.output_ordering(),
- Some(&sort_expr),
+ Some(sort_requirement),
|| node.equivalence_properties(),
|| node.ordering_equivalence_properties(),
) {
+ let sort_expr =
PhysicalSortRequirement::to_sort_exprs(sort_requirement.to_vec());
let new_sort = SortExec::new(sort_expr,
node.clone()).with_fetch(fetch);
*node = Arc::new(if node.output_partitioning().partition_count() > 1 {
@@ -119,7 +119,6 @@ pub fn add_sort_above(
new_sort
}) as _
}
- Ok(())
}
/// Checks whether the given operator is a limit;