This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 01234eb66d Fix: Preserve sorting for the COPY TO plan (#16785)
01234eb66d is described below

commit 01234eb66d242366e372efe6403a47065d56659b
Author: Bert Vermeiren <103956021+bert-beyondlo...@users.noreply.github.com>
AuthorDate: Thu Jul 17 23:31:42 2025 +0200

    Fix: Preserve sorting for the COPY TO plan (#16785)
    
    * Fix: Preserve sorting for the COPY TO plan
    
    * Update datafusion/core/tests/dataframe/mod.rs
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    
    * Update mod.rs
    
    ---------
    
    Co-authored-by: Bert Vermeiren <bert.vermei...@datadobi.com>
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
    Co-authored-by: xudong.w <wxd963996...@gmail.com>
---
 datafusion/core/src/physical_planner.rs |  9 ++++-
 datafusion/core/tests/dataframe/mod.rs  | 62 +++++++++++++++++++++++++++++++--
 2 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 293f2cfc96..ab123dccea 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -556,8 +556,15 @@ impl DefaultPhysicalPlanner {
                     file_extension,
                 };
 
+                let ordering = input_exec.properties().output_ordering().cloned();
+
                 sink_format
-                    .create_writer_physical_plan(input_exec, session_state, config, None)
+                    .create_writer_physical_plan(
+                        input_exec,
+                        session_state,
+                        config,
+                        ordering.map(Into::into),
+                    )
                     .await?
             }
             LogicalPlan::Dml(DmlStatement {
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index 84a9cdb1fa..36a1161541 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -68,6 +68,7 @@ use datafusion_common::{
     TableReference, UnnestOptions,
 };
 use datafusion_common_runtime::SpawnedTask;
+use datafusion_datasource::file_format::format_as_file_type;
 use datafusion_execution::config::SessionConfig;
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_expr::expr::{FieldMetadata, GroupingSet, Sort, WindowFunction};
@@ -75,8 +76,8 @@ use datafusion_expr::var_provider::{VarProvider, VarType};
 use datafusion_expr::{
     cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder,
     scalar_subquery, when, wildcard, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan,
-    ScalarFunctionImplementation, WindowFrame, WindowFrameBound, WindowFrameUnits,
-    WindowFunctionDefinition,
+    LogicalPlanBuilder, ScalarFunctionImplementation, SortExpr, WindowFrame,
+    WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
 };
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::Partitioning;
@@ -6193,3 +6194,60 @@ async fn test_copy_schema() -> Result<()> {
     assert_logical_expr_schema_eq_physical_expr_schema(result).await?;
     Ok(())
 }
+
+#[tokio::test]
+async fn test_copy_to_preserves_order() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+
+    let session_state = SessionStateBuilder::new_with_default_features().build();
+    let session_ctx = SessionContext::new_with_state(session_state);
+
+    let target_path = tmp_dir.path().join("target_ordered.csv");
+    let csv_file_format = session_ctx
+        .state()
+        .get_file_format_factory("csv")
+        .map(format_as_file_type)
+        .unwrap();
+
+    let ordered_select_plan = LogicalPlanBuilder::values(vec![
+        vec![lit(1u64)],
+        vec![lit(10u64)],
+        vec![lit(20u64)],
+        vec![lit(100u64)],
+    ])?
+    .sort(vec![SortExpr::new(col("column1"), false, true)])?
+    .build()?;
+
+    let copy_to_plan = LogicalPlanBuilder::copy_to(
+        ordered_select_plan,
+        target_path.to_str().unwrap().to_string(),
+        csv_file_format,
+        HashMap::new(),
+        vec![],
+    )?
+    .build()?;
+
+    let union_side_branch = LogicalPlanBuilder::values(vec![vec![lit(1u64)]])?.build()?;
+    let union_plan = LogicalPlanBuilder::from(copy_to_plan)
+        .union(union_side_branch)?
+        .build()?;
+
+    let frame = session_ctx.execute_logical_plan(union_plan).await?;
+    let physical_plan = frame.create_physical_plan().await?;
+
+    let physical_plan_format =
+        displayable(physical_plan.as_ref()).indent(true).to_string();
+
+    // Expect that input to the DataSinkExec is sorted correctly
+    assert_snapshot!(
+        physical_plan_format,
+        @r###"
+    UnionExec
+      DataSinkExec: sink=CsvSink(file_groups=[])
+        SortExec: expr=[column1@0 DESC], preserve_partitioning=[false]
+          DataSourceExec: partitions=1, partition_sizes=[1]
+      DataSourceExec: partitions=1, partition_sizes=[1]
+    "###
+    );
+    Ok(())
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to