This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 297af9533b refactor(15003): refactor test suite in 
EnforceDistribution, to use test config builder (#15010)
297af9533b is described below

commit 297af9533b7779b7d4bf847b8cea48e739bc16cc
Author: wiedld <[email protected]>
AuthorDate: Fri Mar 7 05:57:47 2025 -0800

    refactor(15003): refactor test suite in EnforceDistribution, to use test 
config builder (#15010)
---
 .../physical_optimizer/enforce_distribution.rs     | 657 +++++++++++++++------
 1 file changed, 488 insertions(+), 169 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 85d826109f..f22a896c18 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -372,46 +372,91 @@ macro_rules! plans_matches_expected {
     }
 }
 
+fn test_suite_default_config_options() -> ConfigOptions {
+    let mut config = ConfigOptions::new();
+
+    // By default, will not repartition / resort data if it is already sorted.
+    config.optimizer.prefer_existing_sort = false;
+
+    // By default, will attempt to convert Union to Interleave.
+    config.optimizer.prefer_existing_union = false;
+
+    // By default, will not repartition file scans.
+    config.optimizer.repartition_file_scans = false;
+    config.optimizer.repartition_file_min_size = 1024;
+
+    // By default, set query execution concurrency to 10.
+    config.execution.target_partitions = 10;
+
+    // Use a small batch size, to trigger RoundRobin in tests
+    config.execution.batch_size = 1;
+
+    config
+}
+
+/// How the optimizers are run.
+#[derive(PartialEq, Clone)]
+enum DoFirst {
+    /// Runs: (EnforceDistribution, EnforceDistribution, EnforceSorting)
+    Distribution,
+    /// Runs: (EnforceSorting, EnforceDistribution, EnforceDistribution)
+    Sorting,
+}
+
+#[derive(Clone)]
+struct TestConfig {
+    config: ConfigOptions,
+    optimizers_to_run: DoFirst,
+}
+
+impl TestConfig {
+    fn new(optimizers_to_run: DoFirst) -> Self {
+        Self {
+            config: test_suite_default_config_options(),
+            optimizers_to_run,
+        }
+    }
+
+    /// If preferred, will not repartition / resort data if it is already 
sorted.
+    fn with_prefer_existing_sort(mut self) -> Self {
+        self.config.optimizer.prefer_existing_sort = true;
+        self
+    }
+
+    /// If preferred, will not attempt to convert Union to Interleave.
+    fn with_prefer_existing_union(mut self) -> Self {
+        self.config.optimizer.prefer_existing_union = true;
+        self
+    }
+
+    /// If preferred, will repartition file scans.
+    /// Accepts a minimum file size to repartition.
+    fn with_prefer_repartition_file_scans(mut self, file_min_size: usize) -> 
Self {
+        self.config.optimizer.repartition_file_scans = true;
+        self.config.optimizer.repartition_file_min_size = file_min_size;
+        self
+    }
+
+    /// Set the preferred target partitions for query execution concurrency.
+    fn with_query_execution_partitions(mut self, target_partitions: usize) -> 
Self {
+        self.config.execution.target_partitions = target_partitions;
+        self
+    }
+}
+
 /// Runs the repartition optimizer and asserts the plan against the expected
 /// Arguments
 /// * `EXPECTED_LINES` - Expected output plan
 /// * `PLAN` - Input plan
-/// * `FIRST_ENFORCE_DIST` -
-///     true: (EnforceDistribution, EnforceDistribution,  EnforceSorting)
-///     false: else runs (EnforceSorting, EnforceDistribution, 
EnforceDistribution)
-/// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / 
resort data if it is already sorted
-/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to
-/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file 
scans
-/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition
-/// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to 
convert Union to Interleave
+/// * `CONFIG` - [`TestConfig`]
 macro_rules! assert_optimized {
-    ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
-        assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 
10, false, 1024, false);
-    };
-
-    ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, 
$PREFER_EXISTING_SORT: expr) => {
-        assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, 
$PREFER_EXISTING_SORT, 10, false, 1024, false);
-    };
-
-    ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, 
$PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => {
-        assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, 
$PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION);
-    };
-
-    ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, 
$PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: 
expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
-        assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, 
$PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, 
$REPARTITION_FILE_MIN_SIZE, false);
-    };
-
-    ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, 
$PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: 
expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
+    ($EXPECTED_LINES: expr, $PLAN: expr, $CONFIG: expr) => {
         let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| 
*s).collect();
 
-        let mut config = ConfigOptions::new();
-        config.execution.target_partitions = $TARGET_PARTITIONS;
-        config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
-        config.optimizer.repartition_file_min_size = 
$REPARTITION_FILE_MIN_SIZE;
-        config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
-        config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;
-        // Use a small batch size, to trigger RoundRobin in tests
-        config.execution.batch_size = 1;
+        let TestConfig {
+            config,
+            optimizers_to_run,
+        } = $CONFIG;
 
         // NOTE: These tests verify the joint `EnforceDistribution` + 
`EnforceSorting` cascade
         //       because they were written prior to the separation of 
`BasicEnforcement` into
@@ -455,7 +500,7 @@ macro_rules! assert_optimized {
             // TODO: End state payloads will be checked here.
         }
 
-        let optimized = if $FIRST_ENFORCE_DIST {
+        let optimized = if *optimizers_to_run == DoFirst::Distribution {
             // Run enforce distribution rule first:
             let optimizer = EnforceDistribution::new();
             let optimized = optimizer.optimize(optimized, &config)?;
@@ -602,8 +647,12 @@ fn multi_hash_joins() -> Result<()> {
                         "      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
                     ],
                 };
-                assert_optimized!(expected, top_join.clone(), true);
-                assert_optimized!(expected, top_join, false);
+                assert_optimized!(
+                    expected,
+                    top_join.clone(),
+                    &TestConfig::new(DoFirst::Distribution)
+                );
+                assert_optimized!(expected, top_join, 
&TestConfig::new(DoFirst::Sorting));
             }
             JoinType::RightSemi | JoinType::RightAnti => {}
         }
@@ -666,8 +715,12 @@ fn multi_hash_joins() -> Result<()> {
                             "      DataSourceExec: file_groups={1 group: 
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
                         ],
                 };
-                assert_optimized!(expected, top_join.clone(), true);
-                assert_optimized!(expected, top_join, false);
+                assert_optimized!(
+                    expected,
+                    top_join.clone(),
+                    &TestConfig::new(DoFirst::Distribution)
+                );
+                assert_optimized!(expected, top_join, 
&TestConfig::new(DoFirst::Sorting));
             }
             JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {}
         }
@@ -723,8 +776,12 @@ fn multi_joins_after_alias() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, top_join.clone(), true);
-    assert_optimized!(expected, top_join, false);
+    assert_optimized!(
+        expected,
+        top_join.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
 
     // Join on (a2 == c)
     let top_join_on = vec![(
@@ -749,8 +806,12 @@ fn multi_joins_after_alias() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, top_join.clone(), true);
-    assert_optimized!(expected, top_join, false);
+    assert_optimized!(
+        expected,
+        top_join.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -803,8 +864,12 @@ fn multi_joins_after_multi_alias() -> Result<()> {
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(expected, top_join.clone(), true);
-    assert_optimized!(expected, top_join, false);
+    assert_optimized!(
+        expected,
+        top_join.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -843,8 +908,12 @@ fn join_after_agg_alias() -> Result<()> {
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, join.clone(), true);
-    assert_optimized!(expected, join, false);
+    assert_optimized!(
+        expected,
+        join.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -896,8 +965,12 @@ fn hash_join_key_ordering() -> Result<()> {
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, join.clone(), true);
-    assert_optimized!(expected, join, false);
+    assert_optimized!(
+        expected,
+        join.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1022,8 +1095,16 @@ fn multi_hash_join_key_ordering() -> Result<()> {
         "          ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]",
         "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, filter_top_join.clone(), true);
-    assert_optimized!(expected, filter_top_join, false);
+    assert_optimized!(
+        expected,
+        filter_top_join.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(
+        expected,
+        filter_top_join,
+        &TestConfig::new(DoFirst::Sorting)
+    );
 
     Ok(())
 }
@@ -1301,6 +1382,7 @@ fn reorder_join_keys_to_right_input() -> Result<()> {
     Ok(())
 }
 
+/// These test cases use [`TestConfig::with_prefer_existing_sort`].
 #[test]
 fn multi_smj_joins() -> Result<()> {
     let left = parquet_exec();
@@ -1402,7 +1484,11 @@ fn multi_smj_joins() -> Result<()> {
                     "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
             ],
         };
-        assert_optimized!(expected, top_join.clone(), true, true);
+        assert_optimized!(
+            expected,
+            top_join.clone(),
+            &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+        );
 
         let expected_first_sort_enforcement = match join_type {
             // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 
SortExecs
@@ -1456,7 +1542,11 @@ fn multi_smj_joins() -> Result<()> {
                 "        DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
             ],
         };
-        assert_optimized!(expected_first_sort_enforcement, top_join, false, 
true);
+        assert_optimized!(
+            expected_first_sort_enforcement,
+            top_join,
+            &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+        );
 
         match join_type {
             JoinType::Inner | JoinType::Left | JoinType::Right | 
JoinType::Full => {
@@ -1513,7 +1603,11 @@ fn multi_smj_joins() -> Result<()> {
                     // this match arm cannot be reached
                     _ => unreachable!()
                 };
-                assert_optimized!(expected, top_join.clone(), true, true);
+                assert_optimized!(
+                    expected,
+                    top_join.clone(),
+                    
&TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+                );
 
                 let expected_first_sort_enforcement = match join_type {
                     // Should include 6 RepartitionExecs (3 of them preserves 
order) and 3 SortExecs
@@ -1559,7 +1653,12 @@ fn multi_smj_joins() -> Result<()> {
                     // this match arm cannot be reached
                     _ => unreachable!()
                 };
-                assert_optimized!(expected_first_sort_enforcement, top_join, 
false, true);
+
+                assert_optimized!(
+                    expected_first_sort_enforcement,
+                    top_join,
+                    
&TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+                );
             }
             _ => {}
         }
@@ -1568,6 +1667,7 @@ fn multi_smj_joins() -> Result<()> {
     Ok(())
 }
 
+/// These test cases use [`TestConfig::with_prefer_existing_sort`].
 #[test]
 fn smj_join_key_ordering() -> Result<()> {
     // group by (a as a1, b as b1)
@@ -1633,7 +1733,11 @@ fn smj_join_key_ordering() -> Result<()> {
         "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, join.clone(), true, true);
+    assert_optimized!(
+        expected,
+        join.clone(),
+        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+    );
 
     let expected_first_sort_enforcement = &[
         "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
@@ -1659,7 +1763,11 @@ fn smj_join_key_ordering() -> Result<()> {
         "                  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "                    DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected_first_sort_enforcement, join, false, true);
+    assert_optimized!(
+        expected_first_sort_enforcement,
+        join,
+        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+    );
 
     Ok(())
 }
@@ -1690,7 +1798,7 @@ fn merge_does_not_need_sort() -> Result<()> {
         "  CoalesceBatchesExec: target_batch_size=4096",
         "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, exec, true);
+    assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Distribution));
 
     // In this case preserving ordering through order preserving operators is 
not desirable
     // (according to flag: PREFER_EXISTING_SORT)
@@ -1702,7 +1810,7 @@ fn merge_does_not_need_sort() -> Result<()> {
         "    CoalesceBatchesExec: target_batch_size=4096",
         "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, exec, false);
+    assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1743,8 +1851,12 @@ fn union_to_interleave() -> Result<()> {
         "            RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "              DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan.clone(), false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan.clone(), 
&TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1786,24 +1898,16 @@ fn union_not_to_interleave() -> Result<()> {
         "              RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "                DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    // no sort in the plan but since we need it as a parameter, make it 
default false
-    let prefer_existing_sort = false;
-    let first_enforce_distribution = true;
-    let prefer_existing_union = true;
 
     assert_optimized!(
         expected,
         plan.clone(),
-        first_enforce_distribution,
-        prefer_existing_sort,
-        prefer_existing_union
+        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_union()
     );
     assert_optimized!(
         expected,
         plan,
-        !first_enforce_distribution,
-        prefer_existing_sort,
-        prefer_existing_union
+        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_union()
     );
 
     Ok(())
@@ -1821,8 +1925,12 @@ fn added_repartition_to_single_partition() -> Result<()> 
{
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1840,14 +1948,17 @@ fn repartition_deepest_node() -> Result<()> {
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
 
 #[test]
-
 fn repartition_unsorted_limit() -> Result<()> {
     let plan = limit_exec(filter_exec(parquet_exec()));
 
@@ -1861,8 +1972,12 @@ fn repartition_unsorted_limit() -> Result<()> {
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1883,8 +1998,12 @@ fn repartition_sorted_limit() -> Result<()> {
         "    SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1911,8 +2030,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()> {
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1941,8 +2064,12 @@ fn repartition_ignores_limit() -> Result<()> {
         // Expect no repartition to happen for local limit
         "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1961,8 +2088,12 @@ fn repartition_ignores_union() -> Result<()> {
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -1982,8 +2113,12 @@ fn repartition_through_sort_preserving_merge() -> 
Result<()> {
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -2008,14 +2143,18 @@ fn repartition_ignores_sort_preserving_merge() -> 
Result<()> {
         "  DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     let expected = &[
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
         "  CoalescePartitionsExec",
         "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -2039,7 +2178,11 @@ fn 
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     let expected = &[
         "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]",
@@ -2048,11 +2191,12 @@ fn 
repartition_ignores_sort_preserving_merge_with_union() -> Result<()> {
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
 
+/// These test cases use [`TestConfig::with_prefer_existing_sort`].
 #[test]
 fn repartition_does_not_destroy_sort() -> Result<()> {
     //  SortRequired
@@ -2075,8 +2219,16 @@ fn repartition_does_not_destroy_sort() -> Result<()> {
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true, true);
-    assert_optimized!(expected, plan, false, true);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+    );
+    assert_optimized!(
+        expected,
+        plan,
+        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+    );
 
     Ok(())
 }
@@ -2116,8 +2268,12 @@ fn repartition_does_not_destroy_sort_more_complex() -> 
Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -2150,7 +2306,11 @@ fn repartition_transitively_with_projection() -> 
Result<()> {
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]",
@@ -2160,7 +2320,11 @@ fn repartition_transitively_with_projection() -> 
Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected_first_sort_enforcement, plan, false);
+    assert_optimized!(
+        expected_first_sort_enforcement,
+        plan,
+        &TestConfig::new(DoFirst::Sorting)
+    );
 
     Ok(())
 }
@@ -2192,8 +2356,12 @@ fn repartition_ignores_transitively_with_projection() -> 
Result<()> {
         "  ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
         "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -2225,8 +2393,12 @@ fn repartition_transitively_past_sort_with_projection() 
-> Result<()> {
         "  ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, plan.clone(), true);
-    assert_optimized!(expected, plan, false);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -2249,7 +2421,11 @@ fn repartition_transitively_past_sort_with_filter() -> 
Result<()> {
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
@@ -2259,7 +2435,11 @@ fn repartition_transitively_past_sort_with_filter() -> 
Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "        DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected_first_sort_enforcement, plan, false);
+    assert_optimized!(
+        expected_first_sort_enforcement,
+        plan,
+        &TestConfig::new(DoFirst::Sorting)
+    );
 
     Ok(())
 }
@@ -2296,7 +2476,11 @@ fn 
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
 
-    assert_optimized!(expected, plan.clone(), true);
+    assert_optimized!(
+        expected,
+        plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     let expected_first_sort_enforcement = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
@@ -2306,7 +2490,11 @@ fn 
repartition_transitively_past_sort_with_projection_and_filter() -> Result<()>
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected_first_sort_enforcement, plan, false);
+    assert_optimized!(
+        expected_first_sort_enforcement,
+        plan,
+        &TestConfig::new(DoFirst::Sorting)
+    );
 
     Ok(())
 }
@@ -2329,8 +2517,12 @@ fn parallelization_single_partition() -> Result<()> {
         "    AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
         "      DataSourceExec: file_groups={2 groups: [[x:0..50], 
[x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 
10);
-    assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
+
+    let test_config = TestConfig::new(DoFirst::Distribution)
+        .with_prefer_repartition_file_scans(10)
+        .with_query_execution_partitions(2);
+    assert_optimized!(expected_parquet, plan_parquet, &test_config);
+    assert_optimized!(expected_csv, plan_csv, &test_config);
 
     Ok(())
 }
@@ -2346,24 +2538,22 @@ fn parallelization_multiple_files() -> Result<()> {
     let plan = 
filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
     let plan = sort_required_exec_with_req(plan, sort_key);
 
+    let test_config = TestConfig::new(DoFirst::Distribution)
+        .with_prefer_existing_sort()
+        .with_prefer_repartition_file_scans(1);
+
     // The groups must have only contiguous ranges of rows from the same file
     // if any group has rows from multiple files, the data is no longer sorted 
destroyed
     // https://github.com/apache/datafusion/issues/8451
     let expected = [
         "SortRequiredExec: [a@0 ASC]",
         "  FilterExec: c@2 = 0",
-        "    DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], 
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], 
file_type=parquet",        ];
-    let target_partitions = 3;
-    let repartition_size = 1;
+        "    DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], 
[x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], 
file_type=parquet",
+    ];
     assert_optimized!(
         expected,
         plan,
-        true,
-        true,
-        target_partitions,
-        true,
-        repartition_size,
-        false
+        &test_config.clone().with_query_execution_partitions(3)
     );
 
     let expected = [
@@ -2371,17 +2561,10 @@ fn parallelization_multiple_files() -> Result<()> {
         "  FilterExec: c@2 = 0",
         "    DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], 
[x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
-    let target_partitions = 8;
-    let repartition_size = 1;
     assert_optimized!(
         expected,
         plan,
-        true,
-        true,
-        target_partitions,
-        true,
-        repartition_size,
-        false
+        &test_config.with_query_execution_partitions(8)
     );
 
     Ok(())
@@ -2432,7 +2615,13 @@ fn parallelization_compressed_csv() -> Result<()> {
             .build(),
             vec![("a".to_string(), "a".to_string())],
         );
-        assert_optimized!(expected, plan, true, false, 2, true, 10, false);
+        assert_optimized!(
+            expected,
+            plan,
+            &TestConfig::new(DoFirst::Distribution)
+                .with_query_execution_partitions(2)
+                .with_prefer_repartition_file_scans(10)
+        );
     }
     Ok(())
 }
@@ -2457,8 +2646,11 @@ fn parallelization_two_partitions() -> Result<()> {
         // Plan already has two partitions
         "      DataSourceExec: file_groups={2 groups: [[x:0..100], 
[y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 
10);
-    assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10);
+    let test_config = TestConfig::new(DoFirst::Distribution)
+        .with_query_execution_partitions(2)
+        .with_prefer_repartition_file_scans(10);
+    assert_optimized!(expected_parquet, plan_parquet, &test_config);
+    assert_optimized!(expected_csv, plan_csv, &test_config);
     Ok(())
 }
 
@@ -2482,8 +2674,11 @@ fn parallelization_two_partitions_into_four() -> 
Result<()> {
         // Multiple source files splitted across partitions
         "      DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], 
[y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, 
has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true, 
10);
-    assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10);
+    let test_config = TestConfig::new(DoFirst::Distribution)
+        .with_query_execution_partitions(4)
+        .with_prefer_repartition_file_scans(10);
+    assert_optimized!(expected_parquet, plan_parquet, &test_config);
+    assert_optimized!(expected_csv, plan_csv, &test_config);
 
     Ok(())
 }
@@ -2514,8 +2709,16 @@ fn parallelization_sorted_limit() -> Result<()> {
         // Doesn't parallelize for SortExec without preserve_partitioning
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true);
-    assert_optimized!(expected_csv, plan_csv, true);
+    assert_optimized!(
+        expected_parquet,
+        plan_parquet,
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(
+        expected_csv,
+        plan_csv,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2558,8 +2761,16 @@ fn parallelization_limit_with_filter() -> Result<()> {
         // SortExec doesn't benefit from input partitioning
         "            DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true);
-    assert_optimized!(expected_csv, plan_csv, true);
+    assert_optimized!(
+        expected_parquet,
+        plan_parquet,
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(
+        expected_csv,
+        plan_csv,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2606,8 +2817,16 @@ fn parallelization_ignores_limit() -> Result<()> {
         "                    LocalLimitExec: fetch=100",
         "                      DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true);
-    assert_optimized!(expected_csv, plan_csv, true);
+    assert_optimized!(
+        expected_parquet,
+        plan_parquet,
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(
+        expected_csv,
+        plan_csv,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2635,8 +2854,16 @@ fn parallelization_union_inputs() -> Result<()> {
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true);
-    assert_optimized!(expected_csv, plan_csv, true);
+    assert_optimized!(
+        expected_parquet,
+        plan_parquet,
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(
+        expected_csv,
+        plan_csv,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2663,8 +2890,16 @@ fn parallelization_prior_to_sort_preserving_merge() -> 
Result<()> {
     let expected_csv = &[
         "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, 
e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true);
-    assert_optimized!(expected_csv, plan_csv, true);
+    assert_optimized!(
+        expected_parquet,
+        plan_parquet,
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(
+        expected_csv,
+        plan_csv,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2697,8 +2932,16 @@ fn parallelization_sort_preserving_merge_with_union() -> 
Result<()> {
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true);
-    assert_optimized!(expected_csv, plan_csv, true);
+    assert_optimized!(
+        expected_parquet,
+        plan_parquet,
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(
+        expected_csv,
+        plan_csv,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2728,8 +2971,16 @@ fn parallelization_does_not_benefit() -> Result<()> {
         "SortRequiredExec: [c@2 ASC]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true);
-    assert_optimized!(expected_csv, plan_csv, true);
+    assert_optimized!(
+        expected_parquet,
+        plan_parquet,
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(
+        expected_csv,
+        plan_csv,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2768,7 +3019,11 @@ fn 
parallelization_ignores_transitively_with_projection_parquet() -> Result<()>
         "ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected_parquet, plan_parquet, true);
+    assert_optimized!(
+        expected_parquet,
+        plan_parquet,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2807,7 +3062,11 @@ fn 
parallelization_ignores_transitively_with_projection_csv() -> Result<()> {
         "ProjectionExec: expr=[a@0 as a2, c@2 as c2]",
         "  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false",
     ];
-    assert_optimized!(expected_csv, plan_csv, true);
+    assert_optimized!(
+        expected_csv,
+        plan_csv,
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     Ok(())
 }
@@ -2831,12 +3090,17 @@ fn remove_redundant_roundrobins() -> Result<()> {
         "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, physical_plan.clone(), true);
-    assert_optimized!(expected, physical_plan, false);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, physical_plan, 
&TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
 
+/// This test case uses [`TestConfig::with_prefer_existing_sort`].
 #[test]
 fn remove_unnecessary_spm_after_filter() -> Result<()> {
     let schema = schema();
@@ -2855,13 +3119,21 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
         "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    // last flag sets config.optimizer.PREFER_EXISTING_SORT
-    assert_optimized!(expected, physical_plan.clone(), true, true);
-    assert_optimized!(expected, physical_plan, false, true);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+    );
+    assert_optimized!(
+        expected,
+        physical_plan,
+        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+    );
 
     Ok(())
 }
 
+/// This test case uses [`TestConfig::with_prefer_existing_sort`].
 #[test]
 fn preserve_ordering_through_repartition() -> Result<()> {
     let schema = schema();
@@ -2878,9 +3150,16 @@ fn preserve_ordering_through_repartition() -> Result<()> 
{
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
         "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet",
     ];
-    // last flag sets config.optimizer.PREFER_EXISTING_SORT
-    assert_optimized!(expected, physical_plan.clone(), true, true);
-    assert_optimized!(expected, physical_plan, false, true);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort()
+    );
+    assert_optimized!(
+        expected,
+        physical_plan,
+        &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort()
+    );
 
     Ok(())
 }
@@ -2903,7 +3182,11 @@ fn do_not_preserve_ordering_through_repartition() -> 
Result<()> {
         "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
 
-    assert_optimized!(expected, physical_plan.clone(), true);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     let expected = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
@@ -2912,7 +3195,7 @@ fn do_not_preserve_ordering_through_repartition() -> 
Result<()> {
         "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, physical_plan, false);
+    assert_optimized!(expected, physical_plan, 
&TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -2935,8 +3218,12 @@ fn no_need_for_sort_after_filter() -> Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "      DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, physical_plan.clone(), true);
-    assert_optimized!(expected, physical_plan, false);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, physical_plan, 
&TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -2964,7 +3251,11 @@ fn do_not_preserve_ordering_through_repartition2() -> 
Result<()> {
         "        DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
 
-    assert_optimized!(expected, physical_plan.clone(), true);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
 
     let expected = &[
         "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
@@ -2974,7 +3265,7 @@ fn do_not_preserve_ordering_through_repartition2() -> 
Result<()> {
         "        RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "          DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, physical_plan, false);
+    assert_optimized!(expected, physical_plan, 
&TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -2994,8 +3285,12 @@ fn do_not_preserve_ordering_through_repartition3() -> 
Result<()> {
         "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2",
         "    DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
-    assert_optimized!(expected, physical_plan.clone(), true);
-    assert_optimized!(expected, physical_plan, false);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, physical_plan, 
&TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -3091,8 +3386,16 @@ fn do_not_add_unnecessary_hash() -> Result<()> {
         "    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     // Make sure target partition number is 1. In this case hash repartition 
is unnecessary
-    assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 
1024);
-    assert_optimized!(expected, physical_plan, false, false, 1, false, 1024);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        
&TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(1)
+    );
+    assert_optimized!(
+        expected,
+        physical_plan,
+        &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(1)
+    );
 
     Ok(())
 }
@@ -3121,8 +3424,16 @@ fn do_not_add_unnecessary_hash2() -> Result<()> {
         "            DataSourceExec: file_groups={2 groups: [[x], [y]]}, 
projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet",
     ];
     // Make sure target partition number is larger than 2 (e.g partition 
number at the source).
-    assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 
1024);
-    assert_optimized!(expected, physical_plan, false, false, 4, false, 1024);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        
&TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(4)
+    );
+    assert_optimized!(
+        expected,
+        physical_plan,
+        &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(4)
+    );
 
     Ok(())
 }
@@ -3139,8 +3450,12 @@ fn optimize_away_unnecessary_repartition() -> Result<()> 
{
 
     let expected =
         &["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], file_type=parquet"];
-    assert_optimized!(expected, physical_plan.clone(), true);
-    assert_optimized!(expected, physical_plan, false);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, physical_plan, 
&TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }
@@ -3166,8 +3481,12 @@ fn optimize_away_unnecessary_repartition2() -> 
Result<()> {
         "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
         "      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], file_type=parquet",
     ];
-    assert_optimized!(expected, physical_plan.clone(), true);
-    assert_optimized!(expected, physical_plan, false);
+    assert_optimized!(
+        expected,
+        physical_plan.clone(),
+        &TestConfig::new(DoFirst::Distribution)
+    );
+    assert_optimized!(expected, physical_plan, 
&TestConfig::new(DoFirst::Sorting));
 
     Ok(())
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to