This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c4f8bf  Add write_parquet to dataframe (#1940)
4c4f8bf is described below

commit 4c4f8bfd6f53bcbe79329b1d2e05da6eb457ccf1
Author: Matthew Turner <[email protected]>
AuthorDate: Tue Mar 8 10:31:01 2022 -0500

    Add write_parquet to dataframe (#1940)
---
 datafusion/src/dataframe.rs                        |   8 ++
 datafusion/src/execution/context.rs                | 145 +--------------------
 datafusion/src/execution/dataframe_impl.rs         |  14 +-
 datafusion/src/physical_plan/file_format/mod.rs    |   1 +
 .../src/physical_plan/file_format/parquet.rs       | 138 ++++++++++++++++++++
 5 files changed, 163 insertions(+), 143 deletions(-)

diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index dfbbc61..7748a83 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -22,6 +22,7 @@ use crate::error::Result;
 use crate::logical_plan::{
     DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning,
 };
+use parquet::file::properties::WriterProperties;
 use std::sync::Arc;
 
 use crate::physical_plan::SendableRecordBatchStream;
@@ -408,4 +409,11 @@ pub trait DataFrame: Send + Sync {
 
     /// Write a `DataFrame` to a CSV file.
     async fn write_csv(&self, path: &str) -> Result<()>;
+
+    /// Write a `DataFrame` to a Parquet file.
+    async fn write_parquet(
+        &self,
+        path: &str,
+        writer_properties: Option<WriterProperties>,
+    ) -> Result<()>;
 }
diff --git a/datafusion/src/execution/context.rs 
b/datafusion/src/execution/context.rs
index a554a7c..72bf131 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -41,13 +41,9 @@ use crate::{
 use log::{debug, trace};
 use parking_lot::Mutex;
 use std::collections::{HashMap, HashSet};
-use std::path::Path;
+use std::path::PathBuf;
 use std::string::String;
 use std::sync::Arc;
-use std::{fs, path::PathBuf};
-
-use futures::{StreamExt, TryStreamExt};
-use tokio::task::{self, JoinHandle};
 
 use arrow::datatypes::SchemaRef;
 
@@ -80,7 +76,7 @@ use crate::physical_optimizer::repartition::Repartition;
 
 use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use crate::logical_plan::plan::Explain;
-use crate::physical_plan::file_format::plan_to_csv;
+use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udf::ScalarUDF;
 use crate::physical_plan::ExecutionPlan;
@@ -93,7 +89,6 @@ use crate::variable::{VarProvider, VarType};
 use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 
 use super::{
@@ -728,42 +723,7 @@ impl ExecutionContext {
         path: impl AsRef<str>,
         writer_properties: Option<WriterProperties>,
     ) -> Result<()> {
-        let path = path.as_ref();
-        // create directory to contain the Parquet files (one per partition)
-        let fs_path = Path::new(path);
-        let runtime = self.runtime_env();
-        match fs::create_dir(fs_path) {
-            Ok(()) => {
-                let mut tasks = vec![];
-                for i in 0..plan.output_partitioning().partition_count() {
-                    let plan = plan.clone();
-                    let filename = format!("part-{}.parquet", i);
-                    let path = fs_path.join(&filename);
-                    let file = fs::File::create(path)?;
-                    let mut writer = ArrowWriter::try_new(
-                        file.try_clone().unwrap(),
-                        plan.schema(),
-                        writer_properties.clone(),
-                    )?;
-                    let stream = plan.execute(i, runtime.clone()).await?;
-                    let handle: JoinHandle<Result<()>> = task::spawn(async 
move {
-                        stream
-                            .map(|batch| writer.write(&batch?))
-                            .try_collect()
-                            .await
-                            .map_err(DataFusionError::from)?;
-                        writer.close().map_err(DataFusionError::from).map(|_| 
())
-                    });
-                    tasks.push(handle);
-                }
-                futures::future::join_all(tasks).await;
-                Ok(())
-            }
-            Err(e) => Err(DataFusionError::Execution(format!(
-                "Could not create directory {}: {:?}",
-                path, e
-            ))),
-        }
+        plan_to_parquet(self, plan, path, writer_properties).await
     }
 
     /// Optimizes the logical plan by applying optimizer rules, and
@@ -2681,79 +2641,6 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn write_csv_results() -> Result<()> {
-        // create partitioned input file and context
-        let tmp_dir = TempDir::new()?;
-        let mut ctx = create_ctx(&tmp_dir, 4).await?;
-
-        // execute a simple query and write the results to CSV
-        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
-        write_csv(&mut ctx, "SELECT c1, c2 FROM test", &out_dir).await?;
-
-        // create a new context and verify that the results were saved to a 
partitioned csv file
-        let mut ctx = ExecutionContext::new();
-
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::UInt32, false),
-            Field::new("c2", DataType::UInt64, false),
-        ]));
-
-        // register each partition as well as the top level dir
-        let csv_read_option = CsvReadOptions::new().schema(&schema);
-        ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), 
csv_read_option)
-            .await?;
-        ctx.register_csv("allparts", &out_dir, csv_read_option)
-            .await?;
-
-        let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part0").await?;
-        let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
allparts").await?;
-
-        let allparts_count: usize = allparts.iter().map(|batch| 
batch.num_rows()).sum();
-
-        assert_eq!(part0[0].schema(), allparts[0].schema());
-
-        assert_eq!(allparts_count, 40);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn write_parquet_results() -> Result<()> {
-        // create partitioned input file and context
-        let tmp_dir = TempDir::new()?;
-        let mut ctx = create_ctx(&tmp_dir, 4).await?;
-
-        // execute a simple query and write the results to parquet
-        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
-        write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, 
None).await?;
-
-        // create a new context and verify that the results were saved to a 
partitioned csv file
-        let mut ctx = ExecutionContext::new();
-
-        // register each partition as well as the top level dir
-        ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir))
-            .await?;
-        ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir))
-            .await?;
-        ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir))
-            .await?;
-        ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))
-            .await?;
-        ctx.register_parquet("allparts", &out_dir).await?;
-
-        let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
part0").await?;
-        let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM 
allparts").await?;
-
-        let allparts_count: usize = allparts.iter().map(|batch| 
batch.num_rows()).sum();
-
-        assert_eq!(part0[0].schema(), allparts[0].schema());
-
-        assert_eq!(allparts_count, 40);
-
-        Ok(())
-    }
-
-    #[tokio::test]
     async fn query_csv_with_custom_partition_extension() -> Result<()> {
         let tmp_dir = TempDir::new()?;
 
@@ -3224,32 +3111,6 @@ mod tests {
         plan_and_collect(&mut ctx, sql).await
     }
 
-    /// Execute SQL and write results to partitioned csv files
-    async fn write_csv(
-        ctx: &mut ExecutionContext,
-        sql: &str,
-        out_dir: &str,
-    ) -> Result<()> {
-        let logical_plan = ctx.create_logical_plan(sql)?;
-        let logical_plan = ctx.optimize(&logical_plan)?;
-        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-        ctx.write_csv(physical_plan, out_dir).await
-    }
-
-    /// Execute SQL and write results to partitioned parquet files
-    async fn write_parquet(
-        ctx: &mut ExecutionContext,
-        sql: &str,
-        out_dir: &str,
-        writer_properties: Option<WriterProperties>,
-    ) -> Result<()> {
-        let logical_plan = ctx.create_logical_plan(sql)?;
-        let logical_plan = ctx.optimize(&logical_plan)?;
-        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-        ctx.write_parquet(physical_plan, out_dir, writer_properties)
-            .await
-    }
-
     /// Generate CSV partitions within the supplied directory
     fn populate_csv_partitions(
         tmp_dir: &TempDir,
diff --git a/datafusion/src/execution/dataframe_impl.rs 
b/datafusion/src/execution/dataframe_impl.rs
index c00729b..2af1cd4 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -35,11 +35,12 @@ use crate::{
     dataframe::*,
     physical_plan::{collect, collect_partitioned},
 };
+use parquet::file::properties::WriterProperties;
 
 use crate::arrow::util::pretty;
 use crate::datasource::TableProvider;
 use crate::datasource::TableType;
-use crate::physical_plan::file_format::plan_to_csv;
+use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
 use crate::physical_plan::{
     execute_stream, execute_stream_partitioned, ExecutionPlan, 
SendableRecordBatchStream,
 };
@@ -321,6 +322,17 @@ impl DataFrame for DataFrameImpl {
         let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
         plan_to_csv(&ctx, plan, path).await
     }
+
+    async fn write_parquet(
+        &self,
+        path: &str,
+        writer_properties: Option<WriterProperties>,
+    ) -> Result<()> {
+        let plan = self.create_physical_plan().await?;
+        let state = self.ctx_state.lock().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        plan_to_parquet(&ctx, plan, path, writer_properties).await
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/src/physical_plan/file_format/mod.rs 
b/datafusion/src/physical_plan/file_format/mod.rs
index f38cf41..0e1e859 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -23,6 +23,7 @@ mod file_stream;
 mod json;
 mod parquet;
 
+pub(crate) use self::parquet::plan_to_parquet;
 pub use self::parquet::ParquetExec;
 use arrow::{
     array::{ArrayData, ArrayRef, DictionaryArray},
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs 
b/datafusion/src/physical_plan/file_format/parquet.rs
index 1ae3012..d4fa9db 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -17,13 +17,17 @@
 
 //! Execution plan for reading Parquet files
 
+use futures::{StreamExt, TryStreamExt};
 use std::fmt;
+use std::fs;
+use std::path::Path;
 use std::sync::Arc;
 use std::{any::Any, convert::TryInto};
 
 use crate::datasource::file_format::parquet::ChunkObjectReader;
 use crate::datasource::object_store::ObjectStore;
 use crate::datasource::PartitionedFile;
+use crate::execution::context::ExecutionContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::{
     error::{DataFusionError, Result},
@@ -47,6 +51,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use log::debug;
+use parquet::arrow::ArrowWriter;
 use parquet::file::{
     metadata::RowGroupMetaData,
     reader::{FileReader, SerializedFileReader},
@@ -55,7 +60,9 @@ use parquet::file::{
 
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
+use parquet::file::properties::WriterProperties;
 
+use tokio::task::JoinHandle;
 use tokio::{
     sync::mpsc::{channel, Receiver, Sender},
     task,
@@ -517,6 +524,51 @@ fn read_partition(
     Ok(())
 }
 
+/// Executes a query and writes the results to a partitioned Parquet file.
+pub async fn plan_to_parquet(
+    context: &ExecutionContext,
+    plan: Arc<dyn ExecutionPlan>,
+    path: impl AsRef<str>,
+    writer_properties: Option<WriterProperties>,
+) -> Result<()> {
+    let path = path.as_ref();
+    // create directory to contain the Parquet files (one per partition)
+    let fs_path = Path::new(path);
+    let runtime = context.runtime_env();
+    match fs::create_dir(fs_path) {
+        Ok(()) => {
+            let mut tasks = vec![];
+            for i in 0..plan.output_partitioning().partition_count() {
+                let plan = plan.clone();
+                let filename = format!("part-{}.parquet", i);
+                let path = fs_path.join(&filename);
+                let file = fs::File::create(path)?;
+                let mut writer = ArrowWriter::try_new(
+                    file.try_clone().unwrap(),
+                    plan.schema(),
+                    writer_properties.clone(),
+                )?;
+                let stream = plan.execute(i, runtime.clone()).await?;
+                let handle: JoinHandle<Result<()>> = task::spawn(async move {
+                    stream
+                        .map(|batch| writer.write(&batch?))
+                        .try_collect()
+                        .await
+                        .map_err(DataFusionError::from)?;
+                    writer.close().map_err(DataFusionError::from).map(|_| ())
+                });
+                tasks.push(handle);
+            }
+            futures::future::join_all(tasks).await;
+            Ok(())
+        }
+        Err(e) => Err(DataFusionError::Execution(format!(
+            "Could not create directory {}: {:?}",
+            path, e
+        ))),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{
@@ -534,6 +586,8 @@ mod tests {
     };
 
     use super::*;
+    use crate::execution::options::CsvReadOptions;
+    use crate::prelude::ExecutionConfig;
     use arrow::array::Float32Array;
     use arrow::{
         array::{Int64Array, Int8Array, StringArray},
@@ -549,6 +603,9 @@ mod tests {
         },
         schema::types::SchemaDescPtr,
     };
+    use std::fs::File;
+    use std::io::Write;
+    use tempfile::TempDir;
 
     /// writes each RecordBatch as an individual parquet file and then
     /// reads it back in to the named location.
@@ -1225,4 +1282,85 @@ mod tests {
 
         Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
+
+    fn populate_csv_partitions(
+        tmp_dir: &TempDir,
+        partition_count: usize,
+        file_extension: &str,
+    ) -> Result<SchemaRef> {
+        // define schema for data source (csv file)
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::UInt32, false),
+            Field::new("c2", DataType::UInt64, false),
+            Field::new("c3", DataType::Boolean, false),
+        ]));
+
+        // generate a partitioned file
+        for partition in 0..partition_count {
+            let filename = format!("partition-{}.{}", partition, 
file_extension);
+            let file_path = tmp_dir.path().join(&filename);
+            let mut file = File::create(file_path)?;
+
+            // generate some data
+            for i in 0..=10 {
+                let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+                file.write_all(data.as_bytes())?;
+            }
+        }
+
+        Ok(schema)
+    }
+
+    #[tokio::test]
+    async fn write_parquet_results() -> Result<()> {
+        // create partitioned input file and context
+        let tmp_dir = TempDir::new()?;
+        // let mut ctx = create_ctx(&tmp_dir, 4).await?;
+        let mut ctx = ExecutionContext::with_config(
+            ExecutionConfig::new().with_target_partitions(8),
+        );
+        let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
+        // register csv file with the execution context
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await?;
+
+        // execute a simple query and write the results to parquet
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+        df.write_parquet(&out_dir, None).await?;
+        // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, 
None).await?;
+
+        // create a new context and verify that the results were saved to a 
partitioned csv file
+        let mut ctx = ExecutionContext::new();
+
+        // register each partition as well as the top level dir
+        ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir))
+            .await?;
+        ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir))
+            .await?;
+        ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir))
+            .await?;
+        ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))
+            .await?;
+        ctx.register_parquet("allparts", &out_dir).await?;
+
+        let part0 = ctx.sql("SELECT c1, c2 FROM 
part0").await?.collect().await?;
+        let allparts = ctx
+            .sql("SELECT c1, c2 FROM allparts")
+            .await?
+            .collect()
+            .await?;
+
+        let allparts_count: usize = allparts.iter().map(|batch| 
batch.num_rows()).sum();
+
+        assert_eq!(part0[0].schema(), allparts[0].schema());
+
+        assert_eq!(allparts_count, 40);
+
+        Ok(())
+    }
 }

Reply via email to