This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 547cb80ad Support Compression in parquet-fromcsv (#4160)
547cb80ad is described below

commit 547cb80ad02b77033f2ef20d21d2f94cd0507096
Author: 苏小刚 <[email protected]>
AuthorDate: Thu May 4 04:20:10 2023 +0800

    Support Compression in parquet-fromcsv (#4160)
    
    * parquet-fromcsv support read compressed
    
    * add test
    
    * fix for clippy
    
    * fix label error
    
    * add dependences for parquet_fromcsv
    
    * Unified import format of decompression packages
---
 parquet/Cargo.toml                       |   2 +-
 parquet/src/bin/parquet-fromcsv-help.txt |   7 +-
 parquet/src/bin/parquet-fromcsv.rs       | 126 ++++++++++++++++++++++++++++---
 3 files changed, 123 insertions(+), 12 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index ef5ea8cd1..84142824e 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -133,7 +133,7 @@ required-features = ["cli"]
 
 [[bin]]
 name = "parquet-fromcsv"
-required-features = ["arrow", "cli"]
+required-features = ["arrow", "cli", "snap", "brotli", "flate2", "lz4", "zstd"]
 
 [[bin]]
 name = "parquet-show-bloom-filter"
diff --git a/parquet/src/bin/parquet-fromcsv-help.txt b/parquet/src/bin/parquet-fromcsv-help.txt
index 44d75f5a0..ac38c5689 100644
--- a/parquet/src/bin/parquet-fromcsv-help.txt
+++ b/parquet/src/bin/parquet-fromcsv-help.txt
@@ -47,8 +47,13 @@ Options:
           
           [possible values: true, false]
 
+  -C, --csv-compression <CSV_COMPRESSION>
+          compression mode of csv
+          
+          [default: UNCOMPRESSED]
+
   -c, --parquet-compression <PARQUET_COMPRESSION>
-          compression mode
+          compression mode of parquet
           
           [default: SNAPPY]
 
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index 4e96fb878..f2a911c00 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -61,6 +61,7 @@
 //! ```text
 //! - `-i`, `--input-file` : Path to input CSV file
 //! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`.
+//! - `-C`, `--csv-compression` : Compression option for csv, default is UNCOMPRESSED
 //! - `-d`, `--delimiter : Field delimiter for CSV file, default depends `--input-format`
 //! - `-e`, `--escape` : Escape character for input file
 //! - `-h`, `--has-header` : Input has header
@@ -72,6 +73,7 @@
 use std::{
     fmt::Display,
     fs::{read_to_string, File},
+    io::Read,
     path::{Path, PathBuf},
     sync::Arc,
 };
@@ -193,7 +195,10 @@ struct Args {
     quote_char: Option<char>,
     #[clap(short('D'), long, help("double quote"))]
     double_quote: Option<bool>,
-    #[clap(short('c'), long, help("compression mode"), default_value_t=Compression::SNAPPY)]
+    #[clap(short('C'), long, help("compression mode of csv"), default_value_t=Compression::UNCOMPRESSED)]
+    #[clap(value_parser=compression_from_str)]
+    csv_compression: Compression,
+    #[clap(short('c'), long, help("compression mode of parquet"), default_value_t=Compression::SNAPPY)]
     #[clap(value_parser=compression_from_str)]
     parquet_compression: Compression,
 
@@ -368,9 +373,31 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), ParquetFromCsvError> {
             &format!("Failed to open input file {:#?}", &args.input_file),
         )
     })?;
+
+    // open input file decoder
+    let input_file_decoder = match args.csv_compression {
+        Compression::UNCOMPRESSED => Box::new(input_file) as Box<dyn Read>,
+        Compression::SNAPPY => {
+            Box::new(snap::read::FrameDecoder::new(input_file)) as Box<dyn Read>
+        }
+        Compression::GZIP(_) => {
+            Box::new(flate2::read::GzDecoder::new(input_file)) as Box<dyn Read>
+        }
+        Compression::BROTLI(_) => {
+            Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
+        }
+        Compression::LZ4 => Box::new(lz4::Decoder::new(input_file).map_err(|e| {
+            ParquetFromCsvError::with_context(e, "Failed to create lz4::Decoder")
+        })?) as Box<dyn Read>,
+        Compression::ZSTD(_) => Box::new(zstd::Decoder::new(input_file).map_err(|e| {
+            ParquetFromCsvError::with_context(e, "Failed to create zstd::Decoder")
+        })?) as Box<dyn Read>,
+        d => unimplemented!("compression type {d}"),
+    };
+
     // create input csv reader
     let builder = configure_reader_builder(args, arrow_schema);
-    let reader = builder.build(input_file)?;
+    let reader = builder.build(input_file_decoder)?;
     for batch_result in reader {
         let batch = batch_result.map_err(|e| {
             ParquetFromCsvError::with_context(e, "Failed to read RecordBatch from CSV")
@@ -393,13 +420,17 @@ fn main() -> Result<(), ParquetFromCsvError> {
 #[cfg(test)]
 mod tests {
     use std::{
-        io::{Seek, Write},
+        io::Write,
         path::{Path, PathBuf},
     };
 
     use super::*;
     use arrow::datatypes::{DataType, Field};
+    use brotli::CompressorWriter;
     use clap::{CommandFactory, Parser};
+    use flate2::write::GzEncoder;
+    use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
+    use snap::write::FrameEncoder;
     use tempfile::NamedTempFile;
 
     #[test]
@@ -558,6 +589,7 @@ mod tests {
             escape_char: None,
             quote_char: None,
             double_quote: None,
+            csv_compression: Compression::UNCOMPRESSED,
             parquet_compression: Compression::SNAPPY,
             writer_version: None,
             max_row_group_size: None,
@@ -593,6 +625,7 @@ mod tests {
             escape_char: Some('\\'),
             quote_char: None,
             double_quote: None,
+            csv_compression: Compression::UNCOMPRESSED,
             parquet_compression: Compression::SNAPPY,
             writer_version: None,
             max_row_group_size: None,
@@ -616,8 +649,7 @@ mod tests {
         assert_debug_text(&builder_debug, "escape", "Some(92)");
     }
 
-    #[test]
-    fn test_convert_csv_to_parquet() {
+    fn test_convert_compressed_csv_to_parquet(csv_compression: Compression) {
         let schema = NamedTempFile::new().unwrap();
         let schema_text = r"message schema {
             optional int32 id;
@@ -626,14 +658,71 @@ mod tests {
         schema.as_file().write_all(schema_text.as_bytes()).unwrap();
 
         let mut input_file = NamedTempFile::new().unwrap();
-        {
-            let csv = input_file.as_file_mut();
+
+        fn write_tmp_file<T: Write>(w: &mut T) {
             for index in 1..2000 {
-                write!(csv, "{index},\"name_{index}\"\r\n").unwrap();
+                write!(w, "{index},\"name_{index}\"\r\n").unwrap();
             }
-            csv.flush().unwrap();
-            csv.rewind().unwrap();
+            w.flush().unwrap();
         }
+
+        // make sure the input_file's lifetime being long enough
+        input_file = match csv_compression {
+            Compression::UNCOMPRESSED => {
+                write_tmp_file(&mut input_file);
+                input_file
+            }
+            Compression::SNAPPY => {
+                let mut encoder = FrameEncoder::new(input_file);
+                write_tmp_file(&mut encoder);
+                encoder.into_inner().unwrap()
+            }
+            Compression::GZIP(level) => {
+                let mut encoder = GzEncoder::new(
+                    input_file,
+                    flate2::Compression::new(level.compression_level()),
+                );
+                write_tmp_file(&mut encoder);
+                encoder.finish().unwrap()
+            }
+            Compression::BROTLI(level) => {
+                let mut encoder =
+                    CompressorWriter::new(input_file, 0, level.compression_level(), 0);
+                write_tmp_file(&mut encoder);
+                encoder.into_inner()
+            }
+            Compression::LZ4 => {
+                let mut encoder = lz4::EncoderBuilder::new()
+                    .build(input_file)
+                    .map_err(|e| {
+                        ParquetFromCsvError::with_context(
+                            e,
+                            "Failed to create lz4::Encoder",
+                        )
+                    })
+                    .unwrap();
+                write_tmp_file(&mut encoder);
+                let (inner, err) = encoder.finish();
+                err.unwrap();
+                inner
+            }
+
+            Compression::ZSTD(level) => {
+                let mut encoder =
+                    zstd::Encoder::new(input_file, level.compression_level())
+                        .map_err(|e| {
+                            ParquetFromCsvError::with_context(
+                                e,
+                                "Failed to create zstd::Encoder",
+                            )
+                        })
+                        .unwrap();
+                write_tmp_file(&mut encoder);
+                encoder.finish().unwrap()
+            }
+            d => unimplemented!("compression type {d}"),
+        };
+
         let output_parquet = NamedTempFile::new().unwrap();
 
         let args = Args {
@@ -648,6 +737,7 @@ mod tests {
             escape_char: None,
             quote_char: None,
             double_quote: None,
+            csv_compression,
             parquet_compression: Compression::SNAPPY,
             writer_version: None,
             max_row_group_size: None,
@@ -657,4 +747,20 @@ mod tests {
         };
         convert_csv_to_parquet(&args).unwrap();
     }
+
+    #[test]
+    fn test_convert_csv_to_parquet() {
+        test_convert_compressed_csv_to_parquet(Compression::UNCOMPRESSED);
+        test_convert_compressed_csv_to_parquet(Compression::SNAPPY);
+        test_convert_compressed_csv_to_parquet(Compression::GZIP(
+            GzipLevel::try_new(1).unwrap(),
+        ));
+        test_convert_compressed_csv_to_parquet(Compression::BROTLI(
+            BrotliLevel::try_new(2).unwrap(),
+        ));
+        test_convert_compressed_csv_to_parquet(Compression::LZ4);
+        test_convert_compressed_csv_to_parquet(Compression::ZSTD(
+            ZstdLevel::try_new(1).unwrap(),
+        ));
+    }
 }

Reply via email to