This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new a94ccff9de feat: support parsing for parquet writer option (#4938)
a94ccff9de is described below

commit a94ccff9deac04ca075f6f05f81a5755af81348e
Author: fan <[email protected]>
AuthorDate: Wed Oct 18 17:36:43 2023 +0800

    feat: support parsing for parquet writer option (#4938)
    
    * feat: support parsing for parquet writer option
    
    Signed-off-by: fan <[email protected]>
    
    * fix clippy warning
    
    Signed-off-by: fan <[email protected]>
    
    * add tests
    
    Signed-off-by: fan <[email protected]>
    
    * follow reviews
    
    Signed-off-by: fan <[email protected]>
    
    * fix only support lower and uppercase
    
    Signed-off-by: fan <[email protected]>
    
    ---------
    
    Signed-off-by: fan <[email protected]>
---
 parquet/src/basic.rs           | 185 +++++++++++++++++++++++++++++++++++++++++
 parquet/src/file/properties.rs |  68 +++++++++++++++
 2 files changed, 253 insertions(+)

diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index cc8d033f42..cdad3597ff 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -18,6 +18,7 @@
 //! Contains Rust mappings for Thrift definition.
 //! Refer to 
[`parquet.thrift`](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift)
 file to see raw definitions.
 
+use std::str::FromStr;
 use std::{fmt, str};
 
 pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
@@ -278,6 +279,29 @@ pub enum Encoding {
     BYTE_STREAM_SPLIT,
 }
 
+impl FromStr for Encoding {
+    type Err = ParquetError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "PLAIN" | "plain" => Ok(Encoding::PLAIN),
+            "PLAIN_DICTIONARY" | "plain_dictionary" => 
Ok(Encoding::PLAIN_DICTIONARY),
+            "RLE" | "rle" => Ok(Encoding::RLE),
+            "BIT_PACKED" | "bit_packed" => Ok(Encoding::BIT_PACKED),
+            "DELTA_BINARY_PACKED" | "delta_binary_packed" => {
+                Ok(Encoding::DELTA_BINARY_PACKED)
+            }
+            "DELTA_LENGTH_BYTE_ARRAY" | "delta_length_byte_array" => {
+                Ok(Encoding::DELTA_LENGTH_BYTE_ARRAY)
+            }
+            "DELTA_BYTE_ARRAY" | "delta_byte_array" => 
Ok(Encoding::DELTA_BYTE_ARRAY),
+            "RLE_DICTIONARY" | "rle_dictionary" => 
Ok(Encoding::RLE_DICTIONARY),
+            "BYTE_STREAM_SPLIT" | "byte_stream_split" => 
Ok(Encoding::BYTE_STREAM_SPLIT),
+            _ => Err(general_err!("unknown encoding: {}", s)),
+        }
+    }
+}
+
 // ----------------------------------------------------------------------
 // Mirrors `parquet::CompressionCodec`
 
@@ -295,6 +319,90 @@ pub enum Compression {
     LZ4_RAW,
 }
 
+fn split_compression_string(
+    str_setting: &str,
+) -> Result<(&str, Option<u32>), ParquetError> {
+    let split_setting = str_setting.split_once('(');
+
+    match split_setting {
+        Some((codec, level_str)) => {
+            let level =
+                &level_str[..level_str.len() - 1]
+                    .parse::<u32>()
+                    .map_err(|_| {
+                        ParquetError::General(format!(
+                            "invalid compression level: {}",
+                            level_str
+                        ))
+                    })?;
+            Ok((codec, Some(*level)))
+        }
+        None => Ok((str_setting, None)),
+    }
+}
+
+fn check_level_is_none(level: &Option<u32>) -> Result<(), ParquetError> {
+    if level.is_some() {
+        return Err(ParquetError::General("level is not support".to_string()));
+    }
+
+    Ok(())
+}
+
+fn require_level(codec: &str, level: Option<u32>) -> Result<u32, ParquetError> 
{
+    level.ok_or(ParquetError::General(format!("{} require level", codec)))
+}
+
+impl FromStr for Compression {
+    type Err = ParquetError;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        let (codec, level) = split_compression_string(s)?;
+
+        let c = match codec {
+            "UNCOMPRESSED" | "uncompressed" => {
+                check_level_is_none(&level)?;
+                Compression::UNCOMPRESSED
+            }
+            "SNAPPY" | "snappy" => {
+                check_level_is_none(&level)?;
+                Compression::SNAPPY
+            }
+            "GZIP" | "gzip" => {
+                let level = require_level(codec, level)?;
+                Compression::GZIP(GzipLevel::try_new(level)?)
+            }
+            "LZO" | "lzo" => {
+                check_level_is_none(&level)?;
+                Compression::LZO
+            }
+            "BROTLI" | "brotli" => {
+                let level = require_level(codec, level)?;
+                Compression::BROTLI(BrotliLevel::try_new(level)?)
+            }
+            "LZ4" | "lz4" => {
+                check_level_is_none(&level)?;
+                Compression::LZ4
+            }
+            "ZSTD" | "zstd" => {
+                let level = require_level(codec, level)?;
+                Compression::ZSTD(ZstdLevel::try_new(level as i32)?)
+            }
+            "LZ4_RAW" | "lz4_raw" => {
+                check_level_is_none(&level)?;
+                Compression::LZ4_RAW
+            }
+            _ => {
+                return Err(ParquetError::General(format!(
+                    "unsupport compression {codec}"
+                )));
+            }
+        };
+
+        Ok(c)
+    }
+}
+
 // ----------------------------------------------------------------------
 // Mirrors `parquet::PageType`
 
@@ -2130,4 +2238,81 @@ mod tests {
         );
         assert_eq!(ColumnOrder::UNDEFINED.sort_order(), SortOrder::SIGNED);
     }
+
+    #[test]
+    fn test_parse_encoding() {
+        let mut encoding: Encoding = "PLAIN".parse().unwrap();
+        assert_eq!(encoding, Encoding::PLAIN);
+        encoding = "PLAIN_DICTIONARY".parse().unwrap();
+        assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
+        encoding = "RLE".parse().unwrap();
+        assert_eq!(encoding, Encoding::RLE);
+        encoding = "BIT_PACKED".parse().unwrap();
+        assert_eq!(encoding, Encoding::BIT_PACKED);
+        encoding = "DELTA_BINARY_PACKED".parse().unwrap();
+        assert_eq!(encoding, Encoding::DELTA_BINARY_PACKED);
+        encoding = "DELTA_LENGTH_BYTE_ARRAY".parse().unwrap();
+        assert_eq!(encoding, Encoding::DELTA_LENGTH_BYTE_ARRAY);
+        encoding = "DELTA_BYTE_ARRAY".parse().unwrap();
+        assert_eq!(encoding, Encoding::DELTA_BYTE_ARRAY);
+        encoding = "RLE_DICTIONARY".parse().unwrap();
+        assert_eq!(encoding, Encoding::RLE_DICTIONARY);
+        encoding = "BYTE_STREAM_SPLIT".parse().unwrap();
+        assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT);
+
+        // test lowercase
+        encoding = "byte_stream_split".parse().unwrap();
+        assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT);
+
+        // test unknown string
+        match "plain_xxx".parse::<Encoding>() {
+            Ok(e) => {
+                panic!("Should not be able to parse {:?}", e);
+            }
+            Err(e) => {
+                assert_eq!(e.to_string(), "Parquet error: unknown encoding: 
plain_xxx");
+            }
+        }
+    }
+
+    #[test]
+    fn test_parse_compression() {
+        let mut compress: Compression = "snappy".parse().unwrap();
+        assert_eq!(compress, Compression::SNAPPY);
+        compress = "lzo".parse().unwrap();
+        assert_eq!(compress, Compression::LZO);
+        compress = "zstd(3)".parse().unwrap();
+        assert_eq!(compress, 
Compression::ZSTD(ZstdLevel::try_new(3).unwrap()));
+        compress = "LZ4_RAW".parse().unwrap();
+        assert_eq!(compress, Compression::LZ4_RAW);
+        compress = "uncompressed".parse().unwrap();
+        assert_eq!(compress, Compression::UNCOMPRESSED);
+        compress = "snappy".parse().unwrap();
+        assert_eq!(compress, Compression::SNAPPY);
+        compress = "gzip(9)".parse().unwrap();
+        assert_eq!(compress, 
Compression::GZIP(GzipLevel::try_new(9).unwrap()));
+        compress = "lzo".parse().unwrap();
+        assert_eq!(compress, Compression::LZO);
+        compress = "brotli(3)".parse().unwrap();
+        assert_eq!(
+            compress,
+            Compression::BROTLI(BrotliLevel::try_new(3).unwrap())
+        );
+        compress = "lz4".parse().unwrap();
+        assert_eq!(compress, Compression::LZ4);
+
+        // test unknown compression
+        let mut err = "plain_xxx".parse::<Encoding>().unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: unknown encoding: plain_xxx"
+        );
+
+        // test invalid compress level
+        err = "gzip(-10)".parse::<Encoding>().unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Parquet error: unknown encoding: gzip(-10)"
+        );
+    }
 }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c83fea3f9b..93b034cf4f 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 //! Configuration via [`WriterProperties`] and [`ReaderProperties`]
+use std::str::FromStr;
 use std::{collections::HashMap, sync::Arc};
 
 use crate::basic::{Compression, Encoding};
@@ -72,6 +73,18 @@ impl WriterVersion {
     }
 }
 
+impl FromStr for WriterVersion {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "PARQUET_1_0" | "parquet_1_0" => Ok(WriterVersion::PARQUET_1_0),
+            "PARQUET_2_0" | "parquet_2_0" => Ok(WriterVersion::PARQUET_2_0),
+            _ => Err(format!("Invalid writer version: {}", s)),
+        }
+    }
+}
+
 /// Reference counted writer properties.
 pub type WriterPropertiesPtr = Arc<WriterProperties>;
 
@@ -655,6 +668,19 @@ pub enum EnabledStatistics {
     Page,
 }
 
+impl FromStr for EnabledStatistics {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "NONE" | "none" => Ok(EnabledStatistics::None),
+            "CHUNK" | "chunk" => Ok(EnabledStatistics::Chunk),
+            "PAGE" | "page" => Ok(EnabledStatistics::Page),
+            _ => Err(format!("Invalid statistics arg: {}", s)),
+        }
+    }
+}
+
 impl Default for EnabledStatistics {
     fn default() -> Self {
         DEFAULT_STATISTICS_ENABLED
@@ -1182,4 +1208,46 @@ mod tests {
 
         assert_eq!(props.codec_options(), &codec_options);
     }
+
+    #[test]
+    fn test_parse_writerversion() {
+        let mut writer_version = 
"PARQUET_1_0".parse::<WriterVersion>().unwrap();
+        assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
+        writer_version = "PARQUET_2_0".parse::<WriterVersion>().unwrap();
+        assert_eq!(writer_version, WriterVersion::PARQUET_2_0);
+
+        // test lowercase
+        writer_version = "parquet_1_0".parse::<WriterVersion>().unwrap();
+        assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
+
+        // test invalid version
+        match "PARQUET_-1_0".parse::<WriterVersion>() {
+            Ok(_) => panic!("Should not be able to parse PARQUET_-1_0"),
+            Err(e) => {
+                assert_eq!(e, "Invalid writer version: PARQUET_-1_0");
+            }
+        }
+    }
+
+    #[test]
+    fn test_parse_enabledstatistics() {
+        let mut enabled_statistics = 
"NONE".parse::<EnabledStatistics>().unwrap();
+        assert_eq!(enabled_statistics, EnabledStatistics::None);
+        enabled_statistics = "CHUNK".parse::<EnabledStatistics>().unwrap();
+        assert_eq!(enabled_statistics, EnabledStatistics::Chunk);
+        enabled_statistics = "PAGE".parse::<EnabledStatistics>().unwrap();
+        assert_eq!(enabled_statistics, EnabledStatistics::Page);
+
+        // test lowercase
+        enabled_statistics = "none".parse::<EnabledStatistics>().unwrap();
+        assert_eq!(enabled_statistics, EnabledStatistics::None);
+
+        //test invalid statistics
+        match "ChunkAndPage".parse::<EnabledStatistics>() {
+            Ok(_) => panic!("Should not be able to parse ChunkAndPage"),
+            Err(e) => {
+                assert_eq!(e, "Invalid statistics arg: ChunkAndPage");
+            }
+        }
+    }
 }

Reply via email to