This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cfd1cf1e0 Support (order by / sort) for DataFrameWriteOptions (#13874)
6cfd1cf1e0 is described below

commit 6cfd1cf1e030ccfe3b17621cc51fdcefcceae018
Author: Qi Zhu <[email protected]>
AuthorDate: Tue Dec 24 18:10:54 2024 +0800

    Support (order by / sort) for DataFrameWriteOptions (#13874)
    
    * Support (order by / sort) for DataFrameWriteOptions
    
    * Fix fmt
    
    * Fix import
    
    * Add insert into example
---
 datafusion/core/src/dataframe/mod.rs     | 276 ++++++++++++++++++++++++++++++-
 datafusion/core/src/dataframe/parquet.rs |  10 +-
 2 files changed, 282 insertions(+), 4 deletions(-)
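
The change lets callers ask a DataFrame write to sort its output before it is
written. A minimal usage sketch (the output path and column name below are
illustrative, not taken from the patch):

    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    async fn write_sorted(df: DataFrame) -> Result<()> {
        df.write_parquet(
            "/tmp/sorted.parquet", // illustrative output path
            // sort(true, true) = ascending, nulls first
            DataFrameWriteOptions::new()
                .with_sort_by(vec![col("a").sort(true, true)]),
            None, // default parquet writer options
        )
        .await?;
        Ok(())
    }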

diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 3d4cfb57e7..60a09301ae 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -77,6 +77,9 @@ pub struct DataFrameWriteOptions {
     /// Sets which columns should be used for hive-style partitioned writes by name.
     /// Can be set to empty vec![] for non-partitioned writes.
     partition_by: Vec<String>,
+    /// Sets which columns should be used for sorting the output by name.
+    /// Can be set to empty vec![] for non-sorted writes.
+    sort_by: Vec<SortExpr>,
 }
 
 impl DataFrameWriteOptions {
@@ -86,6 +89,7 @@ impl DataFrameWriteOptions {
             insert_op: InsertOp::Append,
             single_file_output: false,
             partition_by: vec![],
+            sort_by: vec![],
         }
     }
 
@@ -106,6 +110,12 @@ impl DataFrameWriteOptions {
         self.partition_by = partition_by;
         self
     }
+
+    /// Sets the sort_by columns for output sorting
+    pub fn with_sort_by(mut self, sort_by: Vec<SortExpr>) -> Self {
+        self.sort_by = sort_by;
+        self
+    }
 }
 
 impl Default for DataFrameWriteOptions {
@@ -1517,8 +1527,17 @@ impl DataFrame {
         write_options: DataFrameWriteOptions,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
         let arrow_schema = Schema::from(self.schema());
+
+        let plan = if write_options.sort_by.is_empty() {
+            self.plan
+        } else {
+            LogicalPlanBuilder::from(self.plan)
+                .sort(write_options.sort_by)?
+                .build()?
+        };
+
         let plan = LogicalPlanBuilder::insert_into(
-            self.plan,
+            plan,
             table_name.to_owned(),
             &arrow_schema,
             write_options.insert_op,
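
Each write path applies the same rewrite: when sort expressions are supplied,
the DataFrame's logical plan is wrapped in a Sort node before the insert/copy
node is added on top. Pulled out as a standalone sketch (the helper name is
ours for illustration; the patch inlines this logic at each call site):

    use datafusion_common::Result;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, SortExpr};

    fn sort_if_requested(
        plan: LogicalPlan,
        sort_by: Vec<SortExpr>,
    ) -> Result<LogicalPlan> {
        if sort_by.is_empty() {
            // No sort requested: leave the plan unchanged.
            Ok(plan)
        } else {
            // Wrap the existing plan in a Sort node.
            LogicalPlanBuilder::from(plan).sort(sort_by)?.build()
        }
    }
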
@@ -1577,8 +1596,16 @@ impl DataFrame {
 
         let file_type = format_as_file_type(format);
 
+        let plan = if options.sort_by.is_empty() {
+            self.plan
+        } else {
+            LogicalPlanBuilder::from(self.plan)
+                .sort(options.sort_by)?
+                .build()?
+        };
+
         let plan = LogicalPlanBuilder::copy_to(
-            self.plan,
+            plan,
             path.into(),
             file_type,
             HashMap::new(),
@@ -1638,8 +1665,16 @@ impl DataFrame {
 
         let file_type = format_as_file_type(format);
 
+        let plan = if options.sort_by.is_empty() {
+            self.plan
+        } else {
+            LogicalPlanBuilder::from(self.plan)
+                .sort(options.sort_by)?
+                .build()?
+        };
+
         let plan = LogicalPlanBuilder::copy_to(
-            self.plan,
+            plan,
             path.into(),
             file_type,
             Default::default(),
@@ -1940,6 +1975,7 @@ mod tests {
     use crate::physical_plan::{ColumnarValue, Partitioning, PhysicalExpr};
     use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name};
 
+    use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
     use arrow::array::Int32Array;
     use datafusion_common::{assert_batches_eq, Constraint, Constraints, ScalarValue};
     use datafusion_common_runtime::SpawnedTask;
@@ -1954,6 +1990,7 @@ mod tests {
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
     use sqlparser::ast::NullTreatment;
+    use tempfile::TempDir;
 
     // Get string representation of the plan
     async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) {
@@ -4057,4 +4094,237 @@ mod tests {
 
         Ok(())
     }
+
+    // Test issue: https://github.com/apache/datafusion/issues/13873
+    #[tokio::test]
+    async fn write_parquet_with_order() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+        ]));
+
+        let ctx = SessionContext::new();
+        let write_df = ctx.read_batch(RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])),
+                Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
+            ],
+        )?)?;
+
+        let test_path = tmp_dir.path().join("test.parquet");
+
+        write_df
+            .clone()
+            .write_parquet(
+                test_path.to_str().unwrap(),
+                DataFrameWriteOptions::new()
+                    .with_sort_by(vec![col("a").sort(true, true)]),
+                None,
+            )
+            .await?;
+
+        let ctx = SessionContext::new();
+        ctx.register_parquet(
+            "data",
+            test_path.to_str().unwrap(),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+
+        let df = ctx.sql("SELECT * FROM data").await?;
+        let results = df.collect().await?;
+
+        let df_explain = ctx.sql("explain SELECT a FROM data").await?;
+        let explain_result = df_explain.collect().await?;
+
+        println!("explain_result {:?}", explain_result);
+
+        assert_batches_eq!(
+            &[
+                "+---+---+",
+                "| a | b |",
+                "+---+---+",
+                "| 1 | 2 |",
+                "| 2 | 6 |",
+                "| 3 | 5 |",
+                "| 5 | 3 |",
+                "| 7 | 4 |",
+                "+---+---+",
+            ],
+            &results
+        );
+        Ok(())
+    }
+
+    // Test issue: https://github.com/apache/datafusion/issues/13873
+    #[tokio::test]
+    async fn write_csv_with_order() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+        ]));
+
+        let ctx = SessionContext::new();
+        let write_df = ctx.read_batch(RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])),
+                Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
+            ],
+        )?)?;
+
+        let test_path = tmp_dir.path().join("test.csv");
+
+        write_df
+            .clone()
+            .write_csv(
+                test_path.to_str().unwrap(),
+                DataFrameWriteOptions::new()
+                    .with_sort_by(vec![col("a").sort(true, true)]),
+                None,
+            )
+            .await?;
+
+        let ctx = SessionContext::new();
+        ctx.register_csv(
+            "data",
+            test_path.to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await?;
+
+        let df = ctx.sql("SELECT * FROM data").await?;
+        let results = df.collect().await?;
+
+        assert_batches_eq!(
+            &[
+                "+---+---+",
+                "| a | b |",
+                "+---+---+",
+                "| 1 | 2 |",
+                "| 2 | 6 |",
+                "| 3 | 5 |",
+                "| 5 | 3 |",
+                "| 7 | 4 |",
+                "+---+---+",
+            ],
+            &results
+        );
+        Ok(())
+    }
+
+    // Test issue: https://github.com/apache/datafusion/issues/13873
+    #[tokio::test]
+    async fn write_json_with_order() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+        ]));
+
+        let ctx = SessionContext::new();
+        let write_df = ctx.read_batch(RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])),
+                Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
+            ],
+        )?)?;
+
+        let test_path = tmp_dir.path().join("test.json");
+
+        write_df
+            .clone()
+            .write_json(
+                test_path.to_str().unwrap(),
+                DataFrameWriteOptions::new()
+                    .with_sort_by(vec![col("a").sort(true, true)]),
+                None,
+            )
+            .await?;
+
+        let ctx = SessionContext::new();
+        ctx.register_json(
+            "data",
+            test_path.to_str().unwrap(),
+            NdJsonReadOptions::default().schema(&schema),
+        )
+        .await?;
+
+        let df = ctx.sql("SELECT * FROM data").await?;
+        let results = df.collect().await?;
+
+        assert_batches_eq!(
+            &[
+                "+---+---+",
+                "| a | b |",
+                "+---+---+",
+                "| 1 | 2 |",
+                "| 2 | 6 |",
+                "| 3 | 5 |",
+                "| 5 | 3 |",
+                "| 7 | 4 |",
+                "+---+---+",
+            ],
+            &results
+        );
+        Ok(())
+    }
+
+    // Test issue: https://github.com/apache/datafusion/issues/13873
+    #[tokio::test]
+    async fn write_table_with_order() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let ctx = SessionContext::new();
+        let location = tmp_dir.path().join("test_table/");
+
+        let mut write_df = ctx
+            .sql("values ('z'), ('x'), ('a'), ('b'), ('c')")
+            .await
+            .unwrap();
+
+        // Ensure the column names and types match the target table
+        write_df = write_df
+            .with_column_renamed("column1", "tablecol1")
+            .unwrap();
+        let sql_str =
+            "create external table data(tablecol1 varchar) stored as parquet location '"
+                .to_owned()
+                + location.to_str().unwrap()
+                + "'";
+
+        ctx.sql(sql_str.as_str()).await?.collect().await?;
+
+        // This is equivalent to an INSERT INTO on the "data" table.
+        write_df
+            .clone()
+            .write_table(
+                "data",
+                DataFrameWriteOptions::new()
+                    .with_sort_by(vec![col("tablecol1").sort(true, true)]),
+            )
+            .await?;
+
+        let df = ctx.sql("SELECT * FROM data").await?;
+        let results = df.collect().await?;
+
+        assert_batches_eq!(
+            &[
+                "+-----------+",
+                "| tablecol1 |",
+                "+-----------+",
+                "| a         |",
+                "| b         |",
+                "| c         |",
+                "| x         |",
+                "| z         |",
+                "+-----------+",
+            ],
+            &results
+        );
+        Ok(())
+    }
 }
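
In the tests above, col("a").sort(true, true) builds a SortExpr through
Expr::sort(asc, nulls_first), so each test requests ascending order with
nulls first. A small sketch of the two flags (variable names are ours):

    use datafusion::prelude::col;
    use datafusion_expr::SortExpr;

    fn main() {
        // Expr::sort(asc, nulls_first) -> SortExpr
        let _asc_nulls_first: SortExpr = col("a").sort(true, true);
        let _desc_nulls_last: SortExpr = col("a").sort(false, false);
    }
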
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 0af68783c4..1dd4d68fca 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -74,8 +74,16 @@ impl DataFrame {
 
         let file_type = format_as_file_type(format);
 
+        let plan = if options.sort_by.is_empty() {
+            self.plan
+        } else {
+            LogicalPlanBuilder::from(self.plan)
+                .sort(options.sort_by)?
+                .build()?
+        };
+
         let plan = LogicalPlanBuilder::copy_to(
-            self.plan,
+            plan,
             path.into(),
             file_type,
             Default::default(),

