NGA-TRAN commented on code in PR #18352:
URL: https://github.com/apache/datafusion/pull/18352#discussion_r2477450389


##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+    reproducer_e2e_impl(false).await?;
+
+    // 💥 Doesn't pass, and generates this plan:
+    //
+    // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted
+    //   SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+    //     CoalescePartitionsExec
+    //       AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+    //         UnionExec
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+    reproducer_e2e_impl(true).await?;
+
+    // 💥 Doesn't pass, and generates this plan:
+    //
+    // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted
+    //   SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+    //     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+    //       AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+    //         UnionExec
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+    Ok(())
+}
+
+async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
+    let config = SessionConfig::default()
+        .with_target_partitions(1)
+        .with_repartition_sorts(repartition_sorts);
+    let ctx = SessionContext::new_with_config(config);
+
+    let testdata = parquet_test_data();
+
+    // Register "sorted" table, that is sorted
+    ctx.register_parquet(
+        "sorted",
+        &format!("{testdata}/alltypes_tiny_pages.parquet"),
+        ParquetReadOptions::default()
+            .file_sort_order(vec![vec![col("id").sort(true, false)]]),

Review Comment:
   > Can this cause issues?

   Likely not, but I am not 100% sure whether we do anything special with Parquet files.



##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -2996,6 +2997,119 @@ async fn test_count_wildcard_on_window() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_false() -> Result<()> {
+    reproducer_e2e_impl(false).await?;
+
+    // 💥 Doesn't pass, and generates this plan:
+    //
+    // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted
+    //   SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
+    //     CoalescePartitionsExec
+    //       AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+    //         UnionExec
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reproducer_e2e_with_repartition_sorts_true() -> Result<()> {
+    reproducer_e2e_impl(true).await?;
+
+    // 💥 Doesn't pass, and generates this plan:
+    //
+    // AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], 
ordering_mode=Sorted
+    //   SortPreservingMergeExec: [id@0 ASC NULLS LAST]
+    //     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
+    //       AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
+    //         UnionExec
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], 
output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+    //           DataSourceExec: file_groups={1 group: 
[[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
+
+    Ok(())
+}
+
+async fn reproducer_e2e_impl(repartition_sorts: bool) -> Result<()> {
+    let config = SessionConfig::default()
+        .with_target_partitions(1)
+        .with_repartition_sorts(repartition_sorts);
+    let ctx = SessionContext::new_with_config(config);
+
+    let testdata = parquet_test_data();
+
+    // Register "sorted" table, that is sorted
+    ctx.register_parquet(
+        "sorted",
+        &format!("{testdata}/alltypes_tiny_pages.parquet"),
+        ParquetReadOptions::default()
+            .file_sort_order(vec![vec![col("id").sort(true, false)]]),
+    )
+        .await?;
+
+    // Register "unsorted" table
+    ctx.register_parquet(
+        "unsorted",
+        &format!("{testdata}/alltypes_tiny_pages.parquet"),
+        ParquetReadOptions::default()
+    )
+        .await?;
+
+    let source_sorted = ctx
+        .table("sorted")
+        .await
+        .unwrap()
+        .select(vec![col("id")])
+        .unwrap();
+
+    let source_unsorted = ctx
+        .table("unsorted")
+        .await
+        .unwrap()
+        .select(vec![col("id")])
+        .unwrap();
+
+    let source_unsorted_resorted = source_unsorted
+        .sort(vec![col("id").sort(true, false)])?;
+
+    let union = source_sorted.union(source_unsorted_resorted)?;
+
+    let agg = union.aggregate(vec![col("id")], vec![])?;
+
+    let df = agg;

Review Comment:
   The verbose EXPLAIN output is super helpful. The issue appears at the
`physical_plan after EnforceSorting` step. When you create an issue, include
both the EXPLAIN and the EXPLAIN VERBOSE output in the description. In the code
reproducer (this PR), can you copy lines 100–120 of your verbose output into
comments to show the issue?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to