alamb commented on code in PR #7283:
URL: https://github.com/apache/arrow-datafusion/pull/7283#discussion_r1294544795
##########
.gitignore:
##########
@@ -104,3 +104,6 @@ datafusion/CHANGELOG.md.bak
 
 # Generated tpch data
 datafusion/core/tests/sqllogictests/test_files/tpch/data/*
+
+# Scratch temp dir for sqllogictests
+datafusion/core/tests/sqllogictests/test_files/scratch*

Review Comment:
   this will likely have a logical conflict with https://github.com/apache/arrow-datafusion/pull/7284, FYI



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -544,6 +556,67 @@ impl DefaultPhysicalPlanner {
                     let unaliased: Vec<Expr> = filters.into_iter().map(unalias).collect();
                     source.scan(session_state, projection.as_ref(), &unaliased, *fetch).await
                 }
+                LogicalPlan::Copy(CopyTo{
+                    input,
+                    output_url,
+                    file_format,
+                    per_thread_output,
+                    options: _,
+                }) => {
+                    let input_exec = self.create_initial_plan(input, session_state).await?;
+
+                    // Get object store for specified output_url
+                    // if user did not pass in a url, we assume it is a local file path
+                    // this requires some special handling as copy can create non
+                    // existing file paths
+                    let is_valid_url = Url::parse(output_url).is_ok();
+
+                    // TODO: make this behavior configurable via options (should copy to create path/file as needed?)
+                    // TODO: add additional configurable options for if existing files should be overwritten or
+                    // appended to
+                    let parsed_url = match is_valid_url {
+                        true => ListingTableUrl::parse(output_url),
+                        false => {
+                            let path = std::path::PathBuf::from(output_url);
+                            if !path.exists() {
+                                if *per_thread_output {
+                                    fs::create_dir_all(path)?;
+                                } else {
+                                    fs::File::create(path)?;
+                                }
+                            }
+                            ListingTableUrl::parse(output_url)
+                        }
+                    }?;
+
+                    let object_store_url = parsed_url.object_store();
+
+                    let schema: Schema = (**input.schema()).clone().into();
+
+                    // Set file sink related options
+                    let config = FileSinkConfig {
+                        object_store_url,
+                        table_paths: vec![parsed_url],
+                        file_groups: vec![],
+                        output_schema: Arc::new(schema),
+                        table_partition_cols: vec![],
+                        writer_mode: FileWriterMode::PutMultipart,
+                        per_thread_output: *per_thread_output,
+                        overwrite: false,
+                    };
+
+                    // TODO: implement statement level overrides for each file type
+                    // E.g. CsvFormat::from_options(options)

Review Comment:
   👍
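   One possible shape for the statement-level override mentioned in that TODO: fold the `(key, value)` option tuples from the COPY statement into the existing `CsvFormat` builder methods. `CsvFormat::from_options` does not exist yet, so this is only a sketch, and the option names (`header`, `delimiter`) are illustrative:
   ```rust
   use datafusion::datasource::file_format::csv::CsvFormat;
   use datafusion::error::{DataFusionError, Result};

   /// Hypothetical `CsvFormat::from_options`: map the COPY statement's
   /// (key, value) option tuples onto the existing CsvFormat builders
   fn csv_format_from_options(options: &[(String, String)]) -> Result<CsvFormat> {
       let mut format = CsvFormat::default();
       for (key, value) in options {
           format = match key.to_lowercase().as_str() {
               // e.g. COPY ... (header false)
               "header" => format.with_has_header(value.parse().map_err(|_| {
                   DataFusionError::Plan(format!("invalid header value: {value}"))
               })?),
               // e.g. COPY ... (delimiter '|'); single-byte delimiters only
               "delimiter" => {
                   let byte = value.as_bytes().first().copied().ok_or_else(|| {
                       DataFusionError::Plan("delimiter must not be empty".to_string())
                   })?;
                   format.with_delimiter(byte)
               }
               // reject unknown keys so typos fail at plan time rather than
               // being silently ignored
               _ => {
                   return Err(DataFusionError::Plan(format!(
                       "unsupported CSV copy option: {key}"
                   )))
               }
           };
       }
       Ok(format)
   }
   ```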
##########
datafusion/core/tests/sqllogictests/test_files/copy.slt:
##########
@@ -16,29 +16,141 @@
 # under the License.
 
 # tests for copy command
-
 statement ok
 create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, 'Bar');
 
-# Copy from table
-statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
-COPY source_table to '/tmp/table.parquet';
+# Copy to directory as multiple files
+query IT
+COPY source_table TO 'tests/sqllogictests/test_files/scratch/table' (format parquet, per_thread_output true);
+----
+2
+
+#Explain copy queries not currently working
+query error DataFusion error: This feature is not implemented: Unsupported SQL statement: Some\("COPY source_table TO 'tests/sqllogictests/test_files/scratch/table'"\)
+EXPLAIN COPY source_table to 'tests/sqllogictests/test_files/scratch/table'
+
+query error DataFusion error: SQL error: ParserError\("Expected end of statement, found: source_table"\)
+EXPLAIN COPY source_table to 'tests/sqllogictests/test_files/scratch/table' (format parquet, per_thread_output true)
+
+# Copy more files to directory via query
+query IT
+COPY (select * from source_table UNION ALL select * from source_table) to 'tests/sqllogictests/test_files/scratch/table' (format parquet, per_thread_output true);
+----
+4
+
+# validate multiple parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_parquet STORED AS PARQUET LOCATION 'tests/sqllogictests/test_files/scratch/table/';
+
+query IT
+select * from validate_parquet;
+----
+1 Foo
+2 Bar
+1 Foo
+2 Bar
+1 Foo
+2 Bar
+
+# Copy from table to single file
+query IT
+COPY source_table to 'tests/sqllogictests/test_files/scratch/table.parquet';
+----
+2
+
+# validate single parquet file output
+statement ok
+CREATE EXTERNAL TABLE validate_parquet_single STORED AS PARQUET LOCATION 'tests/sqllogictests/test_files/scratch/table.parquet';
+
+query IT
+select * from validate_parquet_single;
+----
+1 Foo
+2 Bar
+
+# copy from table to folder of csv files
+query IT
+COPY source_table to 'tests/sqllogictests/test_files/scratch/table_csv' (format csv, per_thread_output true);
+----
+2
+
+# validate folder of csv files
+statement ok
+CREATE EXTERNAL TABLE validate_csv STORED AS csv WITH HEADER ROW LOCATION 'tests/sqllogictests/test_files/scratch/table_csv';
+
+query IT
+select * from validate_csv;
+----
+1 Foo
+2 Bar
+
+# Copy from table to single csv
+query IT
+COPY source_table to 'tests/sqllogictests/test_files/scratch/table.csv';
+----
+2
+
+# Validate single csv output
+statement ok
+CREATE EXTERNAL TABLE validate_single_csv STORED AS csv WITH HEADER ROW LOCATION 'tests/sqllogictests/test_files/scratch/table.csv';
+
+query IT
+select * from validate_single_csv;
+----
+1 Foo
+2 Bar
+
+# Copy from table to folder of json
+query IT
+COPY source_table to 'tests/sqllogictests/test_files/scratch/table_json' (format json, per_thread_output true);
+----
+2
+
+# Validate json output
+statement ok
+CREATE EXTERNAL TABLE validate_json STORED AS json LOCATION 'tests/sqllogictests/test_files/scratch/table_json';
+
+query IT
+select * from validate_json;
+----
+1 Foo
+2 Bar
+
+# Copy from table to single json file
+query IT
+COPY source_table to 'tests/sqllogictests/test_files/scratch/table.json';
+----
+2
+
+# Validate single JSON file`
+statement ok
+CREATE EXTERNAL TABLE validate_single_json STORED AS json LOCATION 'tests/sqllogictests/test_files/scratch/table_json';
+
+query IT
+select * from validate_single_json;
+----
+1 Foo
+2 Bar
 
 # Copy from table with options
-statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
-COPY source_table to '/tmp/table.parquet' (row_group_size 55);
+query IT
+COPY source_table to 'tests/sqllogictests/test_files/scratch/table.json' (row_group_size 55);

Review Comment:
   row_group_size for json is somewhat surprising to me, as I expect it is a parquet thing



##########
datafusion/expr/src/logical_plan/dml.rs:
##########
@@ -17,13 +17,71 @@
 use std::{
     fmt::{self, Display},
+    str::FromStr,
     sync::Arc,
 };
 
-use datafusion_common::{DFSchemaRef, OwnedTableReference};
+use datafusion_common::{DFSchemaRef, DataFusionError, OwnedTableReference};
 
 use crate::LogicalPlan;
 
+/// Operator that copies the contents of a database to file(s)
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct CopyTo {
+    /// The relation that determines the tuples to write to the output file(s)
+    pub input: Arc<LogicalPlan>,
+    /// The location to write the file(s)
+    pub output_url: String,
+    /// The file format to output (explicitly defined or inferred from file extension)
+    pub file_format: OutputFileFormat,
+    /// If false, it is assumed output_url is a file to which all data should be written
+    /// regardless of input partitioning. Otherwise, output_url is assumed to be a directory
+    /// to which each output partition is written to its own output file
+    pub per_thread_output: bool,
+    /// Arbitrary options as tuples
+    pub options: Vec<(String, String)>,
+}
+
+/// The file formats that CopyTo can output

Review Comment:
   This looks very similar to the existing `FileType` enum: https://docs.rs/datafusion/latest/datafusion/datasource/file_format/file_type/enum.FileType.html
   
   Perhaps we could move `FileType` into `datafusion_common` so it could be used by both the logical plan and the datasource?
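   For reference, a minimal sketch of what a shared enum in `datafusion_common` might look like; the variant list approximates the existing datasource `FileType`, and the `FromStr` impl shows how the COPY statement's `format` option could map onto it:
   ```rust
   use std::str::FromStr;

   use datafusion_common::DataFusionError;

   /// Sketch: a single file type enum living in datafusion_common that both
   /// LogicalPlan::Copy and the datasource file formats could share
   #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
   pub enum FileType {
       CSV,
       JSON,
       PARQUET,
       AVRO,
   }

   impl FromStr for FileType {
       type Err = DataFusionError;

       fn from_str(s: &str) -> Result<Self, Self::Err> {
           // accept the spellings used by `COPY ... (format ...)`
           match s.to_lowercase().as_str() {
               "csv" => Ok(Self::CSV),
               "json" => Ok(Self::JSON),
               "parquet" => Ok(Self::PARQUET),
               "avro" => Ok(Self::AVRO),
               other => Err(DataFusionError::NotImplemented(format!(
                   "unsupported file format: {other}"
               ))),
           }
       }
   }
   ```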
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -232,6 +233,23 @@ impl LogicalPlanBuilder {
         Self::scan_with_filters(table_name, table_source, projection, vec![])
     }
 
+    /// Create a [CopyTo] for copying the contents of this builder to the specified file(s)
+    pub fn copy_to(
+        input: LogicalPlan,
+        output_url: String,
+        file_format: OutputFileFormat,
+        per_thread_output: bool,

Review Comment:
   Can we add some more documentation about what each of the parameters means (specifically `per_thread_output`)?
   
   Also, do you envision other options here (like `overwrite` vs `append`)? If so, maybe it makes sense to make a struct like
   ```rust
   pub struct CopyOptions {
       output_url: String,
       file_format: OutputFileFormat,
       per_thread_output: bool,
       other: Vec<(String, String)>,
   }
   ```
   Making a config struct like that would not only allow additional options to be added without an API change, it would also provide a natural location to document each option and what it means.
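   With a config struct like that, the builder method collapses to a single options parameter. A hypothetical signature, reusing the `CopyTo` fields from this PR (sketch only):
   ```rust
   use std::sync::Arc;

   impl LogicalPlanBuilder {
       /// Sketch: `copy_to` taking the suggested `CopyOptions` struct, so new
       /// options can be added later without changing this signature
       pub fn copy_to(input: LogicalPlan, options: CopyOptions) -> Result<Self> {
           let CopyOptions {
               output_url,
               file_format,
               per_thread_output,
               other,
           } = options;
           Ok(Self::from(LogicalPlan::Copy(CopyTo {
               input: Arc::new(input),
               output_url,
               file_format,
               per_thread_output,
               options: other,
           })))
       }
   }
   ```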
##########
datafusion/core/tests/sqllogictests/test_files/copy.slt:
##########
@@ -16,29 +16,141 @@
 # under the License.
 
 # tests for copy command
-
 statement ok
 create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, 'Bar');
 
-# Copy from table
-statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported
-COPY source_table to '/tmp/table.parquet';
+# Copy to directory as multiple files
+query IT
+COPY source_table TO 'tests/sqllogictests/test_files/scratch/table' (format parquet, per_thread_output true);
+----
+2
+
+#Explain copy queries not currently working

Review Comment:
   I think a special case for explain copy is probably the right thing



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -1083,6 +1092,24 @@ impl LogicalPlan {
             LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                 write!(f, "Dml: op=[{op}] table=[{table_name}]")
             }
+            LogicalPlan::Copy(CopyTo {
+                input: _,
+                output_url,
+                file_format,
+                per_thread_output,
+                options,
+            }) => {
+                let mut op_str = String::new();
+                op_str.push('(');
+                for (key, val) in options {
+                    if !op_str.is_empty() {
+                        op_str.push(',');
+                    }
+                    op_str.push_str(&format!("{key} {val}"));
+                }
+                op_str.push(')');

Review Comment:
   I think you could use `join()` here to be more concise: https://doc.rust-lang.org/std/primitive.slice.html#method.join
   
   But then you would probably have to collect the formatted (key, val) pairs into a separate `Vec`, resulting in another copy
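   The `join()` version, for comparison; it also sidesteps a subtle issue in the loop above (the `!op_str.is_empty()` guard is always true once '(' has been pushed, so a comma is inserted before the first pair):
   ```rust
   // build "(key1 val1,key2 val2)" in one pass; the collect() is the extra
   // Vec allocation mentioned above
   let op_str = options
       .iter()
       .map(|(key, val)| format!("{key} {val}"))
       .collect::<Vec<_>>()
       .join(",");
   // then e.g. write!(f, "... options: ({op_str})")
   ```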
##########
datafusion/core/tests/sqllogictests/src/main.rs:
##########
@@ -58,10 +59,26 @@ pub async fn main() -> Result<()> {
     run_tests().await
 }
 
+/// Sets up an empty directory at tests/sqllogictests/test_files/scratch/

Review Comment:
   ❤️ -- we should also probably add a note about `scratch` to https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/sqllogictests/README.md eventually too
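   For context, the setup that doc comment describes amounts to something like the following (a sketch, not the exact PR code):
   ```rust
   use std::fs;
   use std::io;
   use std::path::Path;

   /// Sketch: remove any scratch output left over from a previous run and
   /// recreate the directory empty, so each run starts from a known state
   fn setup_scratch_dir() -> io::Result<()> {
       let scratch = Path::new("tests/sqllogictests/test_files/scratch");
       if scratch.exists() {
           fs::remove_dir_all(scratch)?;
       }
       fs::create_dir_all(scratch)
   }
   ```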
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]