This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new acdd263812 Complete migrating `enforce_distribution` tests to insta
(#18185)
acdd263812 is described below
commit acdd263812eb449c856d74e1d0395dd1705f0cd7
Author: Dmitrii Blaginin <[email protected]>
AuthorDate: Wed Nov 5 10:08:40 2025 +0000
Complete migrating `enforce_distribution` tests to insta (#18185)
- Closes https://github.com/apache/datafusion/issues/15791
- Closes https://github.com/apache/datafusion/issues/15178 🥳
- Supersedes part of https://github.com/apache/datafusion/pull/16978
---------
Co-authored-by: Claude <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
.../physical_optimizer/enforce_distribution.rs | 634 +++++++++------------
1 file changed, 282 insertions(+), 352 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index db011c4be4..5b7d9ac8fb 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -66,8 +66,8 @@ use datafusion_physical_plan::projection::{ProjectionExec,
ProjectionExpr};
use
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{
- displayable, get_plan_string, DisplayAs, DisplayFormatType,
ExecutionPlanProperties,
- PlanProperties, Statistics,
+ displayable, DisplayAs, DisplayFormatType, ExecutionPlanProperties,
PlanProperties,
+ Statistics,
};
use insta::Settings;
@@ -469,83 +469,6 @@ impl TestConfig {
self
}
- // This be deleted in https://github.com/apache/datafusion/pull/18185
- /// Perform a series of runs using the current [`TestConfig`],
- /// assert the expected plan result,
- /// and return the result plan (for potential subsequent runs).
- fn run(
- &self,
- expected_lines: &[&str],
- plan: Arc<dyn ExecutionPlan>,
- optimizers_to_run: &[Run],
- ) -> Result<Arc<dyn ExecutionPlan>> {
- let expected_lines: Vec<&str> = expected_lines.to_vec();
-
- // Add the ancillary output requirements operator at the start:
- let optimizer = OutputRequirements::new_add_mode();
- let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
-
- // This file has 2 rules that use tree node, apply these rules to
original plan consecutively
- // After these operations tree nodes should be in a consistent state.
- // This code block makes sure that these rules doesn't violate tree
node integrity.
- {
- let adjusted = if
self.config.optimizer.top_down_join_key_reordering {
- // Run adjust_input_keys_ordering rule
- let plan_requirements =
- PlanWithKeyRequirements::new_default(plan.clone());
- let adjusted = plan_requirements
- .transform_down(adjust_input_keys_ordering)
- .data()
- .and_then(check_integrity)?;
- // TODO: End state payloads will be checked here.
- adjusted.plan
- } else {
- // Run reorder_join_keys_to_inputs rule
- plan.clone()
- .transform_up(|plan| {
-
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
- })
- .data()?
- };
-
- // Then run ensure_distribution rule
- DistributionContext::new_default(adjusted)
- .transform_up(|distribution_context| {
- ensure_distribution(distribution_context, &self.config)
- })
- .data()
- .and_then(check_integrity)?;
- // TODO: End state payloads will be checked here.
- }
-
- for run in optimizers_to_run {
- optimized = match run {
- Run::Distribution => {
- let optimizer = EnforceDistribution::new();
- optimizer.optimize(optimized, &self.config)?
- }
- Run::Sorting => {
- let optimizer = EnforceSorting::new();
- optimizer.optimize(optimized, &self.config)?
- }
- };
- }
-
- // Remove the ancillary output requirements operator when done:
- let optimizer = OutputRequirements::new_remove_mode();
- let optimized = optimizer.optimize(optimized, &self.config)?;
-
- // Now format correctly
- let actual_lines = get_plan_string(&optimized);
-
- assert_eq!(
- &expected_lines, &actual_lines,
-
"\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n"
- );
-
- Ok(optimized)
- }
-
/// Perform a series of runs using the current [`TestConfig`],
/// assert the expected plan result,
/// and return the result plan (for potential subsequent runs).
@@ -1503,15 +1426,6 @@ fn multi_smj_joins() -> Result<()> {
for join_type in join_types {
let join =
sort_merge_join_exec(left.clone(), right.clone(), &join_on,
&join_type);
- let join_plan = |shift| -> String {
- format!(
- "{}SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]",
- " ".repeat(shift)
- )
- };
- let join_plan_indent2 = join_plan(2);
- let join_plan_indent6 = join_plan(6);
- let join_plan_indent10 = join_plan(10);
// Top join on (a == c)
let top_join_on = vec![(
@@ -1520,235 +1434,246 @@ fn multi_smj_joins() -> Result<()> {
)];
let top_join =
sort_merge_join_exec(join.clone(), parquet_exec(), &top_join_on,
&join_type);
- let top_join_plan =
- format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]");
-
- let expected = match join_type {
- // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3
SortExecs
- JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti =>
- vec![
- top_join_plan.as_str(),
- &join_plan_indent2,
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4
SortExecs
- // Since ordering of the left child is not preserved after
SortMergeJoin
- // when mode is Right, RightSemi, RightAnti, Full
- // - We need to add one additional SortExec after SortMergeJoin in
contrast the test cases
- // when mode is Inner, Left, LeftSemi, LeftAnti
- // Similarly, since partitioning of the left side is not preserved
- // when mode is Right, RightSemi, RightAnti, Full
- // - We need to add one additional Hash Repartition after
SortMergeJoin in contrast the test
- // cases when mode is Inner, Left, LeftSemi, LeftAnti
- _ => vec![
- top_join_plan.as_str(),
- // Below 2 operators are differences introduced, when join
mode is changed
- " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- &join_plan_indent6,
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- };
- // TODO(wiedld): show different test result if enforce sorting first.
- test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
-
- let expected_first_sort_enforcement = match join_type {
- // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3
SortExecs
- JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti =>
- vec![
- top_join_plan.as_str(),
- &join_plan_indent2,
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4
SortExecs
- // Since ordering of the left child is not preserved after
SortMergeJoin
- // when mode is Right, RightSemi, RightAnti, Full
- // - We need to add one additional SortExec after SortMergeJoin in
contrast the test cases
- // when mode is Inner, Left, LeftSemi, LeftAnti
- // Similarly, since partitioning of the left side is not preserved
- // when mode is Right, RightSemi, RightAnti, Full
- // - We need to add one additional Hash Repartition and Roundrobin
repartition after
- // SortMergeJoin in contrast the test cases when mode is Inner,
Left, LeftSemi, LeftAnti
- _ => vec![
- top_join_plan.as_str(),
- // Below 4 operators are differences introduced, when join
mode is changed
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " CoalescePartitionsExec",
- &join_plan_indent10,
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- };
- // TODO(wiedld): show different test result if enforce distribution
first.
- test_config.run(
- &expected_first_sort_enforcement,
- top_join,
- &SORT_DISTRIB_DISTRIB,
- )?;
- match join_type {
- JoinType::Inner | JoinType::Left | JoinType::Right |
JoinType::Full => {
- // This time we use (b1 == c) for top join
- // Join on (b1 == c)
- let top_join_on = vec![(
- Arc::new(Column::new_with_schema("b1",
&join.schema()).unwrap()) as _,
- Arc::new(Column::new_with_schema("c", &schema()).unwrap())
as _,
- )];
- let top_join =
- sort_merge_join_exec(join, parquet_exec(), &top_join_on,
&join_type);
- let top_join_plan =
- format!("SortMergeJoin: join_type={join_type}, on=[(b1@6,
c@2)]");
-
- let expected = match join_type {
- // Should include 6 RepartitionExecs(3 hash, 3
round-robin) and 3 SortExecs
- JoinType::Inner | JoinType::Right => vec![
- top_join_plan.as_str(),
- &join_plan_indent2,
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // Should include 7 RepartitionExecs (4 hash, 3
round-robin) and 4 SortExecs
- JoinType::Left | JoinType::Full => vec![
- top_join_plan.as_str(),
- " SortExec: expr=[b1@6 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10",
- &join_plan_indent6,
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@1],
10), input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " ProjectionExec: expr=[a@0 as a1, b@1 as
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // this match arm cannot be reached
- _ => unreachable!()
- };
- // TODO(wiedld): show different test result if enforce sorting
first.
- test_config.run(&expected, top_join.clone(),
&DISTRIB_DISTRIB_SORT)?;
-
- let expected_first_sort_enforcement = match join_type {
- // Should include 6 RepartitionExecs (3 of them preserves
order) and 3 SortExecs
- JoinType::Inner | JoinType::Right => vec![
- top_join_plan.as_str(),
- &join_plan_indent2,
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // Should include 8 RepartitionExecs (4 of them preserves
order) and 4 SortExecs
- JoinType::Left | JoinType::Full => vec![
- top_join_plan.as_str(),
- " RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[b1@6 ASC],
preserve_partitioning=[false]",
- " CoalescePartitionsExec",
- &join_plan_indent10,
- " RepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true,
sort_exprs=b1@1 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- " ProjectionExec: expr=[a@0 as a1,
b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // this match arm cannot be reached
- _ => unreachable!()
- };
+ let mut settings = Settings::clone_current();
+ settings.add_filter(&format!("join_type={join_type}"),
"join_type=...");
- // TODO(wiedld): show different test result if enforce
distribution first.
- test_config.run(
- &expected_first_sort_enforcement,
- top_join,
- &SORT_DISTRIB_DISTRIB,
- )?;
- }
- _ => {}
+ #[rustfmt::skip]
+ insta::allow_duplicates! {
+ settings.bind(|| {
+ let plan_distrib = test_config.to_plan(top_join.clone(),
&DISTRIB_DISTRIB_SORT);
+
+ match join_type {
+ // Should include 6 RepartitionExecs (3 hash, 3
round-robin), 3 SortExecs
+ JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti => {
+ assert_plan!(plan_distrib, @r"
+SortMergeJoin: join_type=..., on=[(a@0, c@2)]
+ SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1,
e@4 as e1]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ SortExec: expr=[c@2 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+");
+ }
+ // Should include 7 RepartitionExecs (4 hash, 3
round-robin), 4 SortExecs
+ // Since ordering of the left child is not preserved after
SortMergeJoin
+ // when mode is Right, RightSemi, RightAnti, Full
+ // - We need to add one additional SortExec after
SortMergeJoin in contrast to the test cases
+ // when mode is Inner, Left, LeftSemi, LeftAnti
+ // Similarly, since partitioning of the left side is not
preserved
+ // when mode is Right, RightSemi, RightAnti, Full
+ // - We need to add one additional Hash Repartition after
SortMergeJoin in contrast to the test
+ // cases when mode is Inner, Left, LeftSemi, LeftAnti
+ _ => {
+ assert_plan!(plan_distrib, @r"
+SortMergeJoin: join_type=..., on=[(a@0, c@2)]
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+ SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet
+ SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1
+ ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as
d1, e@4 as e1]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet
+ SortExec: expr=[c@2 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+");
+ }
+ }
+
+ let plan_sort = test_config.to_plan(top_join.clone(),
&SORT_DISTRIB_DISTRIB);
+
+ match join_type {
+ // Should include 6 RepartitionExecs (3 hash, 3
round-robin), 3 SortExecs
+ JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti => {
+ // TODO(wiedld): show different test result if enforce
distribution first.
+ assert_plan!(plan_sort, @r"
+SortMergeJoin: join_type=..., on=[(a@0, c@2)]
+ SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+ RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10,
preserve_order=true, sort_exprs=a@0 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10,
preserve_order=true, sort_exprs=b1@1 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]
+ ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1,
e@4 as e1]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10,
preserve_order=true, sort_exprs=c@2 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+");
+ }
+ // Should include 8 RepartitionExecs (4 hash, 4
round-robin), 4 SortExecs
+ // Since ordering of the left child is not preserved after
SortMergeJoin
+ // when mode is Right, RightSemi, RightAnti, Full
+ // - We need to add one additional SortExec after
SortMergeJoin in contrast to the test cases
+ // when mode is Inner, Left, LeftSemi, LeftAnti
+ // Similarly, since partitioning of the left side is not
preserved
+ // when mode is Right, RightSemi, RightAnti, Full
+ // - We need to add one additional Hash Repartition and
round-robin repartition after
+ // SortMergeJoin in contrast to the test cases when mode is
Inner, Left, LeftSemi, LeftAnti
+ _ => {
+ // TODO(wiedld): show different test result if enforce
distribution first.
+ assert_plan!(plan_sort, @r"
+SortMergeJoin: join_type=..., on=[(a@0, c@2)]
+ RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10,
preserve_order=true, sort_exprs=a@0 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ CoalescePartitionsExec
+ SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+ RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet
+ RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1
+ SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]
+ ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3
as d1, e@4 as e1]
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet
+ RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10,
preserve_order=true, sort_exprs=c@2 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+");
+ }
+ }
+
+ match join_type {
+ JoinType::Inner | JoinType::Left | JoinType::Right |
JoinType::Full => {
+ // This time we use (b1 == c) for top join
+ // Join on (b1 == c)
+ let top_join_on = vec![(
+ Arc::new(Column::new_with_schema("b1",
&join.schema()).unwrap()) as _,
+ Arc::new(Column::new_with_schema("c",
&schema()).unwrap()) as _,
+ )];
+ let top_join = sort_merge_join_exec(join,
parquet_exec(), &top_join_on, &join_type);
+
+ let plan_distrib =
test_config.to_plan(top_join.clone(), &DISTRIB_DISTRIB_SORT);
+
+ match join_type {
+ // Should include 6 RepartitionExecs (3 hash, 3
round-robin) and 3 SortExecs
+ JoinType::Inner | JoinType::Right => {
+ // TODO(wiedld): show different test result if
enforce sorting first.
+ assert_plan!(plan_distrib, @r"
+SortMergeJoin: join_type=..., on=[(b1@6, c@2)]
+ SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1,
e@4 as e1]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ SortExec: expr=[c@2 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+");
+ }
+ // Should include 7 RepartitionExecs (4 hash, 3
round-robin) and 4 SortExecs
+ JoinType::Left | JoinType::Full => {
+ // TODO(wiedld): show different test result if
enforce sorting first.
+ assert_plan!(plan_distrib, @r"
+SortMergeJoin: join_type=..., on=[(b1@6, c@2)]
+ SortExec: expr=[b1@6 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10
+ SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet
+ SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1
+ ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as
d1, e@4 as e1]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet
+ SortExec: expr=[c@2 ASC], preserve_partitioning=[true]
+ RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+");
+ }
+ // this match arm cannot be reached
+ _ => unreachable!()
+ }
+
+ let plan_sort = test_config.to_plan(top_join,
&SORT_DISTRIB_DISTRIB);
+
+ match join_type {
+ // Should include 6 RepartitionExecs (3 of them
preserve order) and 3 SortExecs
+ JoinType::Inner | JoinType::Right => {
+ // TODO(wiedld): show different test result if
enforce distribution first.
+ assert_plan!(plan_sort, @r"
+SortMergeJoin: join_type=..., on=[(b1@6, c@2)]
+ SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+ RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10,
preserve_order=true, sort_exprs=a@0 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10,
preserve_order=true, sort_exprs=b1@1 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]
+ ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1,
e@4 as e1]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10,
preserve_order=true, sort_exprs=c@2 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+");
+ }
+ // Should include 8 RepartitionExecs (4 of them
preserve order) and 4 SortExecs
+ JoinType::Left | JoinType::Full => {
+ // TODO(wiedld): show different test result if
enforce distribution first.
+ assert_plan!(plan_sort, @r"
+SortMergeJoin: join_type=..., on=[(b1@6, c@2)]
+ RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10,
preserve_order=true, sort_exprs=b1@6 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[b1@6 ASC], preserve_partitioning=[false]
+ CoalescePartitionsExec
+ SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+ RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1
+ SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=parquet
+ RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1
+ SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]
+ ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3
as d1, e@4 as e1]
+ DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet
+ RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10,
preserve_order=true, sort_exprs=c@2 ASC
+ RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+ SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+");
+ }
+ // this match arm cannot be reached
+ _ => unreachable!()
+ }
+ }
+ _ => {}
+ }
+ });
}
}
-
Ok(())
}
@@ -2667,46 +2592,51 @@ fn parallelization_compressed_csv() -> Result<()> {
FileCompressionType::UNCOMPRESSED,
];
- let expected_not_partitioned = [
- "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
- " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- " RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], file_type=csv, has_header=false",
- ];
-
- let expected_partitioned = [
- "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
- " RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
- " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
- " DataSourceExec: file_groups={2 groups: [[x:0..50],
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
- ];
+ #[rustfmt::skip]
+ insta::allow_duplicates! {
+ for compression_type in compression_types {
+ let plan = aggregate_exec_with_alias(
+ DataSourceExec::from_data_source(
+ FileScanConfigBuilder::new(
+ ObjectStoreUrl::parse("test:///").unwrap(),
+ schema(),
+ Arc::new(CsvSource::new(false, b',', b'"')),
+ )
+ .with_file(PartitionedFile::new("x".to_string(), 100))
+ .with_file_compression_type(compression_type)
+ .build(),
+ ),
+ vec![("a".to_string(), "a".to_string())],
+ );
+ let test_config = TestConfig::default()
+ .with_query_execution_partitions(2)
+ .with_prefer_repartition_file_scans(10);
+
+ let plan_distrib = test_config.to_plan(plan.clone(),
&DISTRIB_DISTRIB_SORT);
+ if compression_type.is_compressed() {
+ // Compressed files cannot be partitioned
+ assert_plan!(plan_distrib,
+ @r"
+AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false
+");
+ } else {
+ // Uncompressed files can be partitioned
+ assert_plan!(plan_distrib,
+ @r"
+AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]},
projection=[a, b, c, d, e], file_type=csv, has_header=false
+");
+ }
- for compression_type in compression_types {
- let expected = if compression_type.is_compressed() {
- &expected_not_partitioned[..]
- } else {
- &expected_partitioned[..]
- };
-
- let plan = aggregate_exec_with_alias(
- DataSourceExec::from_data_source(
- FileScanConfigBuilder::new(
- ObjectStoreUrl::parse("test:///").unwrap(),
- schema(),
- Arc::new(CsvSource::new(false, b',', b'"')),
- )
- .with_file(PartitionedFile::new("x".to_string(), 100))
- .with_file_compression_type(compression_type)
- .build(),
- ),
- vec![("a".to_string(), "a".to_string())],
- );
- let test_config = TestConfig::default()
- .with_query_execution_partitions(2)
- .with_prefer_repartition_file_scans(10);
- test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
- test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
+ let plan_sort = test_config.to_plan(plan, &SORT_DISTRIB_DISTRIB);
+ assert_plan!(plan_distrib, plan_sort);
+ }
}
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]