This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 01234eb66d Fix: Preserve sorting for the COPY TO plan (#16785) 01234eb66d is described below commit 01234eb66d242366e372efe6403a47065d56659b Author: Bert Vermeiren <103956021+bert-beyondlo...@users.noreply.github.com> AuthorDate: Thu Jul 17 23:31:42 2025 +0200 Fix: Preserve sorting for the COPY TO plan (#16785) * Fix: Preserve sorting for the COPY TO plan * Update datafusion/core/tests/dataframe/mod.rs Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> * Update mod.rs --------- Co-authored-by: Bert Vermeiren <bert.vermei...@datadobi.com> Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> Co-authored-by: xudong.w <wxd963996...@gmail.com> --- datafusion/core/src/physical_planner.rs | 9 ++++- datafusion/core/tests/dataframe/mod.rs | 62 +++++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 293f2cfc96..ab123dccea 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -556,8 +556,15 @@ impl DefaultPhysicalPlanner { file_extension, }; + let ordering = input_exec.properties().output_ordering().cloned(); + sink_format - .create_writer_physical_plan(input_exec, session_state, config, None) + .create_writer_physical_plan( + input_exec, + session_state, + config, + ordering.map(Into::into), + ) .await? 
} LogicalPlan::Dml(DmlStatement { diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 84a9cdb1fa..36a1161541 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -68,6 +68,7 @@ use datafusion_common::{ TableReference, UnnestOptions, }; use datafusion_common_runtime::SpawnedTask; +use datafusion_datasource::file_format::format_as_file_type; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::expr::{FieldMetadata, GroupingSet, Sort, WindowFunction}; @@ -75,8 +76,8 @@ use datafusion_expr::var_provider::{VarProvider, VarType}; use datafusion_expr::{ cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder, scalar_subquery, when, wildcard, Expr, ExprFunctionExt, ExprSchemable, LogicalPlan, - ScalarFunctionImplementation, WindowFrame, WindowFrameBound, WindowFrameUnits, - WindowFunctionDefinition, + LogicalPlanBuilder, ScalarFunctionImplementation, SortExpr, WindowFrame, + WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, }; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::Partitioning; @@ -6193,3 +6194,60 @@ async fn test_copy_schema() -> Result<()> { assert_logical_expr_schema_eq_physical_expr_schema(result).await?; Ok(()) } + +#[tokio::test] +async fn test_copy_to_preserves_order() -> Result<()> { + let tmp_dir = TempDir::new()?; + + let session_state = SessionStateBuilder::new_with_default_features().build(); + let session_ctx = SessionContext::new_with_state(session_state); + + let target_path = tmp_dir.path().join("target_ordered.csv"); + let csv_file_format = session_ctx + .state() + .get_file_format_factory("csv") + .map(format_as_file_type) + .unwrap(); + + let ordered_select_plan = LogicalPlanBuilder::values(vec![ + vec![lit(1u64)], + vec![lit(10u64)], + vec![lit(20u64)], + vec![lit(100u64)], + ])? 
+ .sort(vec![SortExpr::new(col("column1"), false, true)])? + .build()?; + + let copy_to_plan = LogicalPlanBuilder::copy_to( + ordered_select_plan, + target_path.to_str().unwrap().to_string(), + csv_file_format, + HashMap::new(), + vec![], + )? + .build()?; + + let union_side_branch = LogicalPlanBuilder::values(vec![vec![lit(1u64)]])?.build()?; + let union_plan = LogicalPlanBuilder::from(copy_to_plan) + .union(union_side_branch)? + .build()?; + + let frame = session_ctx.execute_logical_plan(union_plan).await?; + let physical_plan = frame.create_physical_plan().await?; + + let physical_plan_format = + displayable(physical_plan.as_ref()).indent(true).to_string(); + + // Expect that input to the DataSinkExec is sorted correctly + assert_snapshot!( + physical_plan_format, + @r###" + UnionExec + DataSinkExec: sink=CsvSink(file_groups=[]) + SortExec: expr=[column1@0 DESC], preserve_partitioning=[false] + DataSourceExec: partitions=1, partition_sizes=[1] + DataSourceExec: partitions=1, partition_sizes=[1] + "### + ); + Ok(()) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org