This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch alamb/test_cleanup in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 592a99cf4c6d38a2406454789ac6073df6c4fc96 Author: Andrew Lamb <and...@nerdnetworks.org> AuthorDate: Fri Dec 8 15:46:47 2023 -0500 Minor: Improve comments in EnforceDistribution tests --- .../src/physical_optimizer/enforce_distribution.rs | 34 ++++++++++++++-------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 4befea741c..3aed6555f3 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -256,7 +256,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { /// 1) If the current plan is Partitioned HashJoin, SortMergeJoin, check whether the requirements can be satisfied by adjusting join keys ordering: /// Requirements can not be satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan. /// Requirements is already satisfied, clear the current requirements, generate new requirements(to pushdown) based on the current join keys, return the unchanged plan. -/// Requirements can be satisfied by adjusting keys ordering, clear the current requiements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan. +/// Requirements can be satisfied by adjusting keys ordering, clear the current requirements, generate new requirements(to pushdown) based on the adjusted join keys, return the changed plan. /// /// 2) If the current plan is Aggregation, check whether the requirements can be satisfied by adjusting group by keys ordering: /// Requirements can not be satisfied, clear all the requirements, return the unchanged plan. 
@@ -928,7 +928,7 @@ fn add_roundrobin_on_top( // If any of the following conditions is true // - Preserving ordering is not helpful in terms of satisfying ordering requirements // - Usage of order preserving variants is not desirable - // (determined by flag `config.optimizer.bounded_order_preserving_variants`) + // (determined by flag `config.optimizer.prefer_existing_sort`) let partitioning = Partitioning::RoundRobinBatch(n_target); let repartition = RepartitionExec::try_new(input, partitioning)?.with_preserve_order(); @@ -996,7 +996,7 @@ fn add_hash_on_top( // - Preserving ordering is not helpful in terms of satisfying ordering // requirements. // - Usage of order preserving variants is not desirable (per the flag - // `config.optimizer.bounded_order_preserving_variants`). + // `config.optimizer.prefer_existing_sort`). let mut new_plan = if repartition_beneficial_stats { // Since hashing benefits from partitioning, add a round-robin repartition // before it: @@ -1045,7 +1045,7 @@ fn add_spm_on_top( // If any of the following conditions is true // - Preserving ordering is not helpful in terms of satisfying ordering requirements // - Usage of order preserving variants is not desirable - // (determined by flag `config.optimizer.bounded_order_preserving_variants`) + // (determined by flag `config.optimizer.prefer_existing_sort`) let should_preserve_ordering = input.output_ordering().is_some(); let new_plan: Arc<dyn ExecutionPlan> = if should_preserve_ordering { let existing_ordering = input.output_ordering().unwrap_or(&[]); @@ -2026,7 +2026,7 @@ pub(crate) mod tests { fn ensure_distribution_helper( plan: Arc<dyn ExecutionPlan>, target_partitions: usize, - bounded_order_preserving_variants: bool, + prefer_existing_sort: bool, ) -> Result<Arc<dyn ExecutionPlan>> { let distribution_context = DistributionContext::new(plan); let mut config = ConfigOptions::new(); @@ -2034,7 +2034,7 @@ pub(crate) mod tests { config.optimizer.enable_round_robin_repartition = false; 
config.optimizer.repartition_file_scans = false; config.optimizer.repartition_file_min_size = 1024; - config.optimizer.prefer_existing_sort = bounded_order_preserving_variants; + config.optimizer.prefer_existing_sort = prefer_existing_sort; ensure_distribution(distribution_context, &config).map(|item| item.into().plan) } @@ -2056,23 +2056,33 @@ pub(crate) mod tests { } /// Runs the repartition optimizer and asserts the plan against the expected + /// # Arguments + /// * `EXPECTED_LINES` - Expected output plan + /// * `PLAN` - Input plan + /// * `FIRST_ENFORCE_DIST` - + /// true: runs (EnforceDistribution, EnforceDistribution, EnforceSorting) + /// false: runs (EnforceSorting, EnforceDistribution, EnforceDistribution) + /// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / resort data if it is already sorted + /// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to + /// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans + /// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition macro_rules!
assert_optimized { ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => { assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024); }; - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $BOUNDED_ORDER_PRESERVING_VARIANTS, 10, false, 1024); + ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => { + assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024); }; - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $BOUNDED_ORDER_PRESERVING_VARIANTS: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => { + ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); let mut config = ConfigOptions::new(); config.execution.target_partitions = $TARGET_PARTITIONS; config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS; config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE; - config.optimizer.prefer_existing_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS; + config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT; // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade // because they were written prior to the separation of `BasicEnforcement` into @@ -3294,7 +3304,7 @@ pub(crate) mod tests { ]; assert_optimized!(expected, exec, true); // In this case preserving ordering through order preserving operators is not desirable - // (according to flag: bounded_order_preserving_variants) + // (according to flag: `config.optimizer.prefer_existing_sort`) // hence in this case ordering lost during CoalescePartitionsExec and
re-introduced with // SortExec at the top. let expected = &[ @@ -4341,7 +4351,7 @@ pub(crate) mod tests { "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]", ]; - // last flag sets config.optimizer.bounded_order_preserving_variants + // last flag sets config.optimizer.prefer_existing_sort assert_optimized!(expected, physical_plan.clone(), true, true); assert_optimized!(expected, physical_plan, false, true);