alamb commented on a change in pull request #1776:
URL: https://github.com/apache/arrow-datafusion/pull/1776#discussion_r801054089
##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -178,38 +335,46 @@ mod tests {
.collect()
}
+ /// Runs the repartition optimizer and asserts the plan against the
expected
+ macro_rules! assert_optimized {
+ ($EXPECTED_LINES: expr, $PLAN: expr) => {
+ let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s|
*s).collect();
+
+ // run optimizer
+ let optimizer = Repartition {};
+ let optimized = optimizer
+ .optimize($PLAN,
&ExecutionConfig::new().with_target_partitions(10))?;
+
+ // Now format correctly
+ let plan = displayable(optimized.as_ref()).indent().to_string();
+ let actual_lines = trim_plan_display(&plan);
+
+ assert_eq!(
+ &expected_lines, &actual_lines,
+ "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ expected_lines, actual_lines
+ );
+ };
+ }
+
#[test]
fn added_repartition_to_single_partition() -> Result<()> {
- let optimizer = Repartition {};
+ let plan = hash_aggregate(parquet_exec());
Review comment:
I cleaned up the tests here to reduce the ceremony of invoking the
optimizer. The plans are all the same.
##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -36,33 +128,70 @@ impl Repartition {
}
}
+/// Recursively visits all `plan`s puts and then optionally adds a
+/// `RepartitionExec` at the output of `plan` to match
+/// `target_partitions`
+///
+/// if `can_reorder` is false, means that the output of this node
+/// can not be reordered as as something upstream is relying on that order
+///
+/// If 'would_benefit` is false, the upstream operator doesn't
+/// benefit from additional reordering
+///
fn optimize_partitions(
target_partitions: usize,
plan: Arc<dyn ExecutionPlan>,
- should_repartition: bool,
+ can_reorder: bool,
Review comment:
Here is the change to the repartition logic so that it does not
repartition when doing so would produce incorrect answers
##########
File path: datafusion/src/physical_plan/analyze.rs
##########
@@ -82,6 +83,10 @@ impl ExecutionPlan for AnalyzeExec {
Partitioning::UnknownPartitioning(1)
}
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
Review comment:
Having to sprinkle `output_ordering` around was annoying -- but I think
it may be worth it to try and avoid some nasty bugs.
##########
File path: datafusion/src/physical_plan/limit.rs
##########
@@ -300,11 +335,6 @@ impl ExecutionPlan for LocalLimitExec {
_ => Statistics::default(),
}
}
-
- fn should_repartition_children(&self) -> bool {
Review comment:
`should_repartition_children` is effectively renamed to `benefits_from_input_partitioning`
##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -219,20 +384,64 @@ mod tests {
"ParquetExec: limit=None, partitions=[x]",
];
- assert_eq!(&trim_plan_display(&plan), &expected);
+ assert_optimized!(expected, plan);
Ok(())
}
#[test]
- fn repartition_ignores_limit() -> Result<()> {
- let optimizer = Repartition {};
+ fn repartition_unsorted_limit() -> Result<()> {
+ let plan = limit_exec(filter_exec(parquet_exec()));
+
+ let expected = &[
+ "GlobalLimitExec: limit=100",
+ "LocalLimitExec: limit=100",
+ "FilterExec: c1@0",
+ // nothing sorts the data, so the local limit doesn't require
sorted data either
+ "RepartitionExec: partitioning=RoundRobinBatch(10)",
+ "ParquetExec: limit=None, partitions=[x]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+ }
+
+ #[test]
+ fn repartition_sorted_limit() -> Result<()> {
+ let plan = limit_exec(sort_exec(parquet_exec()));
+
+ let expected = &[
+ "GlobalLimitExec: limit=100",
+ "LocalLimitExec: limit=100",
+ // data is sorted so can't repartition here
+ "SortExec: [c1@0 ASC]",
+ "ParquetExec: limit=None, partitions=[x]",
+ ];
+
+ assert_optimized!(expected, plan);
+ Ok(())
+ }
+
+ #[test]
+ fn repartition_sorted_limit_with_filter() -> Result<()> {
+ let plan = limit_exec(filter_exec(sort_exec(parquet_exec())));
- let optimized = optimizer.optimize(
-
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))),
- &ExecutionConfig::new().with_target_partitions(10),
- )?;
+ let expected = &[
+ "GlobalLimitExec: limit=100",
+ "LocalLimitExec: limit=100",
+ "FilterExec: c1@0",
+ // data is sorted so can't repartition here even though
Review comment:
However, once you put a sort here then repartitioning can't happen
without potentially getting wrong results
##########
File path: datafusion/src/physical_optimizer/repartition.rs
##########
@@ -219,20 +384,64 @@ mod tests {
"ParquetExec: limit=None, partitions=[x]",
];
- assert_eq!(&trim_plan_display(&plan), &expected);
+ assert_optimized!(expected, plan);
Ok(())
}
#[test]
- fn repartition_ignores_limit() -> Result<()> {
- let optimizer = Repartition {};
+ fn repartition_unsorted_limit() -> Result<()> {
Review comment:
new plans showing that data isn't repartitioned below limits if sorts
are present
##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -147,24 +147,59 @@ pub trait ExecutionPlan: Debug + Send + Sync {
Distribution::UnspecifiedDistribution
}
- /// Returns `true` if the direct children of this `ExecutionPlan` should
be repartitioned
- /// to introduce greater concurrency to the plan
+ /// Returns `true` if this operator relies on its inputs being
Review comment:
Here is the new API for `ExecutionPlan` that signals how / when
repartitioning occurs
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]