xudong963 commented on code in PR #14637:
URL: https://github.com/apache/datafusion/pull/14637#discussion_r1957606217


##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -2280,3 +2283,49 @@ async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()>
     assert_optimized!(expected_input, expected_no_change, physical_plan, true);
     Ok(())
 }
+
+#[tokio::test]
+async fn test_preserve_needed_coalesce() -> Result<()> {
+    // Input to EnforceSorting, from our test case.
+    let plan = projection_exec_with_alias(
+        union_exec(vec![parquet_exec_with_stats(); 2]),
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let plan: Arc<dyn ExecutionPlan> =
+        single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
+    let plan = sort_exec(sort_key, plan);
+
+    // Starting plan: as in our test case.
+    let starting_plan = vec![
+        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+        "    CoalescePartitionsExec",

Review Comment:
   I'm curious why this test uses a single `AggregateExec: mode=SinglePartitioned` instead of the pattern
   ```
   "AggregateExec: mode=Final, ...",
   "  AggregateExec: mode=Partial, ...",
   ```
   Is that because of the cost model?



##########
datafusion/physical-optimizer/src/enforce_sorting/mod.rs:
##########
@@ -246,32 +282,50 @@ fn replace_with_partial_sort(
 /// This function turns plans of the form
 /// ```text
 ///      "SortExec: expr=\[a@0 ASC\]",
-///      "  CoalescePartitionsExec",
-///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+///      "  ...nodes..."
+///      "    CoalescePartitionsExec",
+///      "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
 /// ```
 /// to
 /// ```text
 ///      "SortPreservingMergeExec: \[a@0 ASC\]",
 ///      "  SortExec: expr=\[a@0 ASC\]",
-///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+///      "    ...nodes..."
+///      "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
 /// ```
 /// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s.
 /// By performing sorting in parallel, we can increase performance in some scenarios.
+///
+/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`]

Review Comment:
   ```suggestion
   /// This requires that there are nodes between the [`SortExec`] and [`CoalescePartitionsExec`]
   ```
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to