bert-beyondloops opened a new issue, #16784:
URL: https://github.com/apache/datafusion/issues/16784

   ### Describe the bug
   
   The following logical plan:
   
   ```
       Union
         CopyTo: format=csv output_url=/var/folders/jj/jnb45q9s6h9cv9mxxw2bbr800000gn/T/.tmpxeMDM4/target_ordered.csv options: ()
           Sort: column1 DESC NULLS FIRST
             Values: (UInt64(1)), (UInt64(10)), (UInt64(20)), (UInt64(100))
         Values: (UInt64(1))
   ```
   
   results in the following optimized physical plan:
   
   ```
    UnionExec
         DataSinkExec: sink=CsvSink(file_groups=[])
           DataSourceExec: partitions=1, partition_sizes=[1]
         DataSourceExec: partitions=1, partition_sizes=[1]
   ```
   
   **The sorting requirement has been incorrectly eliminated.**
   
   
   For reference, the initial physical plan does include the SortExec. It is
   removed only once the physical-plan optimization takes place.
   
   Input physical plan before optimizations:
   ```
   UnionExec
         DataSinkExec: sink=CsvSink(file_groups=[])
           SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
             DataSourceExec: partitions=1, partition_sizes=[1]
         DataSourceExec: partitions=1, partition_sizes=[1]
   ```
   
    
   
   ### To Reproduce
   
   The Rust test below reproduces the issue, since the logical plan above cannot
   be constructed from a SQL statement. SQL does not allow COPY TO (DML) to be
   used as a subquery of another query, but the DataFrame API does.
   
   ```
   #[tokio::test]
   async fn test_copy_to_preserves_order() -> Result<()> {
       let tmp_dir = TempDir::new()?;
   
       let session_state = SessionStateBuilder::new_with_default_features().build();
       let session_ctx = SessionContext::new_with_state(session_state);
   
       let target_path = tmp_dir.path().join("target_ordered.csv");
       let csv_file_format = session_ctx
           .state()
           .get_file_format_factory("csv")
           .map(|file_format_factory| format_as_file_type(file_format_factory))
           .unwrap();
   
       let ordered_select_plan = LogicalPlanBuilder::values(vec![
           vec![lit(1u64)],
           vec![lit(10u64)],
           vec![lit(20u64)],
           vec![lit(100u64)],
       ])?
       .sort(vec![SortExpr::new(col("column1"), false, true)])?
       .build()?;
   
       let copy_to_plan = LogicalPlanBuilder::copy_to(
           ordered_select_plan,
           target_path.to_str().unwrap().to_string(),
           csv_file_format,
           HashMap::new(),
           vec![],
       )?
       .build()?;
   
       let union_side_branch = LogicalPlanBuilder::values(vec![vec![lit(1u64)]])?.build()?;
       let union_plan = LogicalPlanBuilder::from(copy_to_plan)
           .union(union_side_branch)?
           .build()?;
   
       let frame = session_ctx.execute_logical_plan(union_plan).await?;
       let physical_plan = frame.create_physical_plan().await?;
   
       let physical_plan_format =
           displayable(physical_plan.as_ref()).indent(true).to_string();
   
       assert_snapshot!(
           physical_plan_format,
           @r###"
       UnionExec
         DataSinkExec: sink=CsvSink(file_groups=[])
           SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
             DataSourceExec: partitions=1, partition_sizes=[1]
         DataSourceExec: partitions=1, partition_sizes=[1]
       "###
       );
       Ok(())
   }
   ```
   
   This test fails: the optimized physical plan no longer contains the SortExec
   that the snapshot expects.
   
   
   ### Expected behavior
   
   The data sink should write the data in the specified order.
   The SortExec must not be eliminated.
   
   The optimized physical plan should look like:
   ```
   UnionExec
         DataSinkExec: sink=CsvSink(file_groups=[])
           SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
             DataSourceExec: partitions=1, partition_sizes=[1]
         DataSourceExec: partitions=1, partition_sizes=[1]
   ```
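
   As a rough illustration of the expectation above, the check could also be
   written against the indented plan text instead of a full snapshot. This is
   only a sketch: `every_sink_reads_from_sort` and `split_indent` are
   hypothetical helpers, not part of DataFusion, and they assume the text
   layout produced by `displayable(...).indent(true)` as shown in this issue.

   ```rust
   /// Splits a plan line into (number of leading spaces, trimmed text).
   fn split_indent(line: &str) -> (usize, &str) {
       let trimmed = line.trim_start();
       (line.len() - trimmed.len(), trimmed)
   }

   /// Hypothetical helper (not a DataFusion API): returns true if every
   /// `DataSinkExec` line is directly followed by a deeper-indented
   /// `SortExec` line, i.e. the sink's direct child is a sort.
   fn every_sink_reads_from_sort(plan: &str) -> bool {
       let lines: Vec<(usize, &str)> = plan
           .lines()
           .filter(|l| !l.trim().is_empty())
           .map(split_indent)
           .collect();

       lines.iter().enumerate().all(|(i, (indent, text))| {
           if !text.starts_with("DataSinkExec") {
               // Only sink nodes are checked; everything else passes.
               return true;
           }
           match lines.get(i + 1) {
               Some((child_indent, child)) => {
                   child_indent > indent && child.starts_with("SortExec")
               }
               // A sink on the last line has no child at all.
               None => false,
           }
       })
   }
   ```

   Such a check is only meaningful for plans like this one, where the sink's
   input is known to require a sort. It returns false for the optimized plan
   reported above and true for the expected plan.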
   
   ### Additional context
   
   I had to use a UNION to trigger this bug. With standard COPY TO SQL, the
   ordering appears to be kept.
   
   In this case the OutputRequirementExec concept is not pushed down below the
   DataSinkExec. Perhaps this algorithm only runs on single-child plans and
   skips plans with multiple children?
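
   To make the suspicion above concrete, here is a toy model of a
   sort-removal pass. It is not DataFusion's optimizer code: the `Plan` enum
   and `remove_unneeded_sorts` are invented for illustration, and the real
   rules may work quite differently. The sketch only shows the property one
   would expect, namely that the ordering requirement is re-established at
   each sink and that every child of a multi-child node is visited.

   ```rust
   /// Toy plan node for illustration; NOT DataFusion's `ExecutionPlan`.
   #[derive(Debug, Clone, PartialEq)]
   enum Plan {
       Source,
       Sort(Box<Plan>),
       /// A sink writes rows in arrival order, so it needs its input order kept.
       Sink(Box<Plan>),
       Union(Vec<Plan>),
   }

   /// Removes sorts whose ordering nothing above them needs.
   /// `order_needed` says whether the parent requires this node's output order.
   fn remove_unneeded_sorts(plan: Plan, order_needed: bool) -> Plan {
       match plan {
           Plan::Source => Plan::Source,
           Plan::Sort(input) => {
               // Anything below a sort may arrive in any order.
               let input = remove_unneeded_sorts(*input, false);
               if order_needed {
                   Plan::Sort(Box::new(input))
               } else {
                   input
               }
           }
           // The sink re-establishes the requirement for its own input,
           // regardless of what its parent needs.
           Plan::Sink(input) => {
               Plan::Sink(Box::new(remove_unneeded_sorts(*input, true)))
           }
           // A union does not need ordered inputs, but each child is still
           // visited, so a sink nested under it keeps its sort.
           Plan::Union(children) => Plan::Union(
               children
                   .into_iter()
                   .map(|child| remove_unneeded_sorts(child, false))
                   .collect(),
           ),
       }
   }
   ```

   In this toy model, a plan shaped like the one in this issue (a union of a
   sink over a sort and a plain source) comes back unchanged, while a sort
   placed directly under the union is dropped.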
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

