This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ee2498ff7a chore(15003): add indentation to plans, to make them easier to
read (#15007)
ee2498ff7a is described below
commit ee2498ff7ab6202e023e8251c7b290db9200d9a5
Author: wiedld <[email protected]>
AuthorDate: Wed Mar 5 02:47:04 2025 -0800
chore(15003): add indentation to plans, to make them easier to read (#15007)
---
.../physical_optimizer/enforce_distribution.rs | 1171 ++++++++++----------
1 file changed, 590 insertions(+), 581 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index fc2394d889..85d826109f 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -19,11 +19,11 @@ use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;
+use crate::physical_optimizer::test_utils::parquet_exec_with_sort;
use crate::physical_optimizer::test_utils::{
check_integrity, coalesce_partitions_exec, repartition_exec, schema,
sort_merge_join_exec, sort_preserving_merge_exec,
};
-use crate::physical_optimizer::test_utils::{parquet_exec_with_sort,
trim_plan_display};
use arrow::compute::SortOptions;
use datafusion::config::ConfigOptions;
@@ -61,7 +61,9 @@ use
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::PlanProperties;
-use datafusion_physical_plan::{displayable, DisplayAs, DisplayFormatType,
Statistics};
+use datafusion_physical_plan::{
+ get_plan_string, DisplayAs, DisplayFormatType, Statistics,
+};
/// Models operators like BoundedWindowExec that require an input
/// ordering but is easy to construct
@@ -358,8 +360,7 @@ fn ensure_distribution_helper(
macro_rules! plans_matches_expected {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let physical_plan = $PLAN;
- let formatted =
displayable(physical_plan.as_ref()).indent(true).to_string();
- let actual: Vec<&str> = formatted.trim().lines().collect();
+ let actual = get_plan_string(&physical_plan);
let expected_plan_lines: Vec<&str> = $EXPECTED_LINES
.iter().map(|s| *s).collect();
@@ -485,8 +486,7 @@ macro_rules! assert_optimized {
let optimized = optimizer.optimize(optimized, &config)?;
// Now format correctly
- let plan = displayable(optimized.as_ref()).indent(true).to_string();
- let actual_lines = trim_plan_display(&plan);
+ let actual_lines = get_plan_string(&optimized);
assert_eq!(
&expected_lines, &actual_lines,
@@ -500,8 +500,7 @@ macro_rules! assert_plan_txt {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s|
*s).collect();
// Now format correctly
- let plan = displayable($PLAN.as_ref()).indent(true).to_string();
- let actual_lines = trim_plan_display(&plan);
+ let actual_lines = get_plan_string(&$PLAN);
assert_eq!(
&expected_lines, &actual_lines,
@@ -542,9 +541,11 @@ fn multi_hash_joins() -> Result<()> {
for join_type in join_types {
let join = hash_join_exec(left.clone(), right.clone(), &join_on,
&join_type);
- let join_plan = format!(
- "HashJoinExec: mode=Partitioned, join_type={join_type}, on=[(a@0,
b1@1)]"
- );
+ let join_plan = |shift| -> String {
+ format!("{}HashJoinExec: mode=Partitioned, join_type={join_type},
on=[(a@0, b1@1)]", " ".repeat(shift))
+ };
+ let join_plan_indent2 = join_plan(2);
+ let join_plan_indent4 = join_plan(4);
match join_type {
JoinType::Inner
@@ -572,33 +573,33 @@ fn multi_hash_joins() -> Result<()> {
// Should include 3 RepartitionExecs
JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti | JoinType::LeftMark => vec![
top_join_plan.as_str(),
- join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ &join_plan_indent2,
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
// Should include 4 RepartitionExecs
_ => vec![
top_join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ &join_plan_indent4,
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
assert_optimized!(expected, top_join.clone(), true);
@@ -635,34 +636,34 @@ fn multi_hash_joins() -> Result<()> {
JoinType::Inner | JoinType::Right | JoinType::RightSemi |
JoinType::RightAnti =>
vec![
top_join_plan.as_str(),
- join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- "RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- "RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- "RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ &join_plan_indent2,
+ " RepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1],
10), input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
// Should include 4 RepartitionExecs
_ =>
vec![
top_join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10",
- join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- "RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- "RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- "RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10",
+ &join_plan_indent4,
+ " RepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1],
10), input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
],
};
assert_optimized!(expected, top_join.clone(), true);
@@ -710,17 +711,17 @@ fn multi_joins_after_alias() -> Result<()> {
// Output partition need to respect the Alias and should not introduce
additional RepartitionExec
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]",
- "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b@1], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
@@ -736,17 +737,17 @@ fn multi_joins_after_alias() -> Result<()> {
// Output partition need to respect the Alias and should not introduce
additional RepartitionExec
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a2@1, c@2)]",
- "ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " ProjectionExec: expr=[a@0 as a1, a@0 as a2]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b@1], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_optimized!(expected, top_join.clone(), true);
assert_optimized!(expected, top_join, false);
@@ -787,19 +788,19 @@ fn multi_joins_after_multi_alias() -> Result<()> {
// The original Output partition can not satisfy the Join requirements and
need to add an additional RepartitionExec
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, c@2)]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "ProjectionExec: expr=[c1@0 as a]",
- "ProjectionExec: expr=[c@2 as c1]",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+ " ProjectionExec: expr=[c1@0 as a]",
+ " ProjectionExec: expr=[c@2 as c1]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0,
b@1)]",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b@1], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_optimized!(expected, top_join.clone(), true);
@@ -831,16 +832,16 @@ fn join_after_agg_alias() -> Result<()> {
// Only two RepartitionExecs added
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, a2@0)]",
- "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
- "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+ " RepartitionExec: partitioning=Hash([a1@0], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
+ " RepartitionExec: partitioning=Hash([a2@0], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a2], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, join.clone(), true);
assert_optimized!(expected, join, false);
@@ -883,17 +884,17 @@ fn hash_join_key_ordering() -> Result<()> {
// Only two RepartitionExecs added
let expected = &[
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b1@1, b@0),
(a1@0, a@1)]",
- "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
- "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1],
aggr=[]",
- "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10),
input_partitions=10",
- "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a],
aggr=[]",
- "RepartitionExec: partitioning=Hash([b@0, a@1], 10),
input_partitions=10",
- "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
+ " AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as
a1], aggr=[]",
+ " RepartitionExec: partitioning=Hash([b1@0, a1@1], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1],
aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a],
aggr=[]",
+ " RepartitionExec: partitioning=Hash([b@0, a@1], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, join.clone(), true);
assert_optimized!(expected, join, false);
@@ -1002,24 +1003,24 @@ fn multi_hash_join_key_ordering() -> Result<()> {
// The bottom joins' join key ordering is adjusted based on the top join.
And the top join should not introduce additional RepartitionExec
let expected = &[
"FilterExec: c@6 > 1",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6),
(C@3, c@2), (AA@1, a1@5)]",
- "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1),
(c@2, c1@2), (a@0, a1@0)]",
- "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1),
(c@2, c1@2), (a@0, a1@0)]",
- "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6),
(C@3, c@2), (AA@1, a1@5)]",
+ " ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1,
b1@1), (c@2, c1@2), (a@0, a1@0)]",
+ " RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1),
(c@2, c1@2), (a@0, a1@0)]",
+ " RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, filter_top_join.clone(), true);
assert_optimized!(expected, filter_top_join, false);
@@ -1141,23 +1142,23 @@ fn reorder_join_keys_to_left_input() -> Result<()> {
// The top joins' join key ordering is adjusted based on the children
inputs.
let expected = &[
top_join_plan.as_str(),
- "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0),
(b@1, b1@1), (c@2, c1@2)]",
- "RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2),
(b@1, b1@1), (a@0, a1@0)]",
- "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
+ " ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0,
a1@0), (b@1, b1@1), (c@2, c1@2)]",
+ " RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2,
c1@2), (b@1, b1@1), (a@0, a1@0)]",
+ " RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_plan_txt!(expected, reordered);
@@ -1275,23 +1276,23 @@ fn reorder_join_keys_to_right_input() -> Result<()> {
// The top joins' join key ordering is adjusted based on the children
inputs.
let expected = &[
top_join_plan.as_str(),
- "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0),
(b@1, b1@1)]",
- "RepartitionExec: partitioning=Hash([a@0, b@1], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([a1@0, b1@1], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
- "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2),
(b@1, b1@1), (a@0, a1@0)]",
- "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
+ " ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0,
a1@0), (b@1, b1@1)]",
+ " RepartitionExec: partitioning=Hash([a@0, b@1], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a1@0, b1@1], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2,
c1@2), (b@1, b1@1), (a@0, a1@0)]",
+ " RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_plan_txt!(expected, reordered);
@@ -1331,7 +1332,15 @@ fn multi_smj_joins() -> Result<()> {
for join_type in join_types {
let join =
sort_merge_join_exec(left.clone(), right.clone(), &join_on,
&join_type);
- let join_plan = format!("SortMergeJoin: join_type={join_type},
on=[(a@0, b1@1)]");
+ let join_plan = |shift| -> String {
+ format!(
+ "{}SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]",
+ " ".repeat(shift)
+ )
+ };
+ let join_plan_indent2 = join_plan(2);
+ let join_plan_indent6 = join_plan(6);
+ let join_plan_indent10 = join_plan(10);
// Top join on (a == c)
let top_join_on = vec![(
@@ -1348,20 +1357,20 @@ fn multi_smj_joins() -> Result<()> {
JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti =>
vec![
top_join_plan.as_str(),
- join_plan.as_str(),
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1,
d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ &join_plan_indent2,
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
// Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4
SortExecs
// Since ordering of the left child is not preserved after
SortMergeJoin
@@ -1375,22 +1384,22 @@ fn multi_smj_joins() -> Result<()> {
_ => vec![
top_join_plan.as_str(),
// Below 2 operators are differences introduced, when join
mode is changed
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- join_plan.as_str(),
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1,
d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ &join_plan_indent6,
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
assert_optimized!(expected, top_join.clone(), true, true);
@@ -1400,20 +1409,20 @@ fn multi_smj_joins() -> Result<()> {
JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti =>
vec![
top_join_plan.as_str(),
- join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1,
d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ &join_plan_indent2,
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
// Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4
SortExecs
// Since ordering of the left child is not preserved after
SortMergeJoin
@@ -1427,24 +1436,24 @@ fn multi_smj_joins() -> Result<()> {
_ => vec![
top_join_plan.as_str(),
// Below 4 operators are differences introduced, when join
mode is changed
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as
d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
+ " CoalescePartitionsExec",
+ &join_plan_indent10,
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
};
assert_optimized!(expected_first_sort_enforcement, top_join, false,
true);
@@ -1466,40 +1475,40 @@ fn multi_smj_joins() -> Result<()> {
// Should include 6 RepartitionExecs(3 hash, 3
round-robin) and 3 SortExecs
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
- join_plan.as_str(),
- "SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "SortExec: expr=[c@2 ASC],
preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ &join_plan_indent2,
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[c@2 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
// Should include 7 RepartitionExecs (4 hash, 3
round-robin) and 4 SortExecs
JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
- "SortExec: expr=[b1@6 ASC],
preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10",
- join_plan.as_str(),
- "SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "SortExec: expr=[c@2 ASC],
preserve_partitioning=[true]",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[b1@6 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10",
+ &join_plan_indent6,
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([b1@1],
10), input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[c@2 ASC],
preserve_partitioning=[true]",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
// this match arm cannot be reached
_ => unreachable!()
@@ -1510,42 +1519,42 @@ fn multi_smj_joins() -> Result<()> {
// Should include 6 RepartitionExecs (3 of them preserves
order) and 3 SortExecs
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
- join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ &join_plan_indent2,
+ " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
+ " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
// Should include 8 RepartitionExecs (4 of them preserves
order) and 4 SortExecs
JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[b1@6 ASC],
preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- join_plan.as_str(),
- "RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as
c1, d@3 as d1, e@4 as e1]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[b1@6 ASC],
preserve_partitioning=[false]",
+ " CoalescePartitionsExec",
+ &join_plan_indent10,
+ " RepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true,
sort_exprs=b1@1 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
+ " ProjectionExec: expr=[a@0 as a1,
b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
+ " DataSourceExec: file_groups={1
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
],
// this match arm cannot be reached
_ => unreachable!()
@@ -1608,47 +1617,47 @@ fn smj_join_key_ordering() -> Result<()> {
// Only two RepartitionExecs added
let expected = &[
"SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
- "SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[true]",
- "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
- "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
- "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1],
aggr=[]",
- "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10),
input_partitions=10",
- "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[true]",
- "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
- "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a],
aggr=[]",
- "RepartitionExec: partitioning=Hash([b@0, a@1], 10),
input_partitions=10",
- "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[true]",
+ " ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
+ " ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
+ " AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1
as a1], aggr=[]",
+ " RepartitionExec: partitioning=Hash([b1@0, a1@1], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1],
aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[true]",
+ " ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
+ " AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a],
aggr=[]",
+ " RepartitionExec: partitioning=Hash([b@0, a@1], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a],
aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, join.clone(), true, true);
let expected_first_sort_enforcement = &[
"SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
- "RepartitionExec: partitioning=Hash([b3@1, a3@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[b3@1 ASC, a3@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
- "ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
- "AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1, a1@1 as a1],
aggr=[]",
- "RepartitionExec: partitioning=Hash([b1@0, a1@1], 10),
input_partitions=10",
- "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "RepartitionExec: partitioning=Hash([b2@1, a2@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=b2@1 ASC, a2@0 ASC",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[b2@1 ASC, a2@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
- "AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1 as a],
aggr=[]",
- "RepartitionExec: partitioning=Hash([b@0, a@1], 10),
input_partitions=10",
- "AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b3@1, a3@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC, a3@0 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[b3@1 ASC, a3@0 ASC],
preserve_partitioning=[false]",
+ " CoalescePartitionsExec",
+ " ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]",
+ " ProjectionExec: expr=[a1@1 as a1, b1@0 as b1]",
+ " AggregateExec: mode=FinalPartitioned, gby=[b1@0 as b1,
a1@1 as a1], aggr=[]",
+ " RepartitionExec: partitioning=Hash([b1@0, a1@1], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as
a1], aggr=[]",
+ " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([b2@1, a2@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=b2@1 ASC, a2@0 ASC",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[b2@1 ASC, a2@0 ASC],
preserve_partitioning=[false]",
+ " CoalescePartitionsExec",
+ " ProjectionExec: expr=[a@1 as a2, b@0 as b2]",
+ " AggregateExec: mode=FinalPartitioned, gby=[b@0 as b, a@1
as a], aggr=[]",
+ " RepartitionExec: partitioning=Hash([b@0, a@1], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[b@1 as b, a@0 as
a], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected_first_sort_enforcement, join, false, true);
@@ -1678,8 +1687,8 @@ fn merge_does_not_need_sort() -> Result<()> {
// data is already sorted
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
- "CoalesceBatchesExec: target_batch_size=4096",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
assert_optimized!(expected, exec, true);
@@ -1689,9 +1698,9 @@ fn merge_does_not_need_sort() -> Result<()> {
// SortExec at the top.
let expected = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "CoalesceBatchesExec: target_batch_size=4096",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+ " CoalescePartitionsExec",
+ " CoalesceBatchesExec: target_batch_size=4096",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
assert_optimized!(expected, exec, false);
@@ -1721,18 +1730,18 @@ fn union_to_interleave() -> Result<()> {
// Only two RepartitionExecs added, no final RepartitionExec required
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
- "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
- "InterleaveExec",
- "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
+ " InterleaveExec",
+ " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1],
aggr=[]",
+ " RepartitionExec: partitioning=Hash([a1@0], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1],
aggr=[]",
+ " RepartitionExec: partitioning=Hash([a1@0], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan.clone(), false);
@@ -1763,19 +1772,19 @@ fn union_not_to_interleave() -> Result<()> {
// Only two RepartitionExecs added, no final RepartitionExec required
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a2@0 as a2], aggr=[]",
- "RepartitionExec: partitioning=Hash([a2@0], 10), input_partitions=20",
- "AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
- "UnionExec",
- "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a2@0], 10),
input_partitions=20",
+ " AggregateExec: mode=Partial, gby=[a1@0 as a2], aggr=[]",
+ " UnionExec",
+ " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1],
aggr=[]",
+ " RepartitionExec: partitioning=Hash([a1@0], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1],
aggr=[]",
+ " RepartitionExec: partitioning=Hash([a1@0], 10),
input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
// no sort in the plan but since we need it as a parameter, make it
default false
let prefer_existing_sort = false;
@@ -1807,10 +1816,10 @@ fn added_repartition_to_single_partition() ->
Result<()> {
let expected = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
@@ -1825,11 +1834,11 @@ fn repartition_deepest_node() -> Result<()> {
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
@@ -1844,12 +1853,12 @@ fn repartition_unsorted_limit() -> Result<()> {
let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
- "CoalescePartitionsExec",
- "LocalLimitExec: fetch=100",
- "FilterExec: c@2 = 0",
+ " CoalescePartitionsExec",
+ " LocalLimitExec: fetch=100",
+ " FilterExec: c@2 = 0",
// nothing sorts the data, so the local limit doesn't require sorted
data either
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
@@ -1869,10 +1878,10 @@ fn repartition_sorted_limit() -> Result<()> {
let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
- "LocalLimitExec: fetch=100",
+ " LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
@@ -1894,12 +1903,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()>
{
let expected = &[
"SortRequiredExec: [c@2 ASC]",
- "FilterExec: c@2 = 0",
+ " FilterExec: c@2 = 0",
// We can use repartition here, ordering requirement by
SortRequiredExec
// is still satisfied.
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
@@ -1918,19 +1927,19 @@ fn repartition_ignores_limit() -> Result<()> {
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "GlobalLimitExec: skip=0, fetch=100",
- "CoalescePartitionsExec",
- "LocalLimitExec: fetch=100",
- "FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " GlobalLimitExec: skip=0, fetch=100",
+ " CoalescePartitionsExec",
+ " LocalLimitExec: fetch=100",
+ " FilterExec: c@2 = 0",
// repartition should happen prior to the filter to maximize
parallelism
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "GlobalLimitExec: skip=0, fetch=100",
- "LocalLimitExec: fetch=100",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " GlobalLimitExec: skip=0, fetch=100",
+ " LocalLimitExec: fetch=100",
// Expect no repartition to happen for local limit
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
@@ -1945,11 +1954,11 @@ fn repartition_ignores_union() -> Result<()> {
let expected = &[
"UnionExec",
// Expect no repartition of DataSourceExec
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
@@ -1971,7 +1980,7 @@ fn repartition_through_sort_preserving_merge() ->
Result<()> {
// need resort as the data was not sorted correctly
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
@@ -1996,15 +2005,15 @@ fn repartition_ignores_sort_preserving_merge() ->
Result<()> {
// should not repartition, since increased parallelism is not beneficial
for SortPreservingMerge
let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " CoalescePartitionsExec",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, plan, false);
@@ -2025,19 +2034,19 @@ fn
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
- "UnionExec",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
+ " UnionExec",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "UnionExec",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
+ " CoalescePartitionsExec",
+ " UnionExec",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, plan, false);
@@ -2061,9 +2070,9 @@ fn repartition_does_not_destroy_sort() -> Result<()> {
// during repartitioning ordering is preserved
let expected = &[
"SortRequiredExec: [d@3 ASC]",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[d@3 ASC], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true, true);
@@ -2100,12 +2109,12 @@ fn repartition_does_not_destroy_sort_more_complex() ->
Result<()> {
let expected = &[
"UnionExec",
// union input 1: no repartitioning
- "SortRequiredExec: [c@2 ASC]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
+ " SortRequiredExec: [c@2 ASC]",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
// union input 2: should repartition
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
@@ -2134,22 +2143,22 @@ fn repartition_transitively_with_projection() ->
Result<()> {
let expected = &[
"SortPreservingMergeExec: [sum@0 ASC]",
- "SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]",
+ " SortExec: expr=[sum@0 ASC], preserve_partitioning=[true]",
// Since this projection is not trivial, increasing parallelism is
beneficial
- "ProjectionExec: expr=[a@0 + b@1 as sum]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " ProjectionExec: expr=[a@0 + b@1 as sum]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
let expected_first_sort_enforcement = &[
"SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
+ " CoalescePartitionsExec",
// Since this projection is not trivial, increasing parallelism is
beneficial
- "ProjectionExec: expr=[a@0 + b@1 as sum]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " ProjectionExec: expr=[a@0 + b@1 as sum]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
assert_optimized!(expected_first_sort_enforcement, plan, false);
@@ -2180,8 +2189,8 @@ fn repartition_ignores_transitively_with_projection() ->
Result<()> {
let expected = &[
"SortRequiredExec: [c@2 ASC]",
// Since this projection is trivial, increasing parallelism is not
beneficial
- "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
@@ -2213,8 +2222,8 @@ fn repartition_transitively_past_sort_with_projection()
-> Result<()> {
let expected = &[
"SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
// Since this projection is trivial, increasing parallelism is not
beneficial
- "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
assert_optimized!(expected, plan, false);
@@ -2233,22 +2242,22 @@ fn repartition_transitively_past_sort_with_filter() ->
Result<()> {
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
// Expect repartition on the input to the sort (as it can benefit from
additional parallelism)
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "FilterExec: c@2 = 0",
+ " CoalescePartitionsExec",
+ " FilterExec: c@2 = 0",
// Expect repartition on the input of the filter (as it can benefit
from additional parallelism)
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet",
];
assert_optimized!(expected_first_sort_enforcement, plan, false);
@@ -2279,23 +2288,23 @@ fn
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
// Expect repartition on the input to the sort (as it can benefit from
additional parallelism)
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
- "FilterExec: c@2 = 0",
+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+ " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
+ " FilterExec: c@2 = 0",
// repartition is lowest down
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected, plan.clone(), true);
let expected_first_sort_enforcement = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " CoalescePartitionsExec",
+ " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
assert_optimized!(expected_first_sort_enforcement, plan, false);
@@ -2310,15 +2319,15 @@ fn parallelization_single_partition() -> Result<()> {
let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " DataSourceExec: file_groups={2 groups: [[x:0..50],
[x:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
];
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
+ " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " DataSourceExec: file_groups={2 groups: [[x:0..50],
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true,
10);
assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
@@ -2342,8 +2351,8 @@ fn parallelization_multiple_files() -> Result<()> {
// https://github.com/apache/datafusion/issues/8451
let expected = [
"SortRequiredExec: [a@0 ASC]",
- "FilterExec: c@2 = 0",
- "DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100],
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC],
file_type=parquet", ];
+ " FilterExec: c@2 = 0",
+ " DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100],
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC],
file_type=parquet", ];
let target_partitions = 3;
let repartition_size = 1;
assert_optimized!(
@@ -2359,8 +2368,8 @@ fn parallelization_multiple_files() -> Result<()> {
let expected = [
"SortRequiredExec: [a@0 ASC]",
- "FilterExec: c@2 = 0",
- "DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25],
[x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25],
[x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
let target_partitions = 8;
let repartition_size = 1;
@@ -2392,17 +2401,17 @@ fn parallelization_compressed_csv() -> Result<()> {
let expected_not_partitioned = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
+ " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=csv, has_header=false",
];
let expected_partitioned = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
+ " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " DataSourceExec: file_groups={2 groups: [[x:0..50],
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
for compression_type in compression_types {
@@ -2436,17 +2445,17 @@ fn parallelization_two_partitions() -> Result<()> {
let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Plan already has two partitions
- "DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]},
projection=[a, b, c, d, e], file_type=parquet",
+ " DataSourceExec: file_groups={2 groups: [[x:0..100],
[y:0..100]]}, projection=[a, b, c, d, e], file_type=parquet",
];
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Plan already has two partitions
- "DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={2 groups: [[x:0..100],
[y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true,
10);
assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
@@ -2461,17 +2470,17 @@ fn parallelization_two_partitions_into_four() ->
Result<()> {
let expected_parquet = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Multiple source files split across partitions
- "DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100],
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
+ " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100],
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=parquet",
];
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Multiple source files split across partitions
- "DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100],
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv,
has_header=false",
+ " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100],
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv,
has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true,
10);
assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10);
@@ -2491,19 +2500,19 @@ fn parallelization_sorted_limit() -> Result<()> {
let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
- "LocalLimitExec: fetch=100",
+ " LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
// Doesn't parallelize for SortExec without preserve_partitioning
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
let expected_csv = &[
"GlobalLimitExec: skip=0, fetch=100",
- "LocalLimitExec: fetch=100",
+ " LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
// Doesn't parallelize for SortExec without preserve_partitioning
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=csv, has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
@@ -2527,27 +2536,27 @@ fn parallelization_limit_with_filter() -> Result<()> {
let expected_parquet = &[
"GlobalLimitExec: skip=0, fetch=100",
- "CoalescePartitionsExec",
- "LocalLimitExec: fetch=100",
- "FilterExec: c@2 = 0",
+ " CoalescePartitionsExec",
+ " LocalLimitExec: fetch=100",
+ " FilterExec: c@2 = 0",
// even though data is sorted, we can use repartition here. Since
// ordering is not used in subsequent stages anyway.
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
// SortExec doesn't benefit from input partitioning
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
let expected_csv = &[
"GlobalLimitExec: skip=0, fetch=100",
- "CoalescePartitionsExec",
- "LocalLimitExec: fetch=100",
- "FilterExec: c@2 = 0",
+ " CoalescePartitionsExec",
+ " LocalLimitExec: fetch=100",
+ " FilterExec: c@2 = 0",
// even though data is sorted, we can use repartition here. Since
// ordering is not used in subsequent stages anyway.
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
// SortExec doesn't benefit from input partitioning
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
@@ -2567,35 +2576,35 @@ fn parallelization_ignores_limit() -> Result<()> {
let expected_parquet = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "GlobalLimitExec: skip=0, fetch=100",
- "CoalescePartitionsExec",
- "LocalLimitExec: fetch=100",
- "FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " GlobalLimitExec: skip=0, fetch=100",
+ " CoalescePartitionsExec",
+ " LocalLimitExec: fetch=100",
+ " FilterExec: c@2 = 0",
// repartition should happen prior to the filter to maximize
parallelism
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "GlobalLimitExec: skip=0, fetch=100",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " GlobalLimitExec: skip=0, fetch=100",
// Limit doesn't benefit from input partitioning - no parallelism
- "LocalLimitExec: fetch=100",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " LocalLimitExec: fetch=100",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
];
let expected_csv = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "GlobalLimitExec: skip=0, fetch=100",
- "CoalescePartitionsExec",
- "LocalLimitExec: fetch=100",
- "FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " GlobalLimitExec: skip=0, fetch=100",
+ " CoalescePartitionsExec",
+ " LocalLimitExec: fetch=100",
+ " FilterExec: c@2 = 0",
// repartition should happen prior to the filter to maximize
parallelism
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "GlobalLimitExec: skip=0, fetch=100",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " GlobalLimitExec: skip=0, fetch=100",
// Limit doesn't benefit from input partitioning - no parallelism
- "LocalLimitExec: fetch=100",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
+ " LocalLimitExec: fetch=100",
+ " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
@@ -2611,20 +2620,20 @@ fn parallelization_union_inputs() -> Result<()> {
let expected_parquet = &[
"UnionExec",
// Union doesn't benefit from input partitioning - no parallelism
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet",
];
let expected_csv = &[
"UnionExec",
// Union doesn't benefit from input partitioning - no parallelism
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=csv, has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
@@ -2678,15 +2687,15 @@ fn parallelization_sort_preserving_merge_with_union()
-> Result<()> {
// should not sort (as the data was already sorted)
let expected_parquet = &[
"SortPreservingMergeExec: [c@2 ASC]",
- "UnionExec",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
+ " UnionExec",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
let expected_csv = &[
"SortPreservingMergeExec: [c@2 ASC]",
- "UnionExec",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+ " UnionExec",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
@@ -2713,11 +2722,11 @@ fn parallelization_does_not_benefit() -> Result<()> {
// no parallelization, because SortRequiredExec doesn't benefit from
increased parallelism
let expected_parquet = &[
"SortRequiredExec: [c@2 ASC]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
let expected_csv = &[
"SortRequiredExec: [c@2 ASC]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true);
assert_optimized!(expected_csv, plan_csv, true);
@@ -2757,7 +2766,7 @@ fn
parallelization_ignores_transitively_with_projection_parquet() -> Result<()>
// data should not be repartitioned / resorted
let expected_parquet = &[
"ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected_parquet, plan_parquet, true);
@@ -2796,7 +2805,7 @@ fn
parallelization_ignores_transitively_with_projection_csv() -> Result<()> {
// data should not be repartitioned / resorted
let expected_csv = &[
"ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
];
assert_optimized!(expected_csv, plan_csv, true);
@@ -2819,8 +2828,8 @@ fn remove_redundant_roundrobins() -> Result<()> {
let expected = &[
"FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
@@ -2842,9 +2851,9 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> {
// This is still satisfied since, after filter that column is constant.
let expected = &[
"CoalescePartitionsExec",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
// last flag sets config.optimizer.PREFER_EXISTING_SORT
assert_optimized!(expected, physical_plan.clone(), true, true);
@@ -2865,9 +2874,9 @@ fn preserve_ordering_through_repartition() -> Result<()> {
let expected = &[
"SortPreservingMergeExec: [d@3 ASC]",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
];
// last flag sets config.optimizer.PREFER_EXISTING_SORT
assert_optimized!(expected, physical_plan.clone(), true, true);
@@ -2888,20 +2897,20 @@ fn do_not_preserve_ordering_through_repartition() ->
Result<()> {
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
assert_optimized!(expected, physical_plan.clone(), true);
let expected = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+ " CoalescePartitionsExec",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
assert_optimized!(expected, physical_plan, false);
@@ -2922,9 +2931,9 @@ fn no_need_for_sort_after_filter() -> Result<()> {
// After CoalescePartitionsExec c is still constant. Hence c@2 ASC
ordering is already satisfied.
"CoalescePartitionsExec",
// Since after this stage c is constant. c@2 ASC ordering is already
satisfied.
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
@@ -2949,21 +2958,21 @@ fn do_not_preserve_ordering_through_repartition2() ->
Result<()> {
let expected = &[
"SortPreservingMergeExec: [a@0 ASC]",
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, physical_plan.clone(), true);
let expected = &[
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
- "CoalescePartitionsExec",
- "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " CoalescePartitionsExec",
+ " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, physical_plan, false);
@@ -2982,8 +2991,8 @@ fn do_not_preserve_ordering_through_repartition3() ->
Result<()> {
let expected = &[
"FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=2",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
@@ -3004,8 +3013,8 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
// Ordering requirement of sort required exec is NOT satisfied
// by existing ordering at the source.
"SortRequiredExec: [a@0 ASC]",
- "FilterExec: c@2 = 0",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_plan_txt!(expected, physical_plan);
@@ -3013,9 +3022,9 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
"SortRequiredExec: [a@0 ASC]",
// Since at the start of the rule ordering requirement is not satisfied
// EnforceDistribution rule doesn't satisfy this requirement either.
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
let mut config = ConfigOptions::new();
@@ -3042,8 +3051,8 @@ fn put_sort_when_input_is_valid() -> Result<()> {
// Ordering requirement of sort required exec is satisfied
// by existing ordering at the source.
"SortRequiredExec: [a@0 ASC]",
- "FilterExec: c@2 = 0",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
];
assert_plan_txt!(expected, physical_plan);
@@ -3051,8 +3060,8 @@ fn put_sort_when_input_is_valid() -> Result<()> {
// Since at the start of the rule ordering requirement is satisfied
// EnforceDistribution rule satisfy this requirement also.
"SortRequiredExec: [a@0 ASC]",
- "FilterExec: c@2 = 0",
- "DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20],
[x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80],
[x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0
ASC], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20],
[x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80],
[x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0
ASC], file_type=parquet",
];
let mut config = ConfigOptions::new();
@@ -3078,8 +3087,8 @@ fn do_not_add_unnecessary_hash() -> Result<()> {
let expected = &[
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], output_ordering=[c@2 ASC], file_type=parquet",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
// Make sure target partition number is 1. In this case hash repartition
is unnecessary
assert_optimized!(expected, physical_plan.clone(), true, false, 1, false,
1024);
@@ -3104,12 +3113,12 @@ fn do_not_add_unnecessary_hash2() -> Result<()> {
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
// Since hash requirements of this operator is satisfied. There
shouldn't be
// a hash repartition here
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4",
- "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- "RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2",
- "DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b,
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=Hash([a@0], 4),
input_partitions=4",
+ " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=2",
+ " DataSourceExec: file_groups={2 groups: [[x], [y]]},
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
];
// Make sure target partition number is larger than 2 (e.g partition
number at the source).
assert_optimized!(expected, physical_plan.clone(), true, false, 4, false,
1024);
@@ -3153,9 +3162,9 @@ fn optimize_away_unnecessary_repartition2() -> Result<()>
{
let expected = &[
"FilterExec: c@2 = 0",
- "FilterExec: c@2 = 0",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet",
+ " FilterExec: c@2 = 0",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet",
];
assert_optimized!(expected, physical_plan.clone(), true);
assert_optimized!(expected, physical_plan, false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]