alamb commented on code in PR #14637:
URL: https://github.com/apache/datafusion/pull/14637#discussion_r1953591399


##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -2280,3 +2285,62 @@ async fn 
test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()>
     assert_optimized!(expected_input, expected_no_change, physical_plan, true);
     Ok(())
 }
+
+#[tokio::test]
+async fn test_preserve_needed_coalesce() -> Result<()> {
+    // Input to EnforceSorting, from our test case.
+    let plan = projection_exec_with_alias(
+        union_exec(vec![parquet_exec_with_stats(); 2]),
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let plan: Arc<dyn ExecutionPlan> =
+        single_partitioned_aggregate(plan, vec![("a".to_string(), 
"a1".to_string())]);
+    let plan = sort_exec(sort_key, plan);
+
+    // Starting plan: as in our test case.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], 
aggr=[]",
+            "    CoalescePartitionsExec",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, 
projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+    // Test: plan is valid.
+    assert_sanity_check(&plan, true);
+
+    // EnforceSorting will remove the coalesce, and add an SPM further up 
(above the aggregate).
+    let optimizer = EnforceSorting::new();
+    let optimized = optimizer.optimize(plan, &Default::default())?;
+    assert_eq!(
+        get_plan_string(&optimized),
+        vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+            "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], 
aggr=[]",

Review Comment:
   I agree this is a bug -- the `AggregateExec` (with `mode=SinglePartitioned`) requires its input to have a single partition.
   
   In the input plan, the `CoalescePartitionsExec` produces a single partition.
   
   In the new plan, that `CoalescePartitionsExec` is incorrectly removed.
   
   I would expect the `AggregateExec` to report that its input distribution requirement is a single partition, and the `EnforceSorting` pass to respect that requirement.
   
   However, the actual `AggregateExec::required_input_distribution` is somewhat more subtle:
   
   
https://docs.rs/datafusion-physical-plan/45.0.0/src/datafusion_physical_plan/aggregates/mod.rs.html#812-824
   
   I think it is saying that the input needs to be hash-partitioned by the group keys (which this plan clearly violates).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to