This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7d774481ae Implement Support for Copy To Logical and Physical plans
(#7283)
7d774481ae is described below
commit 7d774481aedc027b7f68226b2c3a4fc0db959fc2
Author: Devin D'Angelo <[email protected]>
AuthorDate: Wed Aug 16 12:39:30 2023 -0400
Implement Support for Copy To Logical and Physical plans (#7283)
* rebase
* maybe windows fix
* rebase add explain copy tests
* rebase and fix pipeline
---
.gitignore | 5 +-
datafusion/core/src/dataframe.rs | 55 ++++++---
datafusion/core/src/datasource/file_format/csv.rs | 99 +++++++++++-----
datafusion/core/src/datasource/file_format/json.rs | 83 +++++++++----
.../core/src/datasource/file_format/parquet.rs | 79 +++++++-----
.../core/src/datasource/file_format/write.rs | 17 ++-
datafusion/core/src/datasource/listing/table.rs | 2 +
datafusion/core/src/datasource/listing/url.rs | 1 -
datafusion/core/src/datasource/memory.rs | 8 +-
.../core/src/datasource/physical_plan/csv.rs | 25 +++-
.../core/src/datasource/physical_plan/json.rs | 25 +++-
.../core/src/datasource/physical_plan/mod.rs | 4 +
.../core/src/datasource/physical_plan/parquet.rs | 27 ++++-
datafusion/core/src/datasource/provider.rs | 6 +-
datafusion/core/src/physical_plan/insert.rs | 14 +--
datafusion/core/src/physical_planner.rs | 73 ++++++++++++
datafusion/expr/src/logical_plan/builder.rs | 18 +++
datafusion/expr/src/logical_plan/dml.rs | 60 +++++++++-
datafusion/expr/src/logical_plan/mod.rs | 2 +-
datafusion/expr/src/logical_plan/plan.rs | 27 +++++
datafusion/expr/src/utils.rs | 14 +++
.../optimizer/src/common_subexpr_eliminate.rs | 1 +
datafusion/proto/src/logical_plan/mod.rs | 3 +
datafusion/sql/src/statement.rs | 78 ++++++++++--
datafusion/sql/tests/sql_integration.rs | 24 ++++
datafusion/sqllogictest/bin/sqllogictests.rs | 17 +++
datafusion/sqllogictest/test_files/copy.slt | 132 +++++++++++++++++++--
27 files changed, 756 insertions(+), 143 deletions(-)
diff --git a/.gitignore b/.gitignore
index 65d3c0f345..203455e4a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -103,4 +103,7 @@ datafusion/CHANGELOG.md.bak
.githubchangeloggenerator.cache*
# Generated tpch data
-datafusion/core/tests/sqllogictests/test_files/tpch/data/*
+datafusion/sqllogictest/test_files/tpch/data/*
+
+# Scratch temp dir for sqllogictests
+datafusion/sqllogictest/test_files/scratch*
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 5b1983f567..232bedfe0b 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -25,6 +25,7 @@ use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
+use datafusion_expr::dml::OutputFileFormat;
use parquet::file::properties::WriterProperties;
use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -37,7 +38,6 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
-use crate::datasource::physical_plan::{plan_to_csv, plan_to_json,
plan_to_parquet};
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::error::Result;
use crate::execution::{
@@ -992,28 +992,55 @@ impl DataFrame {
}
/// Write a `DataFrame` to a CSV file.
- pub async fn write_csv(self, path: &str) -> Result<()> {
- let plan = self.session_state.create_physical_plan(&self.plan).await?;
- let task_ctx = Arc::new(self.task_ctx());
- plan_to_csv(task_ctx, plan, path).await
+ pub async fn write_csv(
+ self,
+ path: &str,
+ ) -> Result<Vec<RecordBatch>, DataFusionError> {
+ let plan = LogicalPlanBuilder::copy_to(
+ self.plan,
+ path.into(),
+ OutputFileFormat::CSV,
+ true,
+ // TODO implement options
+ vec![],
+ )?
+ .build()?;
+ DataFrame::new(self.session_state, plan).collect().await
}
/// Write a `DataFrame` to a Parquet file.
pub async fn write_parquet(
self,
path: &str,
- writer_properties: Option<WriterProperties>,
- ) -> Result<()> {
- let plan = self.session_state.create_physical_plan(&self.plan).await?;
- let task_ctx = Arc::new(self.task_ctx());
- plan_to_parquet(task_ctx, plan, path, writer_properties).await
+ _writer_properties: Option<WriterProperties>,
+ ) -> Result<Vec<RecordBatch>, DataFusionError> {
+ let plan = LogicalPlanBuilder::copy_to(
+ self.plan,
+ path.into(),
+ OutputFileFormat::PARQUET,
+ true,
+ // TODO implement options
+ vec![],
+ )?
+ .build()?;
+ DataFrame::new(self.session_state, plan).collect().await
}
/// Executes a query and writes the results to a partitioned JSON file.
- pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
- let plan = self.session_state.create_physical_plan(&self.plan).await?;
- let task_ctx = Arc::new(self.task_ctx());
- plan_to_json(task_ctx, plan, path).await
+ pub async fn write_json(
+ self,
+ path: &str,
+ ) -> Result<Vec<RecordBatch>, DataFusionError> {
+ let plan = LogicalPlanBuilder::copy_to(
+ self.plan,
+ path.into(),
+ OutputFileFormat::JSON,
+ true,
+ // TODO implement options
+ vec![],
+ )?
+ .build()?;
+ DataFrame::new(self.session_state, plan).collect().await
}
/// Add an additional column to the DataFrame.
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index c3ab50fd43..59c4fedeff 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -47,7 +47,7 @@ use crate::datasource::physical_plan::{
};
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::physical_plan::insert::{DataSink, InsertExec};
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use rand::distributions::{Alphanumeric, DistString};
@@ -277,6 +277,7 @@ impl FileFormat for CsvFormat {
"Inserting compressed CSV is not implemented yet.".into(),
));
}
+
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(
conf,
@@ -285,7 +286,7 @@ impl FileFormat for CsvFormat {
self.file_compression_type,
));
- Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+ Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
}
}
@@ -505,12 +506,14 @@ impl DataSink for CsvSink {
let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
-
// Construct serializer and writer for each file group
let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
+ if !self.config.per_thread_output {
+ return
Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented
for CsvSink in Append mode".into()));
+ }
for file_group in &self.config.file_groups {
// In append mode, consider has_header flag only when file
is empty (at the start).
// For other modes, use has_header flag as is.
@@ -542,38 +545,72 @@ impl DataSink for CsvSink {
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
- // Uniquely identify this batch of files with a random string,
to prevent collisions overwriting files
- let write_id = Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
- for part_idx in 0..num_partitions {
- let header = self.has_header;
- let builder =
WriterBuilder::new().with_delimiter(self.delimiter);
- let serializer = CsvSerializer::new()
- .with_builder(builder)
- .with_header(header);
- let file_path = base_path
- .prefix()
- .child(format!("/{}_{}.csv", write_id, part_idx));
- let object_meta = ObjectMeta {
- location: file_path,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = create_writer(
- self.config.writer_mode,
- self.file_compression_type,
- object_meta.into(),
- object_store.clone(),
- )
- .await?;
-
- serializers.push(Box::new(serializer));
- writers.push(writer);
+ match self.config.per_thread_output {
+ true => {
+ // Uniquely identify this batch of files with a random
string, to prevent collisions overwriting files
+ let write_id =
+ Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
+ for part_idx in 0..num_partitions {
+ let header = self.has_header;
+ let builder =
+
WriterBuilder::new().with_delimiter(self.delimiter);
+ let serializer = CsvSerializer::new()
+ .with_builder(builder)
+ .with_header(header);
+ serializers.push(Box::new(serializer));
+ let file_path = base_path
+ .prefix()
+ .child(format!("{}_{}.csv", write_id,
part_idx));
+ let object_meta = ObjectMeta {
+ location: file_path,
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = create_writer(
+ self.config.writer_mode,
+ self.file_compression_type,
+ object_meta.into(),
+ object_store.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
+ }
+ false => {
+ let header = self.has_header;
+ let builder =
WriterBuilder::new().with_delimiter(self.delimiter);
+ let serializer = CsvSerializer::new()
+ .with_builder(builder)
+ .with_header(header);
+ serializers.push(Box::new(serializer));
+ let file_path = base_path.prefix();
+ let object_meta = ObjectMeta {
+ location: file_path.clone(),
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = create_writer(
+ self.config.writer_mode,
+ self.file_compression_type,
+ object_meta.into(),
+ object_store.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
}
}
}
- stateless_serialize_and_write_files(data, serializers, writers).await
+ stateless_serialize_and_write_files(
+ data,
+ serializers,
+ writers,
+ self.config.per_thread_output,
+ )
+ .await
}
}
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 8472f4e5c1..6870fc1b41 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -43,7 +43,7 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};
use crate::datasource::physical_plan::FileGroupDisplay;
use crate::physical_plan::insert::DataSink;
-use crate::physical_plan::insert::InsertExec;
+use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
@@ -187,7 +187,7 @@ impl FileFormat for JsonFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
- Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+ Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
}
}
@@ -280,6 +280,9 @@ impl DataSink for JsonSink {
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
+ if !self.config.per_thread_output {
+ return
Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented
for JsonSink in Append mode".into()));
+ }
for file_group in &self.config.file_groups {
let serializer = JsonSerializer::new();
serializers.push(Box::new(serializer));
@@ -303,33 +306,63 @@ impl DataSink for JsonSink {
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
- // Uniquely identify this batch of files with a random string,
to prevent collisions overwriting files
- let write_id = Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
- for part_idx in 0..num_partitions {
- let serializer = JsonSerializer::new();
- serializers.push(Box::new(serializer));
- let file_path = base_path
- .prefix()
- .child(format!("/{}_{}.json", write_id, part_idx));
- let object_meta = ObjectMeta {
- location: file_path,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = create_writer(
- self.config.writer_mode,
- self.file_compression_type,
- object_meta.into(),
- object_store.clone(),
- )
- .await?;
- writers.push(writer);
+ match self.config.per_thread_output {
+ true => {
+ // Uniquely identify this batch of files with a random
string, to prevent collisions overwriting files
+ let write_id =
+ Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
+ for part_idx in 0..num_partitions {
+ let serializer = JsonSerializer::new();
+ serializers.push(Box::new(serializer));
+ let file_path = base_path
+ .prefix()
+ .child(format!("{}_{}.json", write_id,
part_idx));
+ let object_meta = ObjectMeta {
+ location: file_path,
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = create_writer(
+ self.config.writer_mode,
+ self.file_compression_type,
+ object_meta.into(),
+ object_store.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
+ }
+ false => {
+ let serializer = JsonSerializer::new();
+ serializers.push(Box::new(serializer));
+ let file_path = base_path.prefix();
+ let object_meta = ObjectMeta {
+ location: file_path.clone(),
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = create_writer(
+ self.config.writer_mode,
+ self.file_compression_type,
+ object_meta.into(),
+ object_store.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
}
}
}
- stateless_serialize_and_write_files(data, serializers, writers).await
+ stateless_serialize_and_write_files(
+ data,
+ serializers,
+ writers,
+ self.config.per_thread_output,
+ )
+ .await
}
}
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index de3ec3ffb7..6688d3dd37 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -53,11 +53,12 @@ use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{
FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter,
};
+
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::insert::{DataSink, InsertExec};
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{
Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream,
Statistics,
@@ -238,7 +239,7 @@ impl FileFormat for ParquetFormat {
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(ParquetSink::new(conf));
- Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+ Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
}
}
@@ -604,7 +605,6 @@ impl DisplayAs for ParquetSink {
}
/// Parses datafusion.execution.parquet.encoding String to a
parquet::basic::Encoding
-/// TODO use upstream version: <https://github.com/apache/arrow-rs/issues/4693>
fn parse_encoding_string(str_setting: &str) ->
Result<parquet::basic::Encoding> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
@@ -668,7 +668,6 @@ fn require_level(codec: &str, level: Option<u32>) ->
Result<u32> {
}
/// Parses datafusion.execution.parquet.compression String to a
parquet::basic::Compression
-/// TODO use upstream version: <https://github.com/apache/arrow-rs/issues/4693>
fn parse_compression_string(str_setting: &str) ->
Result<parquet::basic::Compression> {
let str_setting_lower: &str = &str_setting.to_lowercase();
let (codec, level) = split_compression_string(str_setting_lower)?;
@@ -719,7 +718,6 @@ fn parse_compression_string(str_setting: &str) ->
Result<parquet::basic::Compres
}
}
-/// TODO use upstream version: <https://github.com/apache/arrow-rs/issues/4693>
fn parse_version_string(str_setting: &str) -> Result<WriterVersion> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
@@ -732,7 +730,6 @@ fn parse_version_string(str_setting: &str) ->
Result<WriterVersion> {
}
}
-/// TODO use upstream version: <https://github.com/apache/arrow-rs/issues/4693>
fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatistics> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
@@ -848,26 +845,48 @@ impl DataSink for ParquetSink {
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
- // Uniquely identify this batch of files with a random string,
to prevent collisions overwriting files
- let write_id = Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
- for part_idx in 0..num_partitions {
- let file_path = base_path
- .prefix()
- .child(format!("/{}_{}.parquet", write_id, part_idx));
- let object_meta = ObjectMeta {
- location: file_path,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = self
- .create_writer(
- object_meta.into(),
- object_store.clone(),
- parquet_props.clone(),
- )
- .await?;
- writers.push(writer);
+ match self.config.per_thread_output {
+ true => {
+ // Uniquely identify this batch of files with a random
string, to prevent collisions overwriting files
+ let write_id =
+ Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
+ for part_idx in 0..num_partitions {
+ let file_path = base_path
+ .prefix()
+ .child(format!("{}_{}.parquet", write_id,
part_idx));
+ let object_meta = ObjectMeta {
+ location: file_path,
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = self
+ .create_writer(
+ object_meta.into(),
+ object_store.clone(),
+ parquet_props.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
+ }
+ false => {
+ let file_path = base_path.prefix();
+ let object_meta = ObjectMeta {
+ location: file_path.clone(),
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = self
+ .create_writer(
+ object_meta.into(),
+ object_store.clone(),
+ parquet_props.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
}
}
}
@@ -875,8 +894,12 @@ impl DataSink for ParquetSink {
let mut row_count = 0;
// TODO parallelize serialization accross partitions and batches
within partitions
// see: https://github.com/apache/arrow-datafusion/issues/7079
- for idx in 0..num_partitions {
- while let Some(batch) = data[idx].next().await.transpose()? {
+ for (part_idx, data_stream) in
data.iter_mut().enumerate().take(num_partitions) {
+ let idx = match self.config.per_thread_output {
+ true => part_idx,
+ false => 0,
+ };
+ while let Some(batch) = data_stream.next().await.transpose()? {
row_count += batch.num_rows();
// TODO cleanup all multipart writes when any encounters an
error
writers[idx].write(&batch).await?;
diff --git a/datafusion/core/src/datasource/file_format/write.rs
b/datafusion/core/src/datasource/file_format/write.rs
index c256c9689a..3c005894f6 100644
--- a/datafusion/core/src/datasource/file_format/write.rs
+++ b/datafusion/core/src/datasource/file_format/write.rs
@@ -328,16 +328,29 @@ pub(crate) async fn stateless_serialize_and_write_files(
mut data: Vec<SendableRecordBatchStream>,
mut serializers: Vec<Box<dyn BatchSerializer>>,
mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
+ per_thread_output: bool,
) -> Result<u64> {
+ if !per_thread_output && (serializers.len() != 1 || writers.len() != 1) {
+ return Err(DataFusionError::Internal(
+ "per_thread_output is false, but got more than 1 writer!".into(),
+ ));
+ }
let num_partitions = data.len();
+ if per_thread_output && (num_partitions != writers.len()) {
+ return Err(DataFusionError::Internal("per_thread_output is true, but
did not get 1 writer for each output partition!".into()));
+ }
let mut row_count = 0;
// Map errors to DatafusionError.
let err_converter =
|_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
// TODO parallelize serialization accross partitions and batches within
partitions
// see: https://github.com/apache/arrow-datafusion/issues/7079
- for idx in 0..num_partitions {
- while let Some(maybe_batch) = data[idx].next().await {
+ for (part_idx, data_stream) in
data.iter_mut().enumerate().take(num_partitions) {
+ let idx = match per_thread_output {
+ true => part_idx,
+ false => 0,
+ };
+ while let Some(maybe_batch) = data_stream.next().await {
// Write data to files in a round robin fashion:
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index d4e2c4aafe..5cc31c8397 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -883,6 +883,8 @@ impl TableProvider for ListingTable {
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
writer_mode,
+ // TODO: when listing table is known to be backed by a single
file, this should be false
+ per_thread_output: true,
overwrite,
};
diff --git a/datafusion/core/src/datasource/listing/url.rs
b/datafusion/core/src/datasource/listing/url.rs
index a4940f57f8..0c017a517c 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -107,7 +107,6 @@ impl ListingTableUrl {
.map_err(|_| DataFusionError::Internal(format!("Can not open path:
{s}")))?;
// TODO: Currently we do not have an IO-related error variant that
accepts ()
// or a string. Once we have such a variant, change the error
type above.
-
Ok(Self::new(url, glob))
}
diff --git a/datafusion/core/src/datasource/memory.rs
b/datafusion/core/src/datasource/memory.rs
index 0441ac7058..54ebfd5cab 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -35,7 +35,7 @@ use crate::datasource::{TableProvider, TableType};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
-use crate::physical_plan::insert::{DataSink, InsertExec};
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::{common, SendableRecordBatchStream};
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
@@ -219,7 +219,11 @@ impl TableProvider for MemTable {
));
}
let sink = Arc::new(MemSink::new(self.batches.clone()));
- Ok(Arc::new(InsertExec::new(input, sink, self.schema.clone())))
+ Ok(Arc::new(FileSinkExec::new(
+ input,
+ sink,
+ self.schema.clone(),
+ )))
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 6bf5e36340..e3ac811736 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -660,7 +660,7 @@ mod tests {
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use rstest::*;
- use std::fs::File;
+ use std::fs::{self, File};
use std::io::Write;
use tempfile::TempDir;
use url::Url;
@@ -1191,11 +1191,32 @@ mod tests {
Field::new("c2", DataType::UInt64, false),
]));
+ // get name of first part
+ let paths = fs::read_dir(&out_dir).unwrap();
+ let mut part_0_name: String = "".to_owned();
+ for path in paths {
+ let path = path.unwrap();
+ let name = path
+ .path()
+ .file_name()
+ .expect("Should be a file name")
+ .to_str()
+ .expect("Should be a str")
+ .to_owned();
+ if name.ends_with("_0.csv") {
+ part_0_name = name;
+ break;
+ }
+ }
+
+ if part_0_name.is_empty() {
+ panic!("Did not find part_0 in csv output files!")
+ }
// register each partition as well as the top level dir
let csv_read_option = CsvReadOptions::new().schema(&schema);
ctx.register_csv(
"part0",
- &format!("{out_dir}/part-0.csv"),
+ &format!("{out_dir}/{part_0_name}"),
csv_read_option.clone(),
)
.await?;
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs
b/datafusion/core/src/datasource/physical_plan/json.rs
index b8ad2aa0a6..62fcc320eb 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -331,6 +331,7 @@ mod tests {
use crate::test::partitioned_file_groups;
use datafusion_common::cast::{as_int32_array, as_int64_array,
as_string_array};
use rstest::*;
+ use std::fs;
use std::path::Path;
use tempfile::TempDir;
use url::Url;
@@ -699,11 +700,33 @@ mod tests {
// create a new context and verify that the results were saved to a
partitioned csv file
let ctx = SessionContext::new();
+ // get name of first part
+ let paths = fs::read_dir(&out_dir).unwrap();
+ let mut part_0_name: String = "".to_owned();
+ for path in paths {
+ let name = path
+ .unwrap()
+ .path()
+ .file_name()
+ .expect("Should be a file name")
+ .to_str()
+ .expect("Should be a str")
+ .to_owned();
+ if name.ends_with("_0.json") {
+ part_0_name = name;
+ break;
+ }
+ }
+
+ if part_0_name.is_empty() {
+ panic!("Did not find part_0 in json output files!")
+ }
+
// register each partition as well as the top level dir
let json_read_option = NdJsonReadOptions::default();
ctx.register_json(
"part0",
- &format!("{out_dir}/part-0.json"),
+ &format!("{out_dir}/{part_0_name}"),
json_read_option.clone(),
)
.await?;
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs
b/datafusion/core/src/datasource/physical_plan/mod.rs
index b0914b0816..06c16ad751 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -332,6 +332,10 @@ pub struct FileSinkConfig {
pub table_partition_cols: Vec<(String, DataType)>,
/// A writer mode that determines how data is written to the file
pub writer_mode: FileWriterMode,
+ /// If false, it is assumed there is a single table_path which is a file
to which all data should be written
+ /// regardless of input partitioning. Otherwise, each table path is
assumed to be a directory
+ /// in which each output partition is written to its own output file.
+ pub per_thread_output: bool,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool,
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 24243ec749..3ef1d13c26 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -768,7 +768,7 @@ mod tests {
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::ObjectMeta;
- use std::fs::File;
+ use std::fs::{self, File};
use std::io::Write;
use tempfile::TempDir;
use url::Url;
@@ -1956,31 +1956,46 @@ mod tests {
df.write_parquet(out_dir_url, None).await?;
// write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir,
None).await?;
- // create a new context and verify that the results were saved to a
partitioned csv file
+ // create a new context and verify that the results were saved to a
partitioned parquet file
let ctx = SessionContext::new();
+ // get write_id
+ let mut paths = fs::read_dir(&out_dir).unwrap();
+ let path = paths.next();
+ let name = path
+ .unwrap()?
+ .path()
+ .file_name()
+ .expect("Should be a file name")
+ .to_str()
+ .expect("Should be a str")
+ .to_owned();
+ println!("{name}");
+ let (parsed_id, _) = name.split_once('_').expect("File should contain
_ !");
+ let write_id = parsed_id.to_owned();
+
// register each partition as well as the top level dir
ctx.register_parquet(
"part0",
- &format!("{out_dir}/part-0.parquet"),
+ &format!("{out_dir}/{write_id}_0.parquet"),
ParquetReadOptions::default(),
)
.await?;
ctx.register_parquet(
"part1",
- &format!("{out_dir}/part-1.parquet"),
+ &format!("{out_dir}/{write_id}_1.parquet"),
ParquetReadOptions::default(),
)
.await?;
ctx.register_parquet(
"part2",
- &format!("{out_dir}/part-2.parquet"),
+ &format!("{out_dir}/{write_id}_2.parquet"),
ParquetReadOptions::default(),
)
.await?;
ctx.register_parquet(
"part3",
- &format!("{out_dir}/part-3.parquet"),
+ &format!("{out_dir}/{write_id}_3.parquet"),
ParquetReadOptions::default(),
)
.await?;
diff --git a/datafusion/core/src/datasource/provider.rs
b/datafusion/core/src/datasource/provider.rs
index 6a81f89696..48965addc7 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -119,10 +119,10 @@ pub trait TableProvider: Sync + Send {
///
/// # See Also
///
- /// See [`InsertExec`] for the common pattern of inserting a
- /// single stream of `RecordBatch`es.
+ /// See [`FileSinkExec`] for the common pattern of inserting
+ /// streams of `RecordBatch`es as files to an ObjectStore.
///
- /// [`InsertExec`]: crate::physical_plan::insert::InsertExec
+ /// [`FileSinkExec`]: crate::physical_plan::insert::FileSinkExec
async fn insert_into(
&self,
_state: &SessionState,
diff --git a/datafusion/core/src/physical_plan/insert.rs
b/datafusion/core/src/physical_plan/insert.rs
index a05cb5fb15..acbca834f2 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -64,7 +64,7 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync {
/// Execution plan for writing record batches to a [`DataSink`]
///
/// Returns a single row with the number of values written
-pub struct InsertExec {
+pub struct FileSinkExec {
/// Input plan that produces the record batches to be written.
input: Arc<dyn ExecutionPlan>,
/// Sink to which to write
@@ -75,13 +75,13 @@ pub struct InsertExec {
count_schema: SchemaRef,
}
-impl fmt::Debug for InsertExec {
+impl fmt::Debug for FileSinkExec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "InsertExec schema: {:?}", self.count_schema)
+ write!(f, "FileSinkExec schema: {:?}", self.count_schema)
}
}
-impl InsertExec {
+impl FileSinkExec {
/// Create a plan to write to `sink`
pub fn new(
input: Arc<dyn ExecutionPlan>,
@@ -149,7 +149,7 @@ impl InsertExec {
}
}
-impl DisplayAs for InsertExec {
+impl DisplayAs for FileSinkExec {
fn fmt_as(
&self,
t: DisplayFormatType,
@@ -164,7 +164,7 @@ impl DisplayAs for InsertExec {
}
}
-impl ExecutionPlan for InsertExec {
+impl ExecutionPlan for FileSinkExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
@@ -233,7 +233,7 @@ impl ExecutionPlan for InsertExec {
) -> Result<SendableRecordBatchStream> {
if partition != 0 {
return Err(DataFusionError::Internal(
- "InsertExec can only be called on partition 0!".into(),
+ "FileSinkExec can only be called on partition 0!".into(),
));
}
let data = self.execute_all_input_streams(context.clone())?;
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 6b868b9b24..f154c2a173 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -17,6 +17,15 @@
//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
+use crate::datasource::file_format::arrow::ArrowFormat;
+use crate::datasource::file_format::avro::AvroFormat;
+use crate::datasource::file_format::csv::CsvFormat;
+use crate::datasource::file_format::json::JsonFormat;
+use crate::datasource::file_format::parquet::ParquetFormat;
+use crate::datasource::file_format::write::FileWriterMode;
+use crate::datasource::file_format::FileFormat;
+use crate::datasource::listing::ListingTableUrl;
+use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
@@ -29,6 +38,8 @@ use crate::logical_expr::{
Repartition, Union, UserDefinedLogicalNode,
};
use datafusion_common::display::ToStringifiedPlan;
+use datafusion_expr::dml::{CopyTo, OutputFileFormat};
+use url::Url;
use crate::logical_expr::{Limit, Values};
use crate::physical_expr::create_physical_expr;
@@ -80,6 +91,7 @@ use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use std::collections::HashMap;
use std::fmt::Write;
+use std::fs;
use std::sync::Arc;
fn create_function_physical_name(
@@ -544,6 +556,67 @@ impl DefaultPhysicalPlanner {
let unaliased: Vec<Expr> =
filters.into_iter().map(unalias).collect();
source.scan(session_state, projection.as_ref(),
&unaliased, *fetch).await
}
+ LogicalPlan::Copy(CopyTo{
+ input,
+ output_url,
+ file_format,
+ per_thread_output,
+ options: _,
+ }) => {
+ let input_exec = self.create_initial_plan(input,
session_state).await?;
+
+ // Get the object store for the specified output_url.
+ // If the user did not pass in a URL, we assume it is a local file path;
+ // this requires some special handling, as COPY can create file paths
+ // that do not exist yet.
+ let is_valid_url = Url::parse(output_url).is_ok();
+
+ // TODO: make this behavior configurable via options (should COPY TO create the path/file as needed?)
+ // TODO: add additional configurable options for whether existing files should be overwritten or
+ // appended to
+ let parsed_url = match is_valid_url {
+ true => ListingTableUrl::parse(output_url),
+ false => {
+ let path = std::path::PathBuf::from(output_url);
+ if !path.exists(){
+ if *per_thread_output{
+ fs::create_dir_all(path)?;
+ } else{
+ fs::File::create(path)?;
+ }
+ }
+ ListingTableUrl::parse(output_url)
+ }
+ }?;
+
+ let object_store_url = parsed_url.object_store();
+
+ let schema: Schema = (**input.schema()).clone().into();
+
+ // Set file sink related options
+ let config = FileSinkConfig {
+ object_store_url,
+ table_paths: vec![parsed_url],
+ file_groups: vec![],
+ output_schema: Arc::new(schema),
+ table_partition_cols: vec![],
+ writer_mode: FileWriterMode::PutMultipart,
+ per_thread_output: *per_thread_output,
+ overwrite: false,
+ };
+
+ // TODO: implement statement-level overrides for each file type
+ // E.g. CsvFormat::from_options(options)
+ let sink_format: Arc<dyn FileFormat> = match file_format {
+ OutputFileFormat::CSV =>
Arc::new(CsvFormat::default()),
+ OutputFileFormat::PARQUET =>
Arc::new(ParquetFormat::default()),
+ OutputFileFormat::JSON =>
Arc::new(JsonFormat::default()),
+ OutputFileFormat::AVRO => Arc::new(AvroFormat {} ),
+ OutputFileFormat::ARROW => Arc::new(ArrowFormat {}),
+ };
+
+ sink_format.create_writer_physical_plan(input_exec,
session_state, config).await
+ }
LogicalPlan::Dml(DmlStatement {
table_name,
op: WriteOp::InsertInto,
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index f89be03f79..29d7571d36 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -17,6 +17,7 @@
//! This module provides a builder for creating LogicalPlans
+use crate::dml::{CopyTo, OutputFileFormat};
use crate::expr::Alias;
use crate::expr_rewriter::{
coerce_plan_expr_for_schema, normalize_col,
@@ -232,6 +233,23 @@ impl LogicalPlanBuilder {
Self::scan_with_filters(table_name, table_source, projection, vec![])
}
+ /// Create a [CopyTo] for copying the contents of `input` to the specified file(s)
+ pub fn copy_to(
+ input: LogicalPlan,
+ output_url: String,
+ file_format: OutputFileFormat,
+ per_thread_output: bool,
+ options: Vec<(String, String)>,
+ ) -> Result<Self> {
+ Ok(Self::from(LogicalPlan::Copy(CopyTo {
+ input: Arc::new(input),
+ output_url,
+ file_format,
+ per_thread_output,
+ options,
+ })))
+ }
+
/// Create a [DmlStatement] for inserting the contents of this builder
into the named table
pub fn insert_into(
input: LogicalPlan,
diff --git a/datafusion/expr/src/logical_plan/dml.rs
b/datafusion/expr/src/logical_plan/dml.rs
index 07f34101eb..ecdea7dcc6 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -17,13 +17,71 @@
use std::{
fmt::{self, Display},
+ str::FromStr,
sync::Arc,
};
-use datafusion_common::{DFSchemaRef, OwnedTableReference};
+use datafusion_common::{DFSchemaRef, DataFusionError, OwnedTableReference};
use crate::LogicalPlan;
+/// Operator that copies the contents of a database to file(s)
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct CopyTo {
+ /// The relation that determines the tuples to write to the output file(s)
+ pub input: Arc<LogicalPlan>,
+ /// The location to write the file(s)
+ pub output_url: String,
+ /// The file format to output (explicitly defined or inferred from file
extension)
+ pub file_format: OutputFileFormat,
+ /// If false, it is assumed output_url is a file to which all data should be written
+ /// regardless of input partitioning. Otherwise, output_url is assumed to be a directory
+ /// in which each output partition is written to its own output file
+ pub per_thread_output: bool,
+ /// Arbitrary options as tuples
+ pub options: Vec<(String, String)>,
+}
+
+/// The file formats that CopyTo can output
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub enum OutputFileFormat {
+ CSV,
+ JSON,
+ PARQUET,
+ AVRO,
+ ARROW,
+}
+
+impl FromStr for OutputFileFormat {
+ type Err = DataFusionError;
+
+ fn from_str(s: &str) -> Result<Self, DataFusionError> {
+ match s {
+ "csv" => Ok(OutputFileFormat::CSV),
+ "json" => Ok(OutputFileFormat::JSON),
+ "parquet" => Ok(OutputFileFormat::PARQUET),
+ "avro" => Ok(OutputFileFormat::AVRO),
+ "arrow" => Ok(OutputFileFormat::ARROW),
+ _ => Err(DataFusionError::NotImplemented(format!(
+ "Unknown or not supported file format {s}!"
+ ))),
+ }
+ }
+}
+
+impl Display for OutputFileFormat {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let out = match self {
+ OutputFileFormat::CSV => "csv",
+ OutputFileFormat::JSON => "json",
+ OutputFileFormat::PARQUET => "parquet",
+ OutputFileFormat::AVRO => "avro",
+ OutputFileFormat::ARROW => "arrow",
+ };
+ write!(f, "{}", out)
+ }
+}
+
/// The operator that modifies the content of a database (adapted from
/// substrait WriteRel)
#[derive(Clone, PartialEq, Eq, Hash)]
diff --git a/datafusion/expr/src/logical_plan/mod.rs
b/datafusion/expr/src/logical_plan/mod.rs
index 01862c3d54..8316417138 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -18,7 +18,7 @@
pub mod builder;
mod ddl;
pub mod display;
-mod dml;
+pub mod dml;
mod extension;
mod plan;
mod statement;
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 3557745ed3..1ee4fb810d 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -31,6 +31,7 @@ use crate::{
build_join_schema, Expr, ExprSchemable, TableProviderFilterPushDown,
TableSource,
};
+use super::dml::CopyTo;
use super::DdlStatement;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -120,6 +121,8 @@ pub enum LogicalPlan {
Dml(DmlStatement),
/// CREATE / DROP TABLES / VIEWS / SCHEMAs
Ddl(DdlStatement),
+ /// COPY TO
+ Copy(CopyTo),
/// Describe the schema of table
DescribeTable(DescribeTable),
/// Unnest a column that contains a nested list type.
@@ -157,6 +160,7 @@ impl LogicalPlan {
dummy_schema
}
LogicalPlan::Dml(DmlStatement { table_schema, .. }) =>
table_schema,
+ LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
LogicalPlan::Ddl(ddl) => ddl.schema(),
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
}
@@ -203,6 +207,7 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Dml(_)
+ | LogicalPlan::Copy(_)
| LogicalPlan::Values(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Union(_)
@@ -343,6 +348,7 @@ impl LogicalPlan {
| LogicalPlan::Distinct(_)
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
+ | LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Prepare(_) => Ok(()),
}
@@ -371,6 +377,7 @@ impl LogicalPlan {
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::Dml(write) => vec![&write.input],
+ LogicalPlan::Copy(copy) => vec![&copy.input],
LogicalPlan::Ddl(ddl) => ddl.inputs(),
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
LogicalPlan::Prepare(Prepare { input, .. }) => vec![input],
@@ -477,6 +484,7 @@ impl LogicalPlan {
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Dml(_)
+ | LogicalPlan::Copy(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Unnest(_) => Ok(None),
@@ -640,6 +648,7 @@ impl LogicalPlan {
| LogicalPlan::Explain(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Dml(_)
+ | LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Statement(_)
@@ -1083,6 +1092,24 @@ impl LogicalPlan {
LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
write!(f, "Dml: op=[{op}] table=[{table_name}]")
}
+ LogicalPlan::Copy(CopyTo {
+ input: _,
+ output_url,
+ file_format,
+ per_thread_output,
+ options,
+ }) => {
+ let mut op_str = String::new();
+ op_str.push('(');
+ for (key, val) in options {
+ if !op_str.is_empty() {
+ op_str.push(',');
+ }
+ op_str.push_str(&format!("{key} {val}"));
+ }
+ op_str.push(')');
+ write!(f, "CopyTo: format={file_format}
output_url={output_url} per_thread_output={per_thread_output} options:
{op_str}")
+ }
LogicalPlan::Ddl(ddl) => {
write!(f, "{}", ddl.display())
}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index bffcd0669c..90307f1491 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -17,6 +17,7 @@
//! Expression utilities
+use crate::dml::CopyTo;
use crate::expr::{Alias, Sort, WindowFunction};
use crate::logical_plan::builder::build_join_schema;
use crate::logical_plan::{
@@ -745,6 +746,19 @@ pub fn from_plan(
op: op.clone(),
input: Arc::new(inputs[0].clone()),
})),
+ LogicalPlan::Copy(CopyTo {
+ input: _,
+ output_url,
+ file_format,
+ per_thread_output,
+ options,
+ }) => Ok(LogicalPlan::Copy(CopyTo {
+ input: Arc::new(inputs[0].clone()),
+ output_url: output_url.clone(),
+ file_format: file_format.clone(),
+ per_thread_output: *per_thread_output,
+ options: options.clone(),
+ })),
LogicalPlan::Values(Values { schema, .. }) =>
Ok(LogicalPlan::Values(Values {
schema: schema.clone(),
values: expr
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 74c4b1d36f..08b28567fb 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -368,6 +368,7 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Dml(_)
+ | LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index d00e5e2f59..511b288442 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1429,6 +1429,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::Dml(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Dml",
)),
+ LogicalPlan::Copy(_) => Err(proto_error(
+ "LogicalPlan serde is not yet implemented for Copy",
+ )),
LogicalPlan::DescribeTable(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DescribeTable",
)),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 16036defda..0f5dbb9ec0 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -16,8 +16,8 @@
// under the License.
use crate::parser::{
- CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt,
LexOrdering,
- Statement as DFStatement,
+ CopyToSource, CopyToStatement, CreateExternalTable, DFParser,
DescribeTableStmt,
+ LexOrdering, Statement as DFStatement,
};
use crate::planner::{
object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
@@ -31,6 +31,7 @@ use datafusion_common::{
DataFusionError, ExprSchema, OwnedTableReference, Result, SchemaReference,
TableReference, ToDFSchema,
};
+use datafusion_expr::dml::{CopyTo, OutputFileFormat};
use datafusion_expr::expr::Placeholder;
use
datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
@@ -55,6 +56,8 @@ use sqlparser::parser::ParserError::ParserError;
use datafusion_common::plan_err;
use std::collections::{BTreeMap, HashMap, HashSet};
+use std::path::Path;
+use std::str::FromStr;
use std::sync::Arc;
fn ident_to_string(ident: &Ident) -> String {
@@ -377,7 +380,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let _ = into; // optional keyword doesn't change behavior
self.insert_to_plan(table_name, columns, source, overwrite)
}
-
Statement::Update {
table,
assignments,
@@ -547,11 +549,71 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}))
}
- fn copy_to_plan(&self, _statement: CopyToStatement) -> Result<LogicalPlan>
{
- // TODO: implement as part of
https://github.com/apache/arrow-datafusion/issues/5654
- Err(DataFusionError::NotImplemented(
- "`COPY .. TO ..` statement is not yet supported".to_string(),
- ))
+ fn copy_to_plan(&self, statement: CopyToStatement) -> Result<LogicalPlan> {
+ // determine if source is table or query and handle accordingly
+ let copy_source = statement.source;
+ let input = match copy_source {
+ CopyToSource::Relation(object_name) => {
+ let table_ref =
+ self.object_name_to_table_reference(object_name.clone())?;
+ let table_source =
self.schema_provider.get_table_provider(table_ref)?;
+ LogicalPlanBuilder::scan(
+ object_name_to_string(&object_name),
+ table_source,
+ None,
+ )?
+ .build()?
+ }
+ CopyToSource::Query(query) => {
+ self.query_to_plan(query, &mut PlannerContext::new())?
+ }
+ };
+
+ // convert options to lowercase strings, check for explicitly set "format" option
+ let mut options = vec![];
+ let mut explicit_format = None;
+ // default behavior is to assume the user is specifying a single file to which
+ // we should output all data regardless of input partitioning.
+ let mut per_thread_output: bool = false;
+ for (key, value) in statement.options {
+ let (k, v) = (key.to_lowercase(),
value.to_string().to_lowercase());
+ // check for options important to planning
+ if k == "format" {
+ explicit_format = Some(OutputFileFormat::from_str(&v)?);
+ }
+ if k == "per_thread_output" {
+ per_thread_output = match v.as_str(){
+ "true" => true,
+ "false" => false,
+ _ => return Err(DataFusionError::Plan(
+ format!("Copy to option 'per_thread_output' must be
true or false, got {value}")
+ ))
+ }
+ }
+ options.push((k, v));
+ }
+ let format = match explicit_format {
+ Some(file_format) => file_format,
+ None => {
+ // try to infer file format from file extension
+ let extension: &str = &Path::new(&statement.target)
+ .extension()
+ .ok_or(
+ DataFusionError::Plan("Copy To format not explicitly
set and unable to get file extension!".to_string()))?
+ .to_str()
+ .ok_or(DataFusionError::Plan("Copy to format not
explicitly set and failed to parse file extension!".to_string()))?
+ .to_lowercase();
+
+ OutputFileFormat::from_str(extension)?
+ }
+ };
+ Ok(LogicalPlan::Copy(CopyTo {
+ input: Arc::new(input),
+ output_url: statement.target,
+ file_format: format,
+ per_thread_output,
+ options,
+ }))
}
fn build_order_by(
diff --git a/datafusion/sql/tests/sql_integration.rs
b/datafusion/sql/tests/sql_integration.rs
index accb6ec9ce..53ffccbdb4 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -325,6 +325,30 @@ fn plan_rollback_transaction_chained() {
quick_test(sql, plan);
}
+#[test]
+fn plan_copy_to() {
+ let sql = "COPY test_decimal to 'output.csv'";
+ let plan = r#"
+CopyTo: format=csv output_url=output.csv per_thread_output=false options: ()
+ TableScan: test_decimal
+ "#
+ .trim();
+ quick_test(sql, plan);
+}
+
+#[test]
+fn plan_copy_to_query() {
+ let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'";
+ let plan = r#"
+CopyTo: format=csv output_url=output.csv per_thread_output=false options: ()
+ Limit: skip=0, fetch=10
+ Projection: test_decimal.id, test_decimal.price
+ TableScan: test_decimal
+ "#
+ .trim();
+ quick_test(sql, plan);
+}
+
#[test]
fn plan_insert() {
let sql =
diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs
b/datafusion/sqllogictest/bin/sqllogictests.rs
index f28fdbe23c..d097d97fb7 100644
--- a/datafusion/sqllogictest/bin/sqllogictests.rs
+++ b/datafusion/sqllogictest/bin/sqllogictests.rs
@@ -16,6 +16,7 @@
// under the License.
use std::ffi::OsStr;
+use std::fs;
use std::path::{Path, PathBuf};
#[cfg(target_family = "windows")]
use std::thread;
@@ -54,10 +55,26 @@ pub async fn main() -> Result<()> {
run_tests().await
}
+/// Sets up an empty directory at test_files/scratch,
+/// creating it if needed and clearing its contents if it already exists.
+/// This allows tests that insert into external tables or use `COPY TO`
+/// to persist data to disk and start from a consistent state when running
+/// a new test.
+fn setup_scratch_dir() -> Result<()> {
+ let path = std::path::Path::new("test_files/scratch");
+ if path.exists() {
+ fs::remove_dir_all(path)?;
+ }
+ fs::create_dir(path)?;
+ Ok(())
+}
+
async fn run_tests() -> Result<()> {
// Enable logging (e.g. set RUST_LOG=debug to see debug logs)
env_logger::init();
+ setup_scratch_dir()?;
+
let options = Options::new();
// Run all tests in parallel, reporting failures at the end
diff --git a/datafusion/sqllogictest/test_files/copy.slt
b/datafusion/sqllogictest/test_files/copy.slt
index e7bde89d29..364459fa2d 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -16,29 +16,141 @@
# under the License.
# tests for copy command
-
statement ok
create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'),
(2, 'Bar');
-# Copy from table
-statement error DataFusion error: This feature is not implemented: `COPY \.\.
TO \.\.` statement is not yet supported
-COPY source_table to '/tmp/table.parquet';
+# Copy to directory as multiple files
+query IT
+COPY source_table TO 'test_files/scratch/table' (format parquet,
per_thread_output true);
+----
+2
+
+# EXPLAIN COPY queries are not currently working
+query error DataFusion error: This feature is not implemented: Unsupported SQL
statement: Some\("COPY source_table TO 'test_files/scratch/table'"\)
+EXPLAIN COPY source_table to 'test_files/scratch/table'
+
+query error DataFusion error: SQL error: ParserError\("Expected end of
statement, found: source_table"\)
+EXPLAIN COPY source_table to 'test_files/scratch/table' (format parquet,
per_thread_output true)
+
+# Copy more files to directory via query
+query IT
+COPY (select * from source_table UNION ALL select * from source_table) to
'test_files/scratch/table' (format parquet, per_thread_output true);
+----
+4
+
+# validate multiple parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_parquet STORED AS PARQUET LOCATION
'test_files/scratch/table/';
+
+query IT
+select * from validate_parquet;
+----
+1 Foo
+2 Bar
+1 Foo
+2 Bar
+1 Foo
+2 Bar
+
+# Copy from table to single file
+query IT
+COPY source_table to 'test_files/scratch/table.parquet';
+----
+2
+
+# validate single parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_parquet_single STORED AS PARQUET LOCATION
'test_files/scratch/table.parquet';
+
+query IT
+select * from validate_parquet_single;
+----
+1 Foo
+2 Bar
+
+# copy from table to folder of csv files
+query IT
+COPY source_table to 'test_files/scratch/table_csv' (format csv,
per_thread_output true);
+----
+2
+
+# validate folder of csv files
+statement ok
+CREATE EXTERNAL TABLE validate_csv STORED AS csv WITH HEADER ROW LOCATION
'test_files/scratch/table_csv';
+
+query IT
+select * from validate_csv;
+----
+1 Foo
+2 Bar
+
+# Copy from table to single csv
+query IT
+COPY source_table to 'test_files/scratch/table.csv';
+----
+2
+
+# Validate single csv output
+statement ok
+CREATE EXTERNAL TABLE validate_single_csv STORED AS csv WITH HEADER ROW
LOCATION 'test_files/scratch/table.csv';
+
+query IT
+select * from validate_single_csv;
+----
+1 Foo
+2 Bar
+
+# Copy from table to folder of json
+query IT
+COPY source_table to 'test_files/scratch/table_json' (format json,
per_thread_output true);
+----
+2
+
+# Validate json output
+statement ok
+CREATE EXTERNAL TABLE validate_json STORED AS json LOCATION
'test_files/scratch/table_json';
+
+query IT
+select * from validate_json;
+----
+1 Foo
+2 Bar
+
+# Copy from table to single json file
+query IT
+COPY source_table to 'test_files/scratch/table.json';
+----
+2
+
+# Validate single JSON file
+statement ok
+CREATE EXTERNAL TABLE validate_single_json STORED AS json LOCATION
'test_files/scratch/table_json';
+
+query IT
+select * from validate_single_json;
+----
+1 Foo
+2 Bar
# Copy from table with options
-statement error DataFusion error: This feature is not implemented: `COPY \.\.
TO \.\.` statement is not yet supported
-COPY source_table to '/tmp/table.parquet' (row_group_size 55);
+query IT
+COPY source_table to 'test_files/scratch/table.json' (row_group_size 55);
+----
+2
# Copy from table with options (and trailing comma)
-statement error DataFusion error: This feature is not implemented: `COPY \.\.
TO \.\.` statement is not yet supported
-COPY source_table to '/tmp/table.parquet' (row_group_size 55,
row_group_limit_bytes 9,);
+query IT
+COPY source_table to 'test_files/scratch/table.json' (row_group_size 55,
row_group_limit_bytes 9,);
+----
+2
# Error cases:
# Incomplete statement
-statement error DataFusion error: SQL error: ParserError\("Expected \), found:
EOF"\)
+query error DataFusion error: SQL error: ParserError\("Expected \), found:
EOF"\)
COPY (select col2, sum(col1) from source_table
# Copy from table with non literal
-statement error DataFusion error: SQL error: ParserError\("Expected ',' or
'\)' after option definition, found: \+"\)
+query error DataFusion error: SQL error: ParserError\("Expected ',' or '\)'
after option definition, found: \+"\)
COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102);