This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new acdd263812 Complete migrating `enforce_distribution` tests to insta (#18185)
acdd263812 is described below

commit acdd263812eb449c856d74e1d0395dd1705f0cd7
Author: Dmitrii Blaginin <[email protected]>
AuthorDate: Wed Nov 5 10:08:40 2025 +0000

    Complete migrating `enforce_distribution` tests to insta (#18185)
    
    - Closes https://github.com/apache/datafusion/issues/15791
    - Closes https://github.com/apache/datafusion/issues/15178 🥳
    
    - Supersedes part of https://github.com/apache/datafusion/pull/16978
    
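    For illustration, the migrated tests use the insta inline-snapshot
    pattern visible in the diff below; a minimal sketch (the `assert_plan!`
    macro and `TestConfig::to_plan` are helpers defined in this test file,
    and the snapshot body is abbreviated here):
    
        // Normalize the join type so a single inline snapshot covers every
        // iteration of the surrounding `join_types` loop.
        let mut settings = Settings::clone_current();
        settings.add_filter(&format!("join_type={join_type}"), "join_type=...");
    
        insta::allow_duplicates! {
            settings.bind(|| {
                // Run only the requested optimizer passes, then compare the
                // rendered plan against the inline snapshot.
                let plan_distrib =
                    test_config.to_plan(top_join.clone(), &DISTRIB_DISTRIB_SORT);
                assert_plan!(plan_distrib, @r"
        SortMergeJoin: join_type=..., on=[(a@0, c@2)]
          ...
        ");
            });
        }
    
    The filter keeps one snapshot valid across all join types, and
    `allow_duplicates!` allows the same inline snapshot to be asserted on
    every pass through the loop.
    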
    ---------
    
    Co-authored-by: Claude <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../physical_optimizer/enforce_distribution.rs     | 634 +++++++++------------
 1 file changed, 282 insertions(+), 352 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index db011c4be4..5b7d9ac8fb 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -66,8 +66,8 @@ use datafusion_physical_plan::projection::{ProjectionExec, 
ProjectionExpr};
 use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::{
-    displayable, get_plan_string, DisplayAs, DisplayFormatType, 
ExecutionPlanProperties,
-    PlanProperties, Statistics,
+    displayable, DisplayAs, DisplayFormatType, ExecutionPlanProperties, 
PlanProperties,
+    Statistics,
 };
 use insta::Settings;
 
@@ -469,83 +469,6 @@ impl TestConfig {
         self
     }
 
-    // This be deleted in https://github.com/apache/datafusion/pull/18185
-    /// Perform a series of runs using the current [`TestConfig`],
-    /// assert the expected plan result,
-    /// and return the result plan (for potential subsequent runs).
-    fn run(
-        &self,
-        expected_lines: &[&str],
-        plan: Arc<dyn ExecutionPlan>,
-        optimizers_to_run: &[Run],
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let expected_lines: Vec<&str> = expected_lines.to_vec();
-
-        // Add the ancillary output requirements operator at the start:
-        let optimizer = OutputRequirements::new_add_mode();
-        let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
-
-        // This file has 2 rules that use tree node, apply these rules to 
original plan consecutively
-        // After these operations tree nodes should be in a consistent state.
-        // This code block makes sure that these rules doesn't violate tree 
node integrity.
-        {
-            let adjusted = if 
self.config.optimizer.top_down_join_key_reordering {
-                // Run adjust_input_keys_ordering rule
-                let plan_requirements =
-                    PlanWithKeyRequirements::new_default(plan.clone());
-                let adjusted = plan_requirements
-                    .transform_down(adjust_input_keys_ordering)
-                    .data()
-                    .and_then(check_integrity)?;
-                // TODO: End state payloads will be checked here.
-                adjusted.plan
-            } else {
-                // Run reorder_join_keys_to_inputs rule
-                plan.clone()
-                    .transform_up(|plan| {
-                        
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
-                    })
-                    .data()?
-            };
-
-            // Then run ensure_distribution rule
-            DistributionContext::new_default(adjusted)
-                .transform_up(|distribution_context| {
-                    ensure_distribution(distribution_context, &self.config)
-                })
-                .data()
-                .and_then(check_integrity)?;
-            // TODO: End state payloads will be checked here.
-        }
-
-        for run in optimizers_to_run {
-            optimized = match run {
-                Run::Distribution => {
-                    let optimizer = EnforceDistribution::new();
-                    optimizer.optimize(optimized, &self.config)?
-                }
-                Run::Sorting => {
-                    let optimizer = EnforceSorting::new();
-                    optimizer.optimize(optimized, &self.config)?
-                }
-            };
-        }
-
-        // Remove the ancillary output requirements operator when done:
-        let optimizer = OutputRequirements::new_remove_mode();
-        let optimized = optimizer.optimize(optimized, &self.config)?;
-
-        // Now format correctly
-        let actual_lines = get_plan_string(&optimized);
-
-        assert_eq!(
-            &expected_lines, &actual_lines,
-            
"\n\nexpected:\n\n{expected_lines:#?}\nactual:\n\n{actual_lines:#?}\n\n"
-        );
-
-        Ok(optimized)
-    }
-
     /// Perform a series of runs using the current [`TestConfig`],
     /// assert the expected plan result,
     /// and return the result plan (for potential subsequent runs).
@@ -1503,15 +1426,6 @@ fn multi_smj_joins() -> Result<()> {
     for join_type in join_types {
         let join =
             sort_merge_join_exec(left.clone(), right.clone(), &join_on, 
&join_type);
-        let join_plan = |shift| -> String {
-            format!(
-                "{}SortMergeJoin: join_type={join_type}, on=[(a@0, b1@1)]",
-                " ".repeat(shift)
-            )
-        };
-        let join_plan_indent2 = join_plan(2);
-        let join_plan_indent6 = join_plan(6);
-        let join_plan_indent10 = join_plan(10);
 
         // Top join on (a == c)
         let top_join_on = vec![(
@@ -1520,235 +1434,246 @@ fn multi_smj_joins() -> Result<()> {
         )];
         let top_join =
             sort_merge_join_exec(join.clone(), parquet_exec(), &top_join_on, 
&join_type);
-        let top_join_plan =
-            format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]");
-
-        let expected = match join_type {
-            // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 
SortExecs
-            JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti =>
-                vec![
-                    top_join_plan.as_str(),
-                    &join_plan_indent2,
-                    "    SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
-                    "      RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                    "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                    "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "    SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
-                    "      RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                    "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                    "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
-                    "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
-                    "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                    "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                ],
-            // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 
SortExecs
-            // Since ordering of the left child is not preserved after 
SortMergeJoin
-            // when mode is Right, RightSemi, RightAnti, Full
-            // - We need to add one additional SortExec after SortMergeJoin in 
contrast the test cases
-            //   when mode is Inner, Left, LeftSemi, LeftAnti
-            // Similarly, since partitioning of the left side is not preserved
-            // when mode is Right, RightSemi, RightAnti, Full
-            // - We need to add one additional Hash Repartition after 
SortMergeJoin in contrast the test
-            //   cases when mode is Inner, Left, LeftSemi, LeftAnti
-            _ => vec![
-                    top_join_plan.as_str(),
-                    // Below 2 operators are differences introduced, when join 
mode is changed
-                    "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-                    "    RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                    &join_plan_indent6,
-                    "        SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
-                    "          RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                    "            RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                    "              DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                    "        SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
-                    "          RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                    "            RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                    "              ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
-                    "                DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                    "  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
-                    "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                    "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-            ],
-        };
-        // TODO(wiedld): show different test result if enforce sorting first.
-        test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
-
-        let expected_first_sort_enforcement = match join_type {
-            // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 
SortExecs
-            JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti =>
-                vec![
-                    top_join_plan.as_str(),
-                    &join_plan_indent2,
-                    "    RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                    "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "        SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
-                    "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "    RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
-                    "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "        SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
-                    "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 
as c1, d@3 as d1, e@4 as e1]",
-                    "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
-                    "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                    "      SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
-                    "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                ],
-            // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4 
SortExecs
-            // Since ordering of the left child is not preserved after 
SortMergeJoin
-            // when mode is Right, RightSemi, RightAnti, Full
-            // - We need to add one additional SortExec after SortMergeJoin in 
contrast the test cases
-            //   when mode is Inner, Left, LeftSemi, LeftAnti
-            // Similarly, since partitioning of the left side is not preserved
-            // when mode is Right, RightSemi, RightAnti, Full
-            // - We need to add one additional Hash Repartition and Roundrobin 
repartition after
-            //   SortMergeJoin in contrast the test cases when mode is Inner, 
Left, LeftSemi, LeftAnti
-            _ => vec![
-                top_join_plan.as_str(),
-                // Below 4 operators are differences introduced, when join 
mode is changed
-                "  RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                "      SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
-                "        CoalescePartitionsExec",
-                &join_plan_indent10,
-                "            RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                "              RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                "                SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
-                "                  DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                "            RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
-                "              RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                "                SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
-                "                  ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
-                "                    DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
-                "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-                "      SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
-                "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-            ],
-        };
-        // TODO(wiedld): show different test result if enforce distribution 
first.
-        test_config.run(
-            &expected_first_sort_enforcement,
-            top_join,
-            &SORT_DISTRIB_DISTRIB,
-        )?;
 
-        match join_type {
-            JoinType::Inner | JoinType::Left | JoinType::Right | 
JoinType::Full => {
-                // This time we use (b1 == c) for top join
-                // Join on (b1 == c)
-                let top_join_on = vec![(
-                    Arc::new(Column::new_with_schema("b1", 
&join.schema()).unwrap()) as _,
-                    Arc::new(Column::new_with_schema("c", &schema()).unwrap()) 
as _,
-                )];
-                let top_join =
-                    sort_merge_join_exec(join, parquet_exec(), &top_join_on, 
&join_type);
-                let top_join_plan =
-                    format!("SortMergeJoin: join_type={join_type}, on=[(b1@6, 
c@2)]");
-
-                let expected = match join_type {
-                    // Should include 6 RepartitionExecs(3 hash, 3 
round-robin) and 3 SortExecs
-                    JoinType::Inner | JoinType::Right => vec![
-                        top_join_plan.as_str(),
-                        &join_plan_indent2,
-                        "    SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
-                        "      RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10",
-                        "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "          DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                        "    SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
-                        "      RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10",
-                        "        RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
-                        "            DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                        "  SortExec: expr=[c@2 ASC], 
preserve_partitioning=[true]",
-                        "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    ],
-                    // Should include 7 RepartitionExecs (4 hash, 3 
round-robin) and 4 SortExecs
-                    JoinType::Left | JoinType::Full => vec![
-                        top_join_plan.as_str(),
-                        "  SortExec: expr=[b1@6 ASC], 
preserve_partitioning=[true]",
-                        "    RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10",
-                        &join_plan_indent6,
-                        "        SortExec: expr=[a@0 ASC], 
preserve_partitioning=[true]",
-                        "          RepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10",
-                        "            RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "              DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                        "        SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[true]",
-                        "          RepartitionExec: partitioning=Hash([b1@1], 
10), input_partitions=10",
-                        "            RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "              ProjectionExec: expr=[a@0 as a1, b@1 as 
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                        "                DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                        "  SortExec: expr=[c@2 ASC], 
preserve_partitioning=[true]",
-                        "    RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10",
-                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    ],
-                    // this match arm cannot be reached
-                    _ => unreachable!()
-                };
-                // TODO(wiedld): show different test result if enforce sorting 
first.
-                test_config.run(&expected, top_join.clone(), 
&DISTRIB_DISTRIB_SORT)?;
-
-                let expected_first_sort_enforcement = match join_type {
-                    // Should include 6 RepartitionExecs (3 of them preserves 
order) and 3 SortExecs
-                    JoinType::Inner | JoinType::Right => vec![
-                        top_join_plan.as_str(),
-                        &join_plan_indent2,
-                        "    RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "        SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
-                        "          DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                        "    RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
-                        "      RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "        SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
-                        "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, 
c@2 as c1, d@3 as d1, e@4 as e1]",
-                        "            DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                        "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
-                        "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "      SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
-                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    ],
-                    // Should include 8 RepartitionExecs (4 of them preserves 
order) and 4 SortExecs
-                    JoinType::Left | JoinType::Full => vec![
-                        top_join_plan.as_str(),
-                        "  RepartitionExec: partitioning=Hash([b1@6], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC",
-                        "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "      SortExec: expr=[b1@6 ASC], 
preserve_partitioning=[false]",
-                        "        CoalescePartitionsExec",
-                        &join_plan_indent10,
-                        "            RepartitionExec: partitioning=Hash([a@0], 
10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
-                        "              RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "                SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false]",
-                        "                  DataSourceExec: file_groups={1 
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                        "            RepartitionExec: 
partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, 
sort_exprs=b1@1 ASC",
-                        "              RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "                SortExec: expr=[b1@1 ASC], 
preserve_partitioning=[false]",
-                        "                  ProjectionExec: expr=[a@0 as a1, 
b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
-                        "                    DataSourceExec: file_groups={1 
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-                        "  RepartitionExec: partitioning=Hash([c@2], 10), 
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
-                        "    RepartitionExec: 
partitioning=RoundRobinBatch(10), input_partitions=1",
-                        "      SortExec: expr=[c@2 ASC], 
preserve_partitioning=[false]",
-                        "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
-                    ],
-                    // this match arm cannot be reached
-                    _ => unreachable!()
-                };
+        let mut settings = Settings::clone_current();
+        settings.add_filter(&format!("join_type={join_type}"), 
"join_type=...");
 
-                // TODO(wiedld): show different test result if enforce 
distribution first.
-                test_config.run(
-                    &expected_first_sort_enforcement,
-                    top_join,
-                    &SORT_DISTRIB_DISTRIB,
-                )?;
-            }
-            _ => {}
+        #[rustfmt::skip]
+        insta::allow_duplicates! {
+            settings.bind(|| {
+                let plan_distrib = test_config.to_plan(top_join.clone(), 
&DISTRIB_DISTRIB_SORT);
+
+                match join_type {
+                    // Should include 6 RepartitionExecs (3 hash, 3 
round-robin), 3 SortExecs
+                    JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti => {
+                        assert_plan!(plan_distrib, @r"
+SortMergeJoin: join_type=..., on=[(a@0, c@2)]
+  SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+    SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+      RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+    SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]
+      RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10
+        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, 
e@4 as e1]
+            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]
+    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
+      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet
+");
+                    }
+                    // Should include 7 RepartitionExecs (4 hash, 3 
round-robin), 4 SortExecs
+                    // Since ordering of the left child is not preserved after 
SortMergeJoin
+                    // when mode is Right, RightSemi, RightAnti, Full
+                    // - We need to add one additional SortExec after 
SortMergeJoin in contrast the test cases
+                    //   when mode is Inner, Left, LeftSemi, LeftAnti
+                    // Similarly, since partitioning of the left side is not 
preserved
+                    // when mode is Right, RightSemi, RightAnti, Full
+                    // - We need to add one additional Hash Repartition after 
SortMergeJoin in contrast the test
+                    //   cases when mode is Inner, Left, LeftSemi, LeftAnti
+                    _ => {
+                        assert_plan!(plan_distrib, @r"
+SortMergeJoin: join_type=..., on=[(a@0, c@2)]
+  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+    RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+      SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+        SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+          RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1
+              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet
+        SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]
+          RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10
+            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1
+              ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as 
d1, e@4 as e1]
+                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet
+  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]
+    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
+      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet
+");
+                    }
+                }
+
+                let plan_sort = test_config.to_plan(top_join.clone(), 
&SORT_DISTRIB_DISTRIB);
+
+                match join_type {
+                    // Should include 6 RepartitionExecs (3 hash, 3 
round-robin), 3 SortExecs
+                    JoinType::Inner | JoinType::Left | JoinType::LeftSemi | 
JoinType::LeftAnti => {
+                        // TODO(wiedld): show different test result if enforce 
distribution first.
+                        assert_plan!(plan_sort, @r"
+SortMergeJoin: join_type=..., on=[(a@0, c@2)]
+  SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+    RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, 
preserve_order=true, sort_exprs=a@0 ASC
+      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+    RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, 
preserve_order=true, sort_exprs=b1@1 ASC
+      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]
+          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, 
e@4 as e1]
+            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, 
preserve_order=true, sort_exprs=c@2 ASC
+    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+      SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet
+");
+                    }
+                    // Should include 8 RepartitionExecs (4 hash, 8 
round-robin), 4 SortExecs
+                    // Since ordering of the left child is not preserved after 
SortMergeJoin
+                    // when mode is Right, RightSemi, RightAnti, Full
+                    // - We need to add one additional SortExec after 
SortMergeJoin in contrast the test cases
+                    //   when mode is Inner, Left, LeftSemi, LeftAnti
+                    // Similarly, since partitioning of the left side is not 
preserved
+                    // when mode is Right, RightSemi, RightAnti, Full
+                    // - We need to add one additional Hash Repartition and 
Roundrobin repartition after
+                    //   SortMergeJoin in contrast the test cases when mode is 
Inner, Left, LeftSemi, LeftAnti
+                    _ => {
+                        // TODO(wiedld): show different test result if enforce 
distribution first.
+                        assert_plan!(plan_sort, @r"
+SortMergeJoin: join_type=..., on=[(a@0, c@2)]
+  RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, 
preserve_order=true, sort_exprs=a@0 ASC
+    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+      SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+        CoalescePartitionsExec
+          SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+            RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC
+              RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1
+                SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet
+            RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC
+              RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1
+                SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]
+                  ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 
as d1, e@4 as e1]
+                    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet
+  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, 
preserve_order=true, sort_exprs=c@2 ASC
+    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+      SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet
+");
+                    }
+                }
+
+                match join_type {
+                    JoinType::Inner | JoinType::Left | JoinType::Right | 
JoinType::Full => {
+                        // This time we use (b1 == c) for top join
+                        // Join on (b1 == c)
+                        let top_join_on = vec![(
+                            Arc::new(Column::new_with_schema("b1", 
&join.schema()).unwrap()) as _,
+                            Arc::new(Column::new_with_schema("c", 
&schema()).unwrap()) as _,
+                        )];
+                        let top_join = sort_merge_join_exec(join, 
parquet_exec(), &top_join_on, &join_type);
+
+                        let plan_distrib = 
test_config.to_plan(top_join.clone(), &DISTRIB_DISTRIB_SORT);
+
+                        match join_type {
+                            // Should include 6 RepartitionExecs(3 hash, 3 
round-robin) and 3 SortExecs
+                            JoinType::Inner | JoinType::Right => {
+                                // TODO(wiedld): show different test result if 
enforce sorting first.
+                                assert_plan!(plan_distrib, @r"
+SortMergeJoin: join_type=..., on=[(b1@6, c@2)]
+  SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+    SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+      RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+    SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]
+      RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10
+        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, 
e@4 as e1]
+            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]
+    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
+      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet
+");
+                            }
+                            // Should include 7 RepartitionExecs (4 hash, 3 
round-robin) and 4 SortExecs
+                            JoinType::Left | JoinType::Full => {
+                                // TODO(wiedld): show different test result if 
enforce sorting first.
+                                assert_plan!(plan_distrib, @r"
+SortMergeJoin: join_type=..., on=[(b1@6, c@2)]
+  SortExec: expr=[b1@6 ASC], preserve_partitioning=[true]
+    RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10
+      SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+        SortExec: expr=[a@0 ASC], preserve_partitioning=[true]
+          RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10
+            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1
+              DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet
+        SortExec: expr=[b1@1 ASC], preserve_partitioning=[true]
+          RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10
+            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1
+              ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as 
d1, e@4 as e1]
+                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet
+  SortExec: expr=[c@2 ASC], preserve_partitioning=[true]
+    RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10
+      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet
+");
+                            }
+                            // this match arm cannot be reached
+                            _ => unreachable!()
+                        }
+
+                        let plan_sort = test_config.to_plan(top_join, 
&SORT_DISTRIB_DISTRIB);
+
+                        match join_type {
+                            // Should include 6 RepartitionExecs (3 of them 
preserves order) and 3 SortExecs
+                            JoinType::Inner | JoinType::Right => {
+                                // TODO(wiedld): show different test result if 
enforce distribution first.
+                                assert_plan!(plan_sort, @r"
+SortMergeJoin: join_type=..., on=[(b1@6, c@2)]
+  SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+    RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, 
preserve_order=true, sort_exprs=a@0 ASC
+      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+    RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, 
preserve_order=true, sort_exprs=b1@1 ASC
+      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+        SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]
+          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, 
e@4 as e1]
+            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet
+  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, 
preserve_order=true, sort_exprs=c@2 ASC
+    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+      SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet
+");
+                            }
+                            // Should include 8 RepartitionExecs (4 of them 
preserves order) and 4 SortExecs
+                            JoinType::Left | JoinType::Full => {
+                                // TODO(wiedld): show different test result if 
enforce distribution first.
+                                assert_plan!(plan_sort, @r"
+SortMergeJoin: join_type=..., on=[(b1@6, c@2)]
+  RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, 
preserve_order=true, sort_exprs=b1@6 ASC
+    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+      SortExec: expr=[b1@6 ASC], preserve_partitioning=[false]
+        CoalescePartitionsExec
+          SortMergeJoin: join_type=..., on=[(a@0, b1@1)]
+            RepartitionExec: partitioning=Hash([a@0], 10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC
+              RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1
+                SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
+                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet
+            RepartitionExec: partitioning=Hash([b1@1], 10), 
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC
+              RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1
+                SortExec: expr=[b1@1 ASC], preserve_partitioning=[false]
+                  ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 
as d1, e@4 as e1]
+                    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet
+  RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, 
preserve_order=true, sort_exprs=c@2 ASC
+    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
+      SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=parquet
+");
+                            }
+                            // this match arm cannot be reached
+                            _ => unreachable!()
+                        }
+                    }
+                    _ => {}
+                }
+            });
         }
     }
-
     Ok(())
 }
 
@@ -2667,46 +2592,51 @@ fn parallelization_compressed_csv() -> Result<()> {
         FileCompressionType::UNCOMPRESSED,
     ];
 
-    let expected_not_partitioned = [
-        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
-        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "      RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
-        "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=csv, has_header=false",
-    ];
-
-    let expected_partitioned = [
-        "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
-        "  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
-        "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
-        "      DataSourceExec: file_groups={2 groups: [[x:0..50], 
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
-    ];
+    #[rustfmt::skip]
+    insta::allow_duplicates! {
+        for compression_type in compression_types {
+            let plan = aggregate_exec_with_alias(
+                DataSourceExec::from_data_source(
+                    FileScanConfigBuilder::new(
+                        ObjectStoreUrl::parse("test:///").unwrap(),
+                        schema(),
+                        Arc::new(CsvSource::new(false, b',', b'"')),
+                    )
+                    .with_file(PartitionedFile::new("x".to_string(), 100))
+                    .with_file_compression_type(compression_type)
+                    .build(),
+                ),
+                vec![("a".to_string(), "a".to_string())],
+            );
+            let test_config = TestConfig::default()
+                .with_query_execution_partitions(2)
+                .with_prefer_repartition_file_scans(10);
+
+            let plan_distrib = test_config.to_plan(plan.clone(), 
&DISTRIB_DISTRIB_SORT);
+            if compression_type.is_compressed() {
+                // Compressed files cannot be partitioned
+                assert_plan!(plan_distrib,
+                    @r"
+AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
+    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+      RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], file_type=csv, has_header=false
+");
+            } else {
+                // Uncompressed files can be partitioned
+                assert_plan!(plan_distrib,
+                    @r"
+AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+  RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
+    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+      DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false
+");
+            }
 
-    for compression_type in compression_types {
-        let expected = if compression_type.is_compressed() {
-            &expected_not_partitioned[..]
-        } else {
-            &expected_partitioned[..]
-        };
-
-        let plan = aggregate_exec_with_alias(
-            DataSourceExec::from_data_source(
-                FileScanConfigBuilder::new(
-                    ObjectStoreUrl::parse("test:///").unwrap(),
-                    schema(),
-                    Arc::new(CsvSource::new(false, b',', b'"')),
-                )
-                .with_file(PartitionedFile::new("x".to_string(), 100))
-                .with_file_compression_type(compression_type)
-                .build(),
-            ),
-            vec![("a".to_string(), "a".to_string())],
-        );
-        let test_config = TestConfig::default()
-            .with_query_execution_partitions(2)
-            .with_prefer_repartition_file_scans(10);
-        test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
-        test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
+            let plan_sort = test_config.to_plan(plan, &SORT_DISTRIB_DISTRIB);
+            assert_plan!(plan_distrib, plan_sort);
+        }
     }
     Ok(())
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
