This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 547cb80ad Support Compression in parquet-fromcsv (#4160)
547cb80ad is described below
commit 547cb80ad02b77033f2ef20d21d2f94cd0507096
Author: 苏小刚 <[email protected]>
AuthorDate: Thu May 4 04:20:10 2023 +0800
Support Compression in parquet-fromcsv (#4160)
* parquet-fromcsv: support reading compressed input
* add test
* fix for clippy
* fix label error
* add dependencies for parquet-fromcsv
* Unified import format of decompression packages
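For context, the change wraps the CSV input file in a boxed `Read` decoder chosen by the new `--csv-compression` flag before handing it to the CSV reader. A minimal sketch of that pattern follows; the `open_csv` helper and `CsvCompression` enum are hypothetical illustrations, while the decoder constructors (`snap::read::FrameDecoder`, `flate2::read::GzDecoder`) are the same ones the patch below uses:

use std::{fs::File, io::Read};

// Sketch only: choose a decoder for the CSV input based on the compression mode.
// `CsvCompression` is a hypothetical stand-in for parquet::basic::Compression.
enum CsvCompression {
    Uncompressed,
    Snappy,
    Gzip,
}

fn open_csv(path: &str, compression: CsvCompression) -> std::io::Result<Box<dyn Read>> {
    let file = File::open(path)?;
    let reader: Box<dyn Read> = match compression {
        CsvCompression::Uncompressed => Box::new(file),
        // snap's FrameDecoder reads Snappy frame-format streams
        CsvCompression::Snappy => Box::new(snap::read::FrameDecoder::new(file)),
        // flate2's GzDecoder reads gzip streams
        CsvCompression::Gzip => Box::new(flate2::read::GzDecoder::new(file)),
    };
    Ok(reader)
}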
---
parquet/Cargo.toml | 2 +-
parquet/src/bin/parquet-fromcsv-help.txt | 7 +-
parquet/src/bin/parquet-fromcsv.rs | 126 ++++++++++++++++++++++++++++---
3 files changed, 123 insertions(+), 12 deletions(-)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index ef5ea8cd1..84142824e 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -133,7 +133,7 @@ required-features = ["cli"]
[[bin]]
name = "parquet-fromcsv"
-required-features = ["arrow", "cli"]
+required-features = ["arrow", "cli", "snap", "brotli", "flate2", "lz4", "zstd"]
[[bin]]
name = "parquet-show-bloom-filter"
diff --git a/parquet/src/bin/parquet-fromcsv-help.txt b/parquet/src/bin/parquet-fromcsv-help.txt
index 44d75f5a0..ac38c5689 100644
--- a/parquet/src/bin/parquet-fromcsv-help.txt
+++ b/parquet/src/bin/parquet-fromcsv-help.txt
@@ -47,8 +47,13 @@ Options:
[possible values: true, false]
+ -C, --csv-compression <CSV_COMPRESSION>
+ compression mode of csv
+
+ [default: UNCOMPRESSED]
+
-c, --parquet-compression <PARQUET_COMPRESSION>
- compression mode
+ compression mode of parquet
[default: SNAPPY]
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index 4e96fb878..f2a911c00 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -61,6 +61,7 @@
//! ```text
//! - `-i`, `--input-file` : Path to input CSV file
//! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
+//! - `-C`, `--csv-compression` : Compression option for csv, default is UNCOMPRESSED
//! - `-d`, `--delimiter : Field delimiter for CSV file, default depends `--input-format`
//! - `-e`, `--escape` : Escape character for input file
//! - `-h`, `--has-header` : Input has header
@@ -72,6 +73,7 @@
use std::{
fmt::Display,
fs::{read_to_string, File},
+ io::Read,
path::{Path, PathBuf},
sync::Arc,
};
@@ -193,7 +195,10 @@ struct Args {
quote_char: Option<char>,
#[clap(short('D'), long, help("double quote"))]
double_quote: Option<bool>,
- #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)]
+ #[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
+ #[clap(value_parser=compression_from_str)]
+ csv_compression: Compression,
+ #[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
#[clap(value_parser=compression_from_str)]
parquet_compression: Compression,
@@ -368,9 +373,31 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
&format!("Failed to open input file {:#?}", &args.input_file),
)
})?;
+
+ // open input file decoder
+ let input_file_decoder = match args.csv_compression {
+ Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
+ Compression::SNAPPY => {
+ Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>
+ }
+ Compression::GZIP(_) => {
+ Box::new(flate2::read::GzDecoder::new(input_file)) as Box<dyn Read>
+ }
+ Compression::BROTLI(_) => {
+ Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
+ }
+ Compression::LZ4 => Box::new(lz4::Decoder::new(input_file).map_err(|e| {
+ ParquetFromCsvError::with_context(e, "Failed to create lz4::Decoder")
+ })?) as Box<dyn Read>,
+ Compression::ZSTD(_) => Box::new(zstd::Decoder::new(input_file).map_err(|e| {
+ ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
+ })?) as Box<dyn Read>,
+ d => unimplemented!("compression type {d}"),
+ };
+
// create input csv reader
let builder = configure_reader_builder(args, arrow_schema);
- let reader = builder.build(input_file)?;
+ let reader = builder.build(input_file_decoder)?;
for batch_result in reader {
let batch = batch_result.map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
@@ -393,13 +420,17 @@ fn main() -> Result<(), ParquetFromCsvError> {
#[cfg(test)]
mod tests {
use std::{
- io::{Seek, Write},
+ io::Write,
path::{Path, PathBuf},
};
use super::*;
use arrow::datatypes::{DataType, Field};
+ use brotli::CompressorWriter;
use clap::{CommandFactory, Parser};
+ use flate2::write::GzEncoder;
+ use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+ use snap::write::FrameEncoder;
use tempfile::NamedTempFile;
#[test]
@@ -558,6 +589,7 @@ mod tests {
escape_char: None,
quote_char: None,
double_quote: None,
+ csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -593,6 +625,7 @@ mod tests {
escape_char: Some('\\'),
quote_char: None,
double_quote: None,
+ csv_compression: Compression::UNCOMPRESSED,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -616,8 +649,7 @@ mod tests {
assert_debug_text(&builder_debug, "escape", "Some(92)");
}
- #[test]
- fn test_convert_csv_to_parquet() {
+ fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
let schema = NamedTempFile::new().unwrap();
let schema_text = r"message schema {
optional int32 id;
@@ -626,14 +658,71 @@ mod tests {
schema.as_file().write_all(schema_text.as_bytes()).unwrap();
let mut input_file = NamedTempFile::new().unwrap();
- {
- let csv = input_file.as_file_mut();
+
+ fn write_tmp_file<T: Write>(w: &mut T) {
for index in 1..2000 {
- write!(csv, "{index},\"name_{index}\"\r\n").unwrap();
+ write!(w, "{index},\"name_{index}\"\r\n").unwrap();
}
- csv.flush().unwrap();
- csv.rewind().unwrap();
+ w.flush().unwrap();
}
+
+ // make sure the input_file's lifetime is long enough
+ input_file = match csv_compression {
+ Compression::UNCOMPRESSED => {
+ write_tmp_file(&mut input_file);
+ input_file
+ }
+ Compression::SNAPPY => {
+ let mut encoder = FrameEncoder::new(input_file);
+ write_tmp_file(&mut encoder);
+ encoder.into_inner().unwrap()
+ }
+ Compression::GZIP(level) => {
+ let mut encoder = GzEncoder::new(
+ input_file,
+ flate2::Compression::new(level.compression_level()),
+ );
+ write_tmp_file(&mut encoder);
+ encoder.finish().unwrap()
+ }
+ Compression::BROTLI(level) => {
+ let mut encoder =
+ CompressorWriter::new(input_file, 0, level.compression_level(), 0);
+ write_tmp_file(&mut encoder);
+ encoder.into_inner()
+ }
+ Compression::LZ4 => {
+ let mut encoder = lz4::EncoderBuilder::new()
+ .build(input_file)
+ .map_err(|e| {
+ ParquetFromCsvError::with_context(
+ e,
+ "Failed to create lz4::Encoder",
+ )
+ })
+ .unwrap();
+ write_tmp_file(&mut encoder);
+ let (inner, err) = encoder.finish();
+ err.unwrap();
+ inner
+ }
+
+ Compression::ZSTD(level) => {
+ let mut encoder =
+ zstd::Encoder::new(input_file, level.compression_level())
+ .map_err(|e| {
+ ParquetFromCsvError::with_context(
+ e,
+ "Failed to create zstd::Encoder",
+ )
+ })
+ .unwrap();
+ write_tmp_file(&mut encoder);
+ encoder.finish().unwrap()
+ }
+ d => unimplemented!("compression type {d}"),
+ };
+
let output_parquet = NamedTempFile::new().unwrap();
let args = Args {
@@ -648,6 +737,7 @@ mod tests {
escape_char: None,
quote_char: None,
double_quote: None,
+ csv_compression,
parquet_compression: Compression::SNAPPY,
writer_version: None,
max_row_group_size: None,
@@ -657,4 +747,20 @@ mod tests {
};
convert_csv_to_parquet(&args).unwrap();
}
+
+ #[test]
+ fn test_convert_csv_to_parquet() {
+ test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
+ test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
+ test_convert_compressed_csv_to_parquet(Compression::GZIP(
+ GzipLevel::try_new(1).unwrap(),
+ ));
+ test_convert_compressed_csv_to_parquet(Compression::BROTLI(
+ BrotliLevel::try_new(2).unwrap(),
+ ));
+ test_convert_compressed_csv_to_parquet(Compression::LZ4);
+ test_convert_compressed_csv_to_parquet(Compression::ZSTD(
+ ZstdLevel::try_new(1).unwrap(),
+ ));
+ }
}
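A note on the test strategy above: each compressed variant writes the same CSV rows through the matching encoder into a temporary file, then runs the conversion with the corresponding `csv_compression`. As a standalone illustration of the gzip case, here is a minimal sketch; the `write_gzip_csv` helper is hypothetical, while `flate2::write::GzEncoder` and `tempfile::NamedTempFile` are the same types the test uses:

use std::io::Write;

use flate2::write::GzEncoder;
use tempfile::NamedTempFile;

// Sketch only: write a small CSV into a gzip-compressed temp file,
// mirroring what the test helper does for Compression::GZIP.
fn write_gzip_csv() -> std::io::Result<NamedTempFile> {
    let file = NamedTempFile::new()?;
    let mut encoder = GzEncoder::new(file, flate2::Compression::new(1));
    for index in 1..2000 {
        write!(encoder, "{index},\"name_{index}\"\r\n")?;
    }
    // finish() writes the gzip trailer and returns the underlying temp file
    encoder.finish()
}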