bert-beyondloops opened a new issue, #16784: URL: https://github.com/apache/datafusion/issues/16784
### Describe the bug

The following logical plan:

```
Union
  CopyTo: format=csv output_url=/var/folders/jj/jnb45q9s6h9cv9mxxw2bbr800000gn/T/.tmpxeMDM4/target_ordered.csv options: ()
    Sort: column1 DESC NULLS FIRST
      Values: (UInt64(1)), (UInt64(10)), (UInt64(20)), (UInt64(100))
  Values: (UInt64(1))
```

results in this optimized physical plan:

```
UnionExec
  DataSinkExec: sink=CsvSink(file_groups=[])
    DataSourceExec: partitions=1, partition_sizes=[1]
  DataSourceExec: partitions=1, partition_sizes=[1]
```

**The sort requirement has been incorrectly eliminated.**

For reference, the initial physical plan does include the SortExec before the physical optimizer runs.

Input physical plan before optimizations:

```
UnionExec
  DataSinkExec: sink=CsvSink(file_groups=[])
    SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
      DataSourceExec: partitions=1, partition_sizes=[1]
  DataSourceExec: partitions=1, partition_sizes=[1]
```

### To Reproduce

The Rust test below reproduces the problem. The logical plan above cannot be constructed from a SQL statement, because SQL does not allow COPY TO (DML) to be used as a subquery of another query. The DataFrame API does allow it.

```
#[tokio::test]
async fn test_copy_to_preserves_order() -> Result<()> {
    let tmp_dir = TempDir::new()?;

    let session_state = SessionStateBuilder::new_with_default_features().build();
    let session_ctx = SessionContext::new_with_state(session_state);

    let target_path = tmp_dir.path().join("target_ordered.csv");
    let csv_file_format = session_ctx
        .state()
        .get_file_format_factory("csv")
        .map(|file_format_factory| format_as_file_type(file_format_factory))
        .unwrap();

    let ordered_select_plan = LogicalPlanBuilder::values(vec![
        vec![lit(1u64)],
        vec![lit(10u64)],
        vec![lit(20u64)],
        vec![lit(100u64)],
    ])?
    .sort(vec![SortExpr::new(col("column1"), false, true)])?
    .build()?;

    let copy_to_plan = LogicalPlanBuilder::copy_to(
        ordered_select_plan,
        target_path.to_str().unwrap().to_string(),
        csv_file_format,
        HashMap::new(),
        vec![],
    )?
    .build()?;

    let union_side_branch = LogicalPlanBuilder::values(vec![vec![lit(1u64)]])?.build()?;
    let union_plan = LogicalPlanBuilder::from(copy_to_plan)
        .union(union_side_branch)?
        .build()?;

    let frame = session_ctx.execute_logical_plan(union_plan).await?;
    let physical_plan = frame.create_physical_plan().await?;

    let physical_plan_format =
        displayable(physical_plan.as_ref()).indent(true).to_string();

    assert_snapshot!(
        physical_plan_format,
        @r###"
    UnionExec
      DataSinkExec: sink=CsvSink(file_groups=[])
        SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
          DataSourceExec: partitions=1, partition_sizes=[1]
      DataSourceExec: partitions=1, partition_sizes=[1]
    "###
    );

    Ok(())
}
```

This test fails.

### Expected behavior

The data sink should write the data in the order specified, so the SortExec must not be eliminated. The expected physical plan is:

```
UnionExec
  DataSinkExec: sink=CsvSink(file_groups=[])
    SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
      DataSourceExec: partitions=1, partition_sizes=[1]
  DataSourceExec: partitions=1, partition_sizes=[1]
```

### Additional context

I had to use a UNION to trigger this bug. In the other cases, via a standard COPY TO SQL statement, the ordering seems to be kept. Here the OutputRequirementExec is apparently not pushed down below the DataSinkExec. Perhaps this algorithm only runs on single-child plans?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org