tustvold commented on code in PR #1798: URL: https://github.com/apache/arrow-rs/pull/1798#discussion_r892025966
########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//! ```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be build form the source code and run as follows: Review Comment: ```suggestion //! The binary can also be built from the source code and run as follows: ``` ########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file Review Comment: These docs are :+1: ########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//! ```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be build form the source code and run as follows: +//! +//! ```text +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! \ input.csv output.parquet +//! 
``` +//! +//! # Options +//! +//! ```text Review Comment: Is there some way to auto-generate this? I'm just wondering if we might struggle to keep this comment up to date; it is already at least partially out of date, as parquet is v15 now... ########## parquet/Cargo.toml: ########## @@ -49,6 +49,7 @@ serde_json = { version = "1.0", features = ["preserve_order"], optional = true } rand = "0.8" futures = { version = "0.3", optional = true } tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "fs", "rt", "io-util"] } +anyhow = { version = "1.0.57", optional = true } Review Comment: I'm in two minds about this: whilst it is an optional dependency, and I personally really like its error-handling approach, I'm not sure how we feel about bringing this in... Perhaps @alamb has some thoughts... ########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//!
```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be build form the source code and run as follows: +//! +//! ```text +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! \ input.csv output.parquet +//! ``` +//! +//! # Options +//! +//! ```text +//! parquet 14.0.0 +//! Apache Arrow <[email protected]> +//! Binary file to converts csv to Parquet file +//! +//! USAGE: +//! parquet-fromcsv [OPTIONS] --schema <SCHEMA> --input-file <INPUT_FILE> --output-file <OUTPUT_FILE> +//! +//! OPTIONS: +//! -b, --batch-size <BATCH_SIZE> +//! batch size +//! +//! [env: PARQUET_FROM_CSV_BATCHSIZE=] +//! [default: 1000] +//! +//! -c, --parquet-compression <PARQUET_COMPRESSION> +//! compression mode +//! +//! [default: SNAPPY] +//! +//! -d, --delimiter <DELIMITER> +//! field delimiter +//! +//! default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' +//! +//! -D, --double-quote <DOUBLE_QUOTE> +//! double quote +//! +//! -e, --escape-char <ESCAPE_CHAR> +//! escape charactor +//! +//! -f, --input-format <INPUT_FORMAT> +//! input file format +//! +//! [default: csv] +//! [possible values: csv, tsv] +//! +//! -h, --has-header +//! has header +//! +//! --help +//! Print help information +//! +//! -i, --input-file <INPUT_FILE> +//! input CSV file +//! +//! -o, --output-file <OUTPUT_FILE> +//! output Parquet file +//! +//! -q, --quote-char <QUOTE_CHAR> +//! quate charactor +//! +//! -r, --record-terminator <RECORD_TERMINATOR> +//! record terminator +//! +//! [possible values: lf, crlf, cr] +//! +//! -s, --schema <SCHEMA> +//! message schema for output Parquet +//! +//! -V, --version +//! Print version information +//! ``` +//! +//! ## Parquet file options +//! +//! - `-b`, `--batch-size` : Batch size for Parquet +//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY +//! 
- `-s`, `--schema` : path to message schema for generated Parquet file +//! - `-o`, `--output-file` : path to output parquet file +//! +//! ## Input file options +//! +//! - `-i`, `--input-file` : path to input CSV file +//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`. +//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape charactor for input file +//! - `-h`, `--has-header` : input has header +//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF +//! - `-q`, `--quote-char` : input quoting charactor +//! + +use std::{ + fs::{read_to_string, File}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{bail, Context, Error, Result}; +use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use clap::{ArgEnum, Parser}; +use parquet::{ + arrow::{parquet_to_arrow_schema, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::{parser::parse_message_type, types::SchemaDescriptor}, +}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to convert csv to Parquet file"), long_about=None)] +struct Args { + /// Parquet schema file path + #[clap(short, long, help("message schema for output Parquet"))] + schema: PathBuf, + /// input CSV file path + #[clap(short, long, help("input CSV file"))] + input_file: PathBuf, + /// output Parquet file path + #[clap(short, long, help("output Parquet file"))] + output_file: PathBuf, + /// input file format + #[clap( + arg_enum, + short('f'), + long, + help("input file format"), + default_value_t=CsvDialect::CSV + )] + input_format: CsvDialect, + /// batch size + #[clap( + short, + long, + help("batch size"), + default_value_t = 1000, + env = "PARQUET_FROM_CSV_BATCHSIZE" + )] + batch_size: usize, + /// has header line + #[clap(short, long, help("has header"))] + has_header: bool, + /// field delimiter + /// + /// default value: + /// when input_format==CSV: ',' + /// 
when input_format==TSV: 'TAB' + #[clap(short, long, help("field delimiter"))] + delimiter: Option<char>, + #[clap(arg_enum, short, long, help("record terminator"))] + record_terminator: Option<RecordTerminator>, + #[clap(short, long, help("escape charactor"))] + escape_char: Option<char>, + #[clap(short, long, help("quate charactor"))] + quote_char: Option<char>, + #[clap(short('D'), long, help("double quote"))] + double_quote: Option<bool>, + #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)] + #[clap(parse(try_from_str =compression_from_str))] + parquet_compression: Compression, +} + +fn compression_from_str(cmp: &str) -> Result<Compression, Error> { + match cmp.to_uppercase().as_str() { + "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED), + "SNAPPY" => Ok(Compression::SNAPPY), + "GZIP" => Ok(Compression::GZIP), + "LZO" => Ok(Compression::LZO), + "BROTLI" => Ok(Compression::BROTLI), + "LZ4" => Ok(Compression::LZ4), + "ZSTD" => Ok(Compression::ZSTD), + v => bail!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ", v), + } +} + +impl Args { + fn schema_path(&self) -> &Path { + self.schema.as_path() + } + fn get_delimiter(&self) -> u8 { + match self.delimiter { + Some(ch) => ch as u8, + None => match self.input_format { + CsvDialect::CSV => b',', + CsvDialect::TSV => b'\t', + }, + } + } + fn get_terminator(&self) -> Option<u8> { + match self.record_terminator { + Some(RecordTerminator::LF) => Some(0x0a), + Some(RecordTerminator::CR) => Some(0x0d), + Some(RecordTerminator::CRLF) => None, + None => match self.input_format { + CsvDialect::CSV => None, + CsvDialect::TSV => Some(0x0a), + }, + } + } + fn get_escape(&self) -> Option<u8> { + self.escape_char.map(|ch| ch as u8) + } + fn get_quote(&self) -> Option<u8> { + if self.quote_char.is_none() { + match self.input_format { + CsvDialect::CSV => Some(b'\"'), + CsvDialect::TSV => None, + } + } else { + self.quote_char.map(|c| c as u8) + } + } 
+ fn get_quoting(&self) -> Option<bool> { + if let Some(_qch) = self.quote_char { + Some(true) + } else { + match self.input_format { + CsvDialect::CSV => None, + CsvDialect::TSV => Some(false), + } + } + } +} + +#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] +enum CsvDialect { + CSV, + TSV, +} + +#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] +enum RecordTerminator { + LF, + CRLF, + CR, +} + +fn configure_writer_properties(compression: Compression) -> WriterProperties { + let properties_builder = WriterProperties::builder().set_compression(compression); + properties_builder.build() +} + +fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder { + fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>( + builder: ReaderBuilder, + value: Option<T>, + fun: F, + ) -> ReaderBuilder { + if let Some(val) = value { + fun(builder, val) + } else { + builder + } + } + + let mut builder = ReaderBuilder::new() + .with_schema(arrow_schema) + .with_batch_size(args.batch_size) + .has_header(args.has_header) + .with_delimiter(args.get_delimiter()); + + builder = configure_reader( + builder, + args.get_terminator(), + ReaderBuilder::with_terminator, + ); + builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape); + builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote); + + builder +} + +fn arrow_schema_from_string(schema: &str) -> Result<Arc<Schema>> { + let schema = Arc::new(parse_message_type(&schema)?); + let desc = SchemaDescriptor::new(schema); + let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?); + Ok(arrow_schema) +} + +fn convert_csv_to_parquet(args: &Args) -> Result<()> { + let schema = read_to_string(args.schema_path()).with_context(|| { + format!("Failed to open schema file {:#?}", args.schema_path()) + })?; + let arrow_schema = arrow_schema_from_string(&schema)?; + + // create output parquet writer + let parquet_file = 
File::create(&args.output_file).context(format!( + "Failed to create output file {:#?}", + &args.output_file + ))?; + + let writer_properties = Some(configure_writer_properties(args.parquet_compression)); + let mut arrow_writer = + ArrowWriter::try_new(parquet_file, arrow_schema.clone(), writer_properties) + .context("Failed to create ArrowWriter")?; + + // open input file + let input_file = File::open(&args.input_file) + .with_context(|| format!("Failed to open input file {:#?}", &args.input_file))?; + // create input csv reader + let builder = configure_reader_builder(&args, arrow_schema); + let reader = builder.build(input_file)?; + for batch_result in reader { + let batch = batch_result.context("Failed to read RecordBatch from CSV")?; + arrow_writer + .write(&batch) + .context("Failed to write RecordBatch to parquet")?; + } + arrow_writer.close().context("Failed to close parquet")?; + Ok(()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + convert_csv_to_parquet(&args) +} + +#[cfg(test)] +mod tests { + use std::{ + io::{Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + }; + + use super::*; + use anyhow::Result; + use arrow::datatypes::{DataType, Field}; + use clap::Parser; + use tempfile::NamedTempFile; + + fn parse_args(mut extra_args: Vec<&str>) -> Result<Args> { + let mut args = vec![ + "test", + "--schema", + "test.schema", + "--input-file", + "infile.csv", + "--output-file", + "out.parquet", + ]; + args.append(&mut extra_args); + let args = Args::try_parse_from(args.iter())?; + Ok(args) + } + + #[test] + fn test_parse_arg_minimum() -> Result<()> { + let args = parse_args(vec![])?; + + assert_eq!(args.schema, PathBuf::from(Path::new("test.schema"))); + assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv"))); + assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet"))); + // test default values + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.batch_size, 1000); + assert_eq!(args.has_header, false); + 
assert_eq!(args.delimiter, None); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.record_terminator, None); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.quote_char, None); + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.double_quote, None); + assert_eq!(args.parquet_compression, Compression::SNAPPY); + Ok(()) + } + + #[test] + fn test_parse_arg_format_variants() -> Result<()> { + let args = parse_args(vec!["--input-format", "csv"])?; + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.get_quoting(), None); // quoting default: true + assert_eq!(args.get_escape(), None); + let args = parse_args(vec!["--input-format", "tsv"])?; + assert_eq!(args.input_format, CsvDialect::TSV); + assert_eq!(args.get_delimiter(), b'\t'); + assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF + assert_eq!(args.get_quote(), None); // quote none + assert_eq!(args.get_quoting(), Some(false)); + assert_eq!(args.get_escape(), None); + + let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?; + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.get_quoting(), None); // quoting default: true + assert_eq!(args.get_escape(), Some(b'\\')); + + let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?; + assert_eq!(args.input_format, CsvDialect::TSV); + assert_eq!(args.get_delimiter(), b':'); + assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF + assert_eq!(args.get_quote(), None); // quote none + assert_eq!(args.get_quoting(), Some(false)); + assert_eq!(args.get_escape(), None); + + Ok(()) + } + + #[test] + #[should_panic] + fn test_parse_arg_format_error() { + parse_args(vec!["--input-format", 
"excel"]).unwrap(); + } + + #[test] + fn test_parse_arg_compression_format() { + let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED); + let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::SNAPPY); + let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::GZIP); + let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::LZO); + let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::LZ4); + let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::BROTLI); + let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::ZSTD); + } + + #[test] + #[should_panic] + fn test_parse_arg_compression_format_fail() { + parse_args(vec!["--parquet-compression", "zip"]).unwrap(); + } + + fn assert_debug_text(debug_text: &str, name: &str, value: &str) { + let pattern = format!(" {}: {}", name, value); + assert!( + debug_text.contains(&pattern), + "\"{}\" not contains \"{}\"", + debug_text, + pattern + ) + } + + #[test] + fn test_configure_reader_builder() { + let args = Args { + schema: PathBuf::from(Path::new("schema.arvo")), + input_file: PathBuf::from(Path::new("test.csv")), + output_file: PathBuf::from(Path::new("out.parquet")), + batch_size: 1000, + input_format: CsvDialect::CSV, + has_header: false, + delimiter: None, + record_terminator: None, + escape_char: None, + quote_char: None, + double_quote: None, + parquet_compression: Compression::SNAPPY, + }; + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("field1", DataType::Utf8, false), + Field::new("field2", DataType::Utf8, 
false), + Field::new("field3", DataType::Utf8, false), + Field::new("field4", DataType::Utf8, false), + Field::new("field5", DataType::Utf8, false), + ])); + + let reader_builder = configure_reader_builder(&args, arrow_schema.clone()); + let builder_debug = format!("{:?}", reader_builder); + assert_debug_text(&builder_debug, "has_header", "false"); + assert_debug_text(&builder_debug, "delimiter", "Some(44)"); + assert_debug_text(&builder_debug, "quote", "Some(34)"); + assert_debug_text(&builder_debug, "terminator", "None"); + assert_debug_text(&builder_debug, "batch_size", "1000"); + assert_debug_text(&builder_debug, "escape", "None"); + + let args = Args { + schema: PathBuf::from(Path::new("schema.arvo")), + input_file: PathBuf::from(Path::new("test.csv")), + output_file: PathBuf::from(Path::new("out.parquet")), + batch_size: 2000, + input_format: CsvDialect::TSV, + has_header: true, + delimiter: None, + record_terminator: None, + escape_char: Some('\\'), + quote_char: None, + double_quote: None, + parquet_compression: Compression::SNAPPY, + }; + let arrow_schema = Arc::new(Schema::new(vec![ + Field::new("field1", DataType::Utf8, false), + Field::new("field2", DataType::Utf8, false), + Field::new("field3", DataType::Utf8, false), + Field::new("field4", DataType::Utf8, false), + Field::new("field5", DataType::Utf8, false), + ])); + let reader_builder = configure_reader_builder(&args, arrow_schema.clone()); + let builder_debug = format!("{:?}", reader_builder); + assert_debug_text(&builder_debug, "has_header", "true"); + assert_debug_text(&builder_debug, "delimiter", "Some(9)"); + assert_debug_text(&builder_debug, "quote", "None"); + assert_debug_text(&builder_debug, "terminator", "Some(10)"); + assert_debug_text(&builder_debug, "batch_size", "2000"); + assert_debug_text(&builder_debug, "escape", "Some(92)"); + } + + #[test] + fn test_convert_csv_to_parquet() { + let schema = NamedTempFile::new().unwrap(); + let schema_text = r"message schema { + optional int32 id; 
+ optional binary name (STRING); + }"; + schema.as_file().write_all(schema_text.as_bytes()).unwrap(); + + let mut input_file = NamedTempFile::new().unwrap(); + { + let csv = input_file.as_file_mut(); + for index in 1..2000 { + write!(csv, "{},\"name_{}\"\x0d\x0a", index, index).unwrap(); Review Comment: ```suggestion write!(csv, "{},\"name_{}\"\r\n", index, index).unwrap(); ``` ########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//! ```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be build form the source code and run as follows: +//! +//! ```text +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! \ input.csv output.parquet +//! ``` +//! +//! # Options +//! +//! ```text +//! parquet 14.0.0 +//! Apache Arrow <[email protected]> +//! 
Binary file to converts csv to Parquet file +//! +//! USAGE: +//! parquet-fromcsv [OPTIONS] --schema <SCHEMA> --input-file <INPUT_FILE> --output-file <OUTPUT_FILE> +//! +//! OPTIONS: +//! -b, --batch-size <BATCH_SIZE> +//! batch size +//! +//! [env: PARQUET_FROM_CSV_BATCHSIZE=] +//! [default: 1000] +//! +//! -c, --parquet-compression <PARQUET_COMPRESSION> +//! compression mode +//! +//! [default: SNAPPY] +//! +//! -d, --delimiter <DELIMITER> +//! field delimiter +//! +//! default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' +//! +//! -D, --double-quote <DOUBLE_QUOTE> +//! double quote +//! +//! -e, --escape-char <ESCAPE_CHAR> +//! escape charactor +//! +//! -f, --input-format <INPUT_FORMAT> +//! input file format +//! +//! [default: csv] +//! [possible values: csv, tsv] +//! +//! -h, --has-header +//! has header +//! +//! --help +//! Print help information +//! +//! -i, --input-file <INPUT_FILE> +//! input CSV file +//! +//! -o, --output-file <OUTPUT_FILE> +//! output Parquet file +//! +//! -q, --quote-char <QUOTE_CHAR> +//! quate charactor +//! +//! -r, --record-terminator <RECORD_TERMINATOR> +//! record terminator +//! +//! [possible values: lf, crlf, cr] +//! +//! -s, --schema <SCHEMA> +//! message schema for output Parquet +//! +//! -V, --version +//! Print version information +//! ``` +//! +//! ## Parquet file options +//! +//! - `-b`, `--batch-size` : Batch size for Parquet +//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY +//! - `-s`, `--schema` : path to message schema for generated Parquet file +//! - `-o`, `--output-file` : path to output parquet file +//! +//! ## Input file options +//! +//! - `-i`, `--input-file` : path to input CSV file +//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`. +//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape charactor for input file +//! 
- `-h`, `--has-header` : input has header +//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF +//! - `-q`, `--quote-char` : input quoting charactor +//! + +use std::{ + fs::{read_to_string, File}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{bail, Context, Error, Result}; +use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use clap::{ArgEnum, Parser}; +use parquet::{ + arrow::{parquet_to_arrow_schema, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::{parser::parse_message_type, types::SchemaDescriptor}, +}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to convert csv to Parquet file"), long_about=None)] +struct Args { + /// Parquet schema file path + #[clap(short, long, help("message schema for output Parquet"))] + schema: PathBuf, + /// input CSV file path + #[clap(short, long, help("input CSV file"))] + input_file: PathBuf, + /// output Parquet file path + #[clap(short, long, help("output Parquet file"))] + output_file: PathBuf, + /// input file format + #[clap( + arg_enum, + short('f'), + long, + help("input file format"), + default_value_t=CsvDialect::CSV + )] + input_format: CsvDialect, + /// batch size + #[clap( + short, + long, + help("batch size"), + default_value_t = 1000, + env = "PARQUET_FROM_CSV_BATCHSIZE" + )] + batch_size: usize, + /// has header line + #[clap(short, long, help("has header"))] + has_header: bool, + /// field delimiter + /// + /// default value: + /// when input_format==CSV: ',' + /// when input_format==TSV: 'TAB' + #[clap(short, long, help("field delimiter"))] + delimiter: Option<char>, + #[clap(arg_enum, short, long, help("record terminator"))] + record_terminator: Option<RecordTerminator>, + #[clap(short, long, help("escape charactor"))] + escape_char: Option<char>, + #[clap(short, long, help("quate charactor"))] + quote_char: Option<char>, + #[clap(short('D'), long, help("double quote"))] + double_quote: Option<bool>, 
+ #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)] + #[clap(parse(try_from_str =compression_from_str))] + parquet_compression: Compression, +} + +fn compression_from_str(cmp: &str) -> Result<Compression, Error> { + match cmp.to_uppercase().as_str() { + "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED), + "SNAPPY" => Ok(Compression::SNAPPY), + "GZIP" => Ok(Compression::GZIP), + "LZO" => Ok(Compression::LZO), + "BROTLI" => Ok(Compression::BROTLI), + "LZ4" => Ok(Compression::LZ4), + "ZSTD" => Ok(Compression::ZSTD), + v => bail!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ", v), + } +} + +impl Args { + fn schema_path(&self) -> &Path { + self.schema.as_path() + } + fn get_delimiter(&self) -> u8 { + match self.delimiter { + Some(ch) => ch as u8, + None => match self.input_format { + CsvDialect::CSV => b',', + CsvDialect::TSV => b'\t', + }, + } + } + fn get_terminator(&self) -> Option<u8> { + match self.record_terminator { + Some(RecordTerminator::LF) => Some(0x0a), + Some(RecordTerminator::CR) => Some(0x0d), + Some(RecordTerminator::CRLF) => None, + None => match self.input_format { + CsvDialect::CSV => None, + CsvDialect::TSV => Some(0x0a), + }, + } + } + fn get_escape(&self) -> Option<u8> { + self.escape_char.map(|ch| ch as u8) + } + fn get_quote(&self) -> Option<u8> { + if self.quote_char.is_none() { + match self.input_format { + CsvDialect::CSV => Some(b'\"'), + CsvDialect::TSV => None, + } + } else { + self.quote_char.map(|c| c as u8) + } + } + fn get_quoting(&self) -> Option<bool> { + if let Some(_qch) = self.quote_char { + Some(true) + } else { + match self.input_format { + CsvDialect::CSV => None, + CsvDialect::TSV => Some(false), + } + } + } +} + +#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] +enum CsvDialect { + CSV, + TSV, +} + +#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] +enum RecordTerminator { + LF, + CRLF, + CR, +} + +fn 
configure_writer_properties(compression: Compression) -> WriterProperties { + let properties_builder = WriterProperties::builder().set_compression(compression); + properties_builder.build() +} + +fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBuilder { + fn configure_reader<T, F: Fn(ReaderBuilder, T) -> ReaderBuilder>( + builder: ReaderBuilder, + value: Option<T>, + fun: F, + ) -> ReaderBuilder { + if let Some(val) = value { + fun(builder, val) + } else { + builder + } + } + + let mut builder = ReaderBuilder::new() + .with_schema(arrow_schema) + .with_batch_size(args.batch_size) + .has_header(args.has_header) + .with_delimiter(args.get_delimiter()); + + builder = configure_reader( + builder, + args.get_terminator(), + ReaderBuilder::with_terminator, + ); + builder = configure_reader(builder, args.get_escape(), ReaderBuilder::with_escape); + builder = configure_reader(builder, args.get_quote(), ReaderBuilder::with_quote); + + builder +} + +fn arrow_schema_from_string(schema: &str) -> Result<Arc<Schema>> { + let schema = Arc::new(parse_message_type(&schema)?); + let desc = SchemaDescriptor::new(schema); + let arrow_schema = Arc::new(parquet_to_arrow_schema(&desc, None)?); + Ok(arrow_schema) +} + +fn convert_csv_to_parquet(args: &Args) -> Result<()> { + let schema = read_to_string(args.schema_path()).with_context(|| { + format!("Failed to open schema file {:#?}", args.schema_path()) + })?; + let arrow_schema = arrow_schema_from_string(&schema)?; + + // create output parquet writer + let parquet_file = File::create(&args.output_file).context(format!( + "Failed to create output file {:#?}", + &args.output_file + ))?; + + let writer_properties = Some(configure_writer_properties(args.parquet_compression)); + let mut arrow_writer = + ArrowWriter::try_new(parquet_file, arrow_schema.clone(), writer_properties) + .context("Failed to create ArrowWriter")?; + + // open input file + let input_file = File::open(&args.input_file) + .with_context(|| 
format!("Failed to open input file {:#?}", &args.input_file))?; + // create input csv reader + let builder = configure_reader_builder(&args, arrow_schema); + let reader = builder.build(input_file)?; + for batch_result in reader { + let batch = batch_result.context("Failed to read RecordBatch from CSV")?; + arrow_writer + .write(&batch) + .context("Failed to write RecordBatch to parquet")?; + } + arrow_writer.close().context("Failed to close parquet")?; + Ok(()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + convert_csv_to_parquet(&args) +} + +#[cfg(test)] +mod tests { + use std::{ + io::{Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + }; + + use super::*; + use anyhow::Result; + use arrow::datatypes::{DataType, Field}; + use clap::Parser; + use tempfile::NamedTempFile; + + fn parse_args(mut extra_args: Vec<&str>) -> Result<Args> { + let mut args = vec![ + "test", + "--schema", + "test.schema", + "--input-file", + "infile.csv", + "--output-file", + "out.parquet", + ]; + args.append(&mut extra_args); + let args = Args::try_parse_from(args.iter())?; + Ok(args) + } + + #[test] + fn test_parse_arg_minimum() -> Result<()> { + let args = parse_args(vec![])?; + + assert_eq!(args.schema, PathBuf::from(Path::new("test.schema"))); + assert_eq!(args.input_file, PathBuf::from(Path::new("infile.csv"))); + assert_eq!(args.output_file, PathBuf::from(Path::new("out.parquet"))); + // test default values + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.batch_size, 1000); + assert_eq!(args.has_header, false); + assert_eq!(args.delimiter, None); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.record_terminator, None); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.quote_char, None); + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.double_quote, None); + assert_eq!(args.parquet_compression, Compression::SNAPPY); + Ok(()) + } + + #[test] + fn test_parse_arg_format_variants() -> Result<()> { + let args = 
parse_args(vec!["--input-format", "csv"])?; + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.get_quoting(), None); // quoting default: true + assert_eq!(args.get_escape(), None); + let args = parse_args(vec!["--input-format", "tsv"])?; + assert_eq!(args.input_format, CsvDialect::TSV); + assert_eq!(args.get_delimiter(), b'\t'); + assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF + assert_eq!(args.get_quote(), None); // quote none + assert_eq!(args.get_quoting(), Some(false)); + assert_eq!(args.get_escape(), None); + + let args = parse_args(vec!["--input-format", "csv", "--escape-char", "\\"])?; + assert_eq!(args.input_format, CsvDialect::CSV); + assert_eq!(args.get_delimiter(), b','); + assert_eq!(args.get_terminator(), None); // CRLF + assert_eq!(args.get_quote(), Some(b'\"')); + assert_eq!(args.get_quoting(), None); // quoting default: true + assert_eq!(args.get_escape(), Some(b'\\')); + + let args = parse_args(vec!["--input-format", "tsv", "--delimiter", ":"])?; + assert_eq!(args.input_format, CsvDialect::TSV); + assert_eq!(args.get_delimiter(), b':'); + assert_eq!(args.get_terminator(), Some(b'\x0a')); // LF + assert_eq!(args.get_quote(), None); // quote none + assert_eq!(args.get_quoting(), Some(false)); + assert_eq!(args.get_escape(), None); + + Ok(()) + } + + #[test] + #[should_panic] + fn test_parse_arg_format_error() { + parse_args(vec!["--input-format", "excel"]).unwrap(); + } + + #[test] + fn test_parse_arg_compression_format() { + let args = parse_args(vec!["--parquet-compression", "uncompressed"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::UNCOMPRESSED); + let args = parse_args(vec!["--parquet-compression", "snappy"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::SNAPPY); + let args = parse_args(vec!["--parquet-compression", "gzip"]).unwrap(); + 
assert_eq!(args.parquet_compression, Compression::GZIP); + let args = parse_args(vec!["--parquet-compression", "lzo"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::LZO); + let args = parse_args(vec!["--parquet-compression", "lz4"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::LZ4); + let args = parse_args(vec!["--parquet-compression", "brotli"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::BROTLI); + let args = parse_args(vec!["--parquet-compression", "zstd"]).unwrap(); + assert_eq!(args.parquet_compression, Compression::ZSTD); + } + + #[test] + #[should_panic] Review Comment: Could we check the panic message? ########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//! ```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! 
The binary can also be build form the source code and run as follows: +//! +//! ```text +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! \ input.csv output.parquet +//! ``` +//! +//! # Options +//! +//! ```text +//! parquet 14.0.0 +//! Apache Arrow <[email protected]> +//! Binary file to converts csv to Parquet file +//! +//! USAGE: +//! parquet-fromcsv [OPTIONS] --schema <SCHEMA> --input-file <INPUT_FILE> --output-file <OUTPUT_FILE> +//! +//! OPTIONS: +//! -b, --batch-size <BATCH_SIZE> +//! batch size +//! +//! [env: PARQUET_FROM_CSV_BATCHSIZE=] +//! [default: 1000] +//! +//! -c, --parquet-compression <PARQUET_COMPRESSION> +//! compression mode +//! +//! [default: SNAPPY] +//! +//! -d, --delimiter <DELIMITER> +//! field delimiter +//! +//! default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' +//! +//! -D, --double-quote <DOUBLE_QUOTE> +//! double quote +//! +//! -e, --escape-char <ESCAPE_CHAR> +//! escape charactor +//! +//! -f, --input-format <INPUT_FORMAT> +//! input file format +//! +//! [default: csv] +//! [possible values: csv, tsv] +//! +//! -h, --has-header +//! has header +//! +//! --help +//! Print help information +//! +//! -i, --input-file <INPUT_FILE> +//! input CSV file +//! +//! -o, --output-file <OUTPUT_FILE> +//! output Parquet file +//! +//! -q, --quote-char <QUOTE_CHAR> +//! quate charactor +//! +//! -r, --record-terminator <RECORD_TERMINATOR> +//! record terminator +//! +//! [possible values: lf, crlf, cr] +//! +//! -s, --schema <SCHEMA> +//! message schema for output Parquet +//! +//! -V, --version +//! Print version information +//! ``` +//! +//! ## Parquet file options +//! +//! - `-b`, `--batch-size` : Batch size for Parquet +//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY +//! - `-s`, `--schema` : path to message schema for generated Parquet file +//! - `-o`, `--output-file` : path to output parquet file +//! +//! 
## Input file options +//! +//! - `-i`, `--input-file` : path to input CSV file +//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`. +//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape charactor for input file +//! - `-h`, `--has-header` : input has header +//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF +//! - `-q`, `--quote-char` : input quoting charactor +//! + +use std::{ + fs::{read_to_string, File}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{bail, Context, Error, Result}; +use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use clap::{ArgEnum, Parser}; +use parquet::{ + arrow::{parquet_to_arrow_schema, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::{parser::parse_message_type, types::SchemaDescriptor}, +}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to convert csv to Parquet file"), long_about=None)] +struct Args { + /// Parquet schema file path Review Comment: ```suggestion /// Path to a text file containing a parquet schema definition ``` ########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//! ```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be build form the source code and run as follows: +//! +//! ```text +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! \ input.csv output.parquet +//! ``` +//! +//! # Options +//! +//! ```text +//! parquet 14.0.0 +//! Apache Arrow <[email protected]> +//! Binary file to converts csv to Parquet file +//! +//! USAGE: +//! parquet-fromcsv [OPTIONS] --schema <SCHEMA> --input-file <INPUT_FILE> --output-file <OUTPUT_FILE> +//! +//! OPTIONS: +//! -b, --batch-size <BATCH_SIZE> +//! batch size +//! +//! [env: PARQUET_FROM_CSV_BATCHSIZE=] +//! [default: 1000] +//! +//! -c, --parquet-compression <PARQUET_COMPRESSION> +//! compression mode +//! +//! [default: SNAPPY] +//! +//! -d, --delimiter <DELIMITER> +//! field delimiter +//! +//! default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' +//! +//! -D, --double-quote <DOUBLE_QUOTE> +//! double quote +//! +//! -e, --escape-char <ESCAPE_CHAR> +//! escape charactor +//! +//! -f, --input-format <INPUT_FORMAT> +//! input file format +//! +//! [default: csv] +//! [possible values: csv, tsv] +//! +//! -h, --has-header +//! has header +//! +//! --help +//! Print help information +//! +//! -i, --input-file <INPUT_FILE> +//! input CSV file +//! +//! -o, --output-file <OUTPUT_FILE> +//! output Parquet file +//! +//! -q, --quote-char <QUOTE_CHAR> +//! quate charactor +//! +//! -r, --record-terminator <RECORD_TERMINATOR> +//! record terminator +//! +//! 
[possible values: lf, crlf, cr] +//! +//! -s, --schema <SCHEMA> +//! message schema for output Parquet +//! +//! -V, --version +//! Print version information +//! ``` +//! +//! ## Parquet file options +//! +//! - `-b`, `--batch-size` : Batch size for Parquet +//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY +//! - `-s`, `--schema` : path to message schema for generated Parquet file +//! - `-o`, `--output-file` : path to output parquet file +//! +//! ## Input file options +//! +//! - `-i`, `--input-file` : path to input CSV file +//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`. +//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape charactor for input file +//! - `-h`, `--has-header` : input has header +//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF +//! - `-q`, `--quote-char` : input quoting charactor +//! + +use std::{ + fs::{read_to_string, File}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{bail, Context, Error, Result}; +use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use clap::{ArgEnum, Parser}; +use parquet::{ + arrow::{parquet_to_arrow_schema, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::{parser::parse_message_type, types::SchemaDescriptor}, +}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to convert csv to Parquet file"), long_about=None)] +struct Args { + /// Parquet schema file path + #[clap(short, long, help("message schema for output Parquet"))] + schema: PathBuf, + /// input CSV file path + #[clap(short, long, help("input CSV file"))] + input_file: PathBuf, + /// output Parquet file path + #[clap(short, long, help("output Parquet file"))] + output_file: PathBuf, + /// input file format + #[clap( + arg_enum, + short('f'), + long, + help("input file format"), + default_value_t=CsvDialect::CSV + )] + 
input_format: CsvDialect, + /// batch size + #[clap( + short, + long, + help("batch size"), + default_value_t = 1000, + env = "PARQUET_FROM_CSV_BATCHSIZE" + )] + batch_size: usize, + /// has header line + #[clap(short, long, help("has header"))] + has_header: bool, + /// field delimiter + /// + /// default value: + /// when input_format==CSV: ',' + /// when input_format==TSV: 'TAB' + #[clap(short, long, help("field delimiter"))] + delimiter: Option<char>, + #[clap(arg_enum, short, long, help("record terminator"))] + record_terminator: Option<RecordTerminator>, + #[clap(short, long, help("escape charactor"))] + escape_char: Option<char>, + #[clap(short, long, help("quate charactor"))] + quote_char: Option<char>, + #[clap(short('D'), long, help("double quote"))] + double_quote: Option<bool>, + #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)] + #[clap(parse(try_from_str =compression_from_str))] + parquet_compression: Compression, +} + +fn compression_from_str(cmp: &str) -> Result<Compression, Error> { + match cmp.to_uppercase().as_str() { + "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED), + "SNAPPY" => Ok(Compression::SNAPPY), + "GZIP" => Ok(Compression::GZIP), + "LZO" => Ok(Compression::LZO), + "BROTLI" => Ok(Compression::BROTLI), + "LZ4" => Ok(Compression::LZ4), + "ZSTD" => Ok(Compression::ZSTD), + v => bail!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ", v), + } +} + +impl Args { + fn schema_path(&self) -> &Path { + self.schema.as_path() + } + fn get_delimiter(&self) -> u8 { + match self.delimiter { + Some(ch) => ch as u8, + None => match self.input_format { + CsvDialect::CSV => b',', + CsvDialect::TSV => b'\t', + }, + } + } + fn get_terminator(&self) -> Option<u8> { + match self.record_terminator { + Some(RecordTerminator::LF) => Some(0x0a), + Some(RecordTerminator::CR) => Some(0x0d), + Some(RecordTerminator::CRLF) => None, + None => match self.input_format { + 
CsvDialect::CSV => None, + CsvDialect::TSV => Some(0x0a), + }, + } + } + fn get_escape(&self) -> Option<u8> { + self.escape_char.map(|ch| ch as u8) + } + fn get_quote(&self) -> Option<u8> { + if self.quote_char.is_none() { + match self.input_format { + CsvDialect::CSV => Some(b'\"'), + CsvDialect::TSV => None, + } + } else { + self.quote_char.map(|c| c as u8) + } + } + fn get_quoting(&self) -> Option<bool> { Review Comment: This method is a bit confusing and only seems used by tests, perhaps we could remove it? ########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! `parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//! ```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be build form the source code and run as follows: +//! +//! ```text +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! 
\ input.csv output.parquet +//! ``` +//! +//! # Options +//! +//! ```text +//! parquet 14.0.0 +//! Apache Arrow <[email protected]> +//! Binary file to converts csv to Parquet file +//! +//! USAGE: +//! parquet-fromcsv [OPTIONS] --schema <SCHEMA> --input-file <INPUT_FILE> --output-file <OUTPUT_FILE> +//! +//! OPTIONS: +//! -b, --batch-size <BATCH_SIZE> +//! batch size +//! +//! [env: PARQUET_FROM_CSV_BATCHSIZE=] +//! [default: 1000] +//! +//! -c, --parquet-compression <PARQUET_COMPRESSION> +//! compression mode +//! +//! [default: SNAPPY] +//! +//! -d, --delimiter <DELIMITER> +//! field delimiter +//! +//! default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' +//! +//! -D, --double-quote <DOUBLE_QUOTE> +//! double quote +//! +//! -e, --escape-char <ESCAPE_CHAR> +//! escape charactor +//! +//! -f, --input-format <INPUT_FORMAT> +//! input file format +//! +//! [default: csv] +//! [possible values: csv, tsv] +//! +//! -h, --has-header +//! has header +//! +//! --help +//! Print help information +//! +//! -i, --input-file <INPUT_FILE> +//! input CSV file +//! +//! -o, --output-file <OUTPUT_FILE> +//! output Parquet file +//! +//! -q, --quote-char <QUOTE_CHAR> +//! quate charactor +//! +//! -r, --record-terminator <RECORD_TERMINATOR> +//! record terminator +//! +//! [possible values: lf, crlf, cr] +//! +//! -s, --schema <SCHEMA> +//! message schema for output Parquet +//! +//! -V, --version +//! Print version information +//! ``` +//! +//! ## Parquet file options +//! +//! - `-b`, `--batch-size` : Batch size for Parquet +//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY +//! - `-s`, `--schema` : path to message schema for generated Parquet file +//! - `-o`, `--output-file` : path to output parquet file +//! +//! ## Input file options +//! +//! - `-i`, `--input-file` : path to input CSV file +//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`. +//! 
- `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape charactor for input file +//! - `-h`, `--has-header` : input has header +//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF +//! - `-q`, `--quote-char` : input quoting charactor +//! + +use std::{ + fs::{read_to_string, File}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{bail, Context, Error, Result}; +use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use clap::{ArgEnum, Parser}; +use parquet::{ + arrow::{parquet_to_arrow_schema, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::{parser::parse_message_type, types::SchemaDescriptor}, +}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to convert csv to Parquet file"), long_about=None)] Review Comment: ```suggestion #[clap(author, version, about("Binary to convert csv to Parquet"), long_about=None)] ``` ########## parquet/src/bin/parquet-fromcsv.rs: ########## @@ -0,0 +1,575 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary file to converts csv to Parquet file +//! +//! # Install +//! +//! 
`parquet-fromcsv` can be installed using `cargo`: +//! +//! ```text +//! cargo install parquet --features=cli +//! ``` +//! +//! After this `parquet-fromcsv` shoud be available: +//! +//! ```text +//! parquet-fromcsv --schema message_schema_for_parquet.txt input.csv output.parquet +//! ``` +//! +//! The binary can also be build form the source code and run as follows: +//! +//! ```text +//! cargo run --features=cli --bin parquet-fromcsv --schema message_schema_for_parquet.txt \ +//! \ input.csv output.parquet +//! ``` +//! +//! # Options +//! +//! ```text +//! parquet 14.0.0 +//! Apache Arrow <[email protected]> +//! Binary file to converts csv to Parquet file +//! +//! USAGE: +//! parquet-fromcsv [OPTIONS] --schema <SCHEMA> --input-file <INPUT_FILE> --output-file <OUTPUT_FILE> +//! +//! OPTIONS: +//! -b, --batch-size <BATCH_SIZE> +//! batch size +//! +//! [env: PARQUET_FROM_CSV_BATCHSIZE=] +//! [default: 1000] +//! +//! -c, --parquet-compression <PARQUET_COMPRESSION> +//! compression mode +//! +//! [default: SNAPPY] +//! +//! -d, --delimiter <DELIMITER> +//! field delimiter +//! +//! default value: when input_format==CSV: ',' when input_format==TSV: 'TAB' +//! +//! -D, --double-quote <DOUBLE_QUOTE> +//! double quote +//! +//! -e, --escape-char <ESCAPE_CHAR> +//! escape charactor +//! +//! -f, --input-format <INPUT_FORMAT> +//! input file format +//! +//! [default: csv] +//! [possible values: csv, tsv] +//! +//! -h, --has-header +//! has header +//! +//! --help +//! Print help information +//! +//! -i, --input-file <INPUT_FILE> +//! input CSV file +//! +//! -o, --output-file <OUTPUT_FILE> +//! output Parquet file +//! +//! -q, --quote-char <QUOTE_CHAR> +//! quate charactor +//! +//! -r, --record-terminator <RECORD_TERMINATOR> +//! record terminator +//! +//! [possible values: lf, crlf, cr] +//! +//! -s, --schema <SCHEMA> +//! message schema for output Parquet +//! +//! -V, --version +//! Print version information +//! ``` +//! +//! ## Parquet file options +//! +//! 
- `-b`, `--batch-size` : Batch size for Parquet +//! - `-c`, `--parquet-compression` : Compression option for Parquet, default is SNAPPY +//! - `-s`, `--schema` : path to message schema for generated Parquet file +//! - `-o`, `--output-file` : path to output parquet file +//! +//! ## Input file options +//! +//! - `-i`, `--input-file` : path to input CSV file +//! - `-f`, `--input-format` : dialect for input file, `csv` or `tsv`. +//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape charactor for input file +//! - `-h`, `--has-header` : input has header +//! - `-r`, `--record-terminator` : record terminator charactor for input. default is CRLF +//! - `-q`, `--quote-char` : input quoting charactor +//! + +use std::{ + fs::{read_to_string, File}, + path::{Path, PathBuf}, + sync::Arc, +}; + +use anyhow::{bail, Context, Error, Result}; +use arrow::{csv::ReaderBuilder, datatypes::Schema}; +use clap::{ArgEnum, Parser}; +use parquet::{ + arrow::{parquet_to_arrow_schema, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::{parser::parse_message_type, types::SchemaDescriptor}, +}; + +#[derive(Debug, Parser)] +#[clap(author, version, about("Binary file to convert csv to Parquet file"), long_about=None)] +struct Args { + /// Parquet schema file path + #[clap(short, long, help("message schema for output Parquet"))] + schema: PathBuf, + /// input CSV file path + #[clap(short, long, help("input CSV file"))] + input_file: PathBuf, + /// output Parquet file path + #[clap(short, long, help("output Parquet file"))] + output_file: PathBuf, + /// input file format + #[clap( + arg_enum, + short('f'), + long, + help("input file format"), + default_value_t=CsvDialect::CSV + )] + input_format: CsvDialect, + /// batch size + #[clap( + short, + long, + help("batch size"), + default_value_t = 1000, + env = "PARQUET_FROM_CSV_BATCHSIZE" + )] + batch_size: usize, + /// has header line + 
#[clap(short, long, help("has header"))] + has_header: bool, + /// field delimiter + /// + /// default value: + /// when input_format==CSV: ',' + /// when input_format==TSV: 'TAB' + #[clap(short, long, help("field delimiter"))] + delimiter: Option<char>, + #[clap(arg_enum, short, long, help("record terminator"))] + record_terminator: Option<RecordTerminator>, + #[clap(short, long, help("escape charactor"))] + escape_char: Option<char>, + #[clap(short, long, help("quate charactor"))] + quote_char: Option<char>, + #[clap(short('D'), long, help("double quote"))] + double_quote: Option<bool>, + #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)] + #[clap(parse(try_from_str =compression_from_str))] + parquet_compression: Compression, +} + +fn compression_from_str(cmp: &str) -> Result<Compression, Error> { + match cmp.to_uppercase().as_str() { + "UNCOMPRESSED" => Ok(Compression::UNCOMPRESSED), + "SNAPPY" => Ok(Compression::SNAPPY), + "GZIP" => Ok(Compression::GZIP), + "LZO" => Ok(Compression::LZO), + "BROTLI" => Ok(Compression::BROTLI), + "LZ4" => Ok(Compression::LZ4), + "ZSTD" => Ok(Compression::ZSTD), + v => bail!("Unknown compression {0} : possible values UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD ", v), + } +} + +impl Args { + fn schema_path(&self) -> &Path { + self.schema.as_path() + } + fn get_delimiter(&self) -> u8 { + match self.delimiter { + Some(ch) => ch as u8, + None => match self.input_format { + CsvDialect::CSV => b',', + CsvDialect::TSV => b'\t', + }, + } + } + fn get_terminator(&self) -> Option<u8> { + match self.record_terminator { + Some(RecordTerminator::LF) => Some(0x0a), + Some(RecordTerminator::CR) => Some(0x0d), + Some(RecordTerminator::CRLF) => None, + None => match self.input_format { + CsvDialect::CSV => None, + CsvDialect::TSV => Some(0x0a), + }, + } + } + fn get_escape(&self) -> Option<u8> { + self.escape_char.map(|ch| ch as u8) + } + fn get_quote(&self) -> Option<u8> { + if 
self.quote_char.is_none() { + match self.input_format { + CsvDialect::CSV => Some(b'\"'), + CsvDialect::TSV => None, + } + } else { + self.quote_char.map(|c| c as u8) + } + } + fn get_quoting(&self) -> Option<bool> { + if let Some(_qch) = self.quote_char { + Some(true) + } else { + match self.input_format { + CsvDialect::CSV => None, + CsvDialect::TSV => Some(false), + } + } + } +} + +#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] +enum CsvDialect { + CSV, + TSV, +} + +#[derive(Debug, Clone, Copy, ArgEnum, PartialEq)] +enum RecordTerminator { + LF, + CRLF, + CR, +} + +fn configure_writer_properties(compression: Compression) -> WriterProperties { + let properties_builder = WriterProperties::builder().set_compression(compression); Review Comment: Definitely could be done in a follow up, but the following might be good to expose: * Writer version * Max row group size (rows)
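If the writer version were exposed on the CLI as suggested, it would need a `try_from_str` parser in the same style as `compression_from_str` in the diff above. A minimal, std-only sketch of that shape (the `WriterVersion` enum here is a local stand-in for illustration, not the parquet crate's type):

```rust
// Hypothetical parser mirroring `compression_from_str`, for a follow-up
// that exposes the Parquet writer version as a CLI option. The enum is a
// local stand-in, not `parquet::file::properties::WriterVersion`.
#[derive(Debug, PartialEq)]
enum WriterVersion {
    V1,
    V2,
}

fn writer_version_from_str(v: &str) -> Result<WriterVersion, String> {
    // Accept a few spellings, normalized to uppercase like the
    // compression parser does.
    match v.to_uppercase().as_str() {
        "1" | "1.0" | "PARQUET_1_0" => Ok(WriterVersion::V1),
        "2" | "2.0" | "PARQUET_2_0" => Ok(WriterVersion::V2),
        other => Err(format!(
            "Unknown writer version {} : possible values 1.0, 2.0",
            other
        )),
    }
}

fn main() {
    assert_eq!(writer_version_from_str("2.0"), Ok(WriterVersion::V2));
    assert!(writer_version_from_str("excel").is_err());
}
```

In the actual follow-up, the parsed value would presumably be passed into `configure_writer_properties` via the `WriterProperties` builder (the parquet crate's builder has `set_writer_version`, and a row-group size knob along the lines of `set_max_row_group_size`; confirm the exact names against the crate version in use).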

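On the earlier review question about checking the panic message in the `#[should_panic]` tests: the usual answer is `#[should_panic(expected = "...")]`, which substring-matches the panic payload. Outside the test harness, the message can also be captured explicitly with `std::panic::catch_unwind`. A std-only sketch (the panic message below is illustrative, not the binary's actual error text):

```rust
use std::panic;

// Catch an unwind and extract the panic message, so an assertion can be
// made on its contents. Panic payloads are usually &str or String.
fn panic_message(f: impl FnOnce() + panic::UnwindSafe) -> String {
    let err = panic::catch_unwind(f).expect_err("closure did not panic");
    if let Some(s) = err.downcast_ref::<&str>() {
        (*s).to_string()
    } else if let Some(s) = err.downcast_ref::<String>() {
        s.clone()
    } else {
        String::from("<non-string panic payload>")
    }
}

fn main() {
    let msg = panic_message(|| panic!("Unknown compression excel"));
    assert!(msg.contains("Unknown compression"));
}
```

In the test module itself, `#[should_panic(expected = "Unknown compression")]` on `test_parse_arg_format_error` would be the lighter-weight way to address the comment.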