This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new a94ccff9de feat: support parsing for parquet writer option (#4938)
a94ccff9de is described below
commit a94ccff9deac04ca075f6f05f81a5755af81348e
Author: fan <[email protected]>
AuthorDate: Wed Oct 18 17:36:43 2023 +0800
feat: support parsing for parquet writer option (#4938)
* feat: support parsing for parquet writer option
Signed-off-by: fan <[email protected]>
* fix clippy warning
Signed-off-by: fan <[email protected]>
* add tests
Signed-off-by: fan <[email protected]>
* follow reviews
Signed-off-by: fan <[email protected]>
* fix: only accept all-lowercase or all-uppercase spellings
Signed-off-by: fan <[email protected]>
---------
Signed-off-by: fan <[email protected]>
---
parquet/src/basic.rs | 185 +++++++++++++++++++++++++++++++++++++++++
parquet/src/file/properties.rs | 68 +++++++++++++++
2 files changed, 253 insertions(+)
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index cc8d033f42..cdad3597ff 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -18,6 +18,7 @@
//! Contains Rust mappings for Thrift definition.
//! Refer to
[`parquet.thrift`](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift)
file to see raw definitions.
+use std::str::FromStr;
use std::{fmt, str};
pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
@@ -278,6 +279,29 @@ pub enum Encoding {
BYTE_STREAM_SPLIT,
}
+impl FromStr for Encoding {
+ type Err = ParquetError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "PLAIN" | "plain" => Ok(Encoding::PLAIN),
+ "PLAIN_DICTIONARY" | "plain_dictionary" =>
Ok(Encoding::PLAIN_DICTIONARY),
+ "RLE" | "rle" => Ok(Encoding::RLE),
+ "BIT_PACKED" | "bit_packed" => Ok(Encoding::BIT_PACKED),
+ "DELTA_BINARY_PACKED" | "delta_binary_packed" => {
+ Ok(Encoding::DELTA_BINARY_PACKED)
+ }
+ "DELTA_LENGTH_BYTE_ARRAY" | "delta_length_byte_array" => {
+ Ok(Encoding::DELTA_LENGTH_BYTE_ARRAY)
+ }
+ "DELTA_BYTE_ARRAY" | "delta_byte_array" =>
Ok(Encoding::DELTA_BYTE_ARRAY),
+ "RLE_DICTIONARY" | "rle_dictionary" =>
Ok(Encoding::RLE_DICTIONARY),
+ "BYTE_STREAM_SPLIT" | "byte_stream_split" =>
Ok(Encoding::BYTE_STREAM_SPLIT),
+ _ => Err(general_err!("unknown encoding: {}", s)),
+ }
+ }
+}
+
// ----------------------------------------------------------------------
// Mirrors `parquet::CompressionCodec`
@@ -295,6 +319,90 @@ pub enum Compression {
LZ4_RAW,
}
+fn split_compression_string(
+ str_setting: &str,
+) -> Result<(&str, Option<u32>), ParquetError> {
+ let split_setting = str_setting.split_once('(');
+
+ match split_setting {
+ Some((codec, level_str)) => {
+ let level =
+ &level_str[..level_str.len() - 1]
+ .parse::<u32>()
+ .map_err(|_| {
+ ParquetError::General(format!(
+ "invalid compression level: {}",
+ level_str
+ ))
+ })?;
+ Ok((codec, Some(*level)))
+ }
+ None => Ok((str_setting, None)),
+ }
+}
+
+fn check_level_is_none(level: &Option<u32>) -> Result<(), ParquetError> {
+ if level.is_some() {
+ return Err(ParquetError::General("level is not support".to_string()));
+ }
+
+ Ok(())
+}
+
+fn require_level(codec: &str, level: Option<u32>) -> Result<u32, ParquetError>
{
+ level.ok_or(ParquetError::General(format!("{} require level", codec)))
+}
+
+impl FromStr for Compression {
+ type Err = ParquetError;
+
+ fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+ let (codec, level) = split_compression_string(s)?;
+
+ let c = match codec {
+ "UNCOMPRESSED" | "uncompressed" => {
+ check_level_is_none(&level)?;
+ Compression::UNCOMPRESSED
+ }
+ "SNAPPY" | "snappy" => {
+ check_level_is_none(&level)?;
+ Compression::SNAPPY
+ }
+ "GZIP" | "gzip" => {
+ let level = require_level(codec, level)?;
+ Compression::GZIP(GzipLevel::try_new(level)?)
+ }
+ "LZO" | "lzo" => {
+ check_level_is_none(&level)?;
+ Compression::LZO
+ }
+ "BROTLI" | "brotli" => {
+ let level = require_level(codec, level)?;
+ Compression::BROTLI(BrotliLevel::try_new(level)?)
+ }
+ "LZ4" | "lz4" => {
+ check_level_is_none(&level)?;
+ Compression::LZ4
+ }
+ "ZSTD" | "zstd" => {
+ let level = require_level(codec, level)?;
+ Compression::ZSTD(ZstdLevel::try_new(level as i32)?)
+ }
+ "LZ4_RAW" | "lz4_raw" => {
+ check_level_is_none(&level)?;
+ Compression::LZ4_RAW
+ }
+ _ => {
+ return Err(ParquetError::General(format!(
+ "unsupport compression {codec}"
+ )));
+ }
+ };
+
+ Ok(c)
+ }
+}
+
// ----------------------------------------------------------------------
// Mirrors `parquet::PageType`
@@ -2130,4 +2238,81 @@ mod tests {
);
assert_eq!(ColumnOrder::UNDEFINED.sort_order(), SortOrder::SIGNED);
}
+
+ #[test]
+ fn test_parse_encoding() {
+ let mut encoding: Encoding = "PLAIN".parse().unwrap();
+ assert_eq!(encoding, Encoding::PLAIN);
+ encoding = "PLAIN_DICTIONARY".parse().unwrap();
+ assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
+ encoding = "RLE".parse().unwrap();
+ assert_eq!(encoding, Encoding::RLE);
+ encoding = "BIT_PACKED".parse().unwrap();
+ assert_eq!(encoding, Encoding::BIT_PACKED);
+ encoding = "DELTA_BINARY_PACKED".parse().unwrap();
+ assert_eq!(encoding, Encoding::DELTA_BINARY_PACKED);
+ encoding = "DELTA_LENGTH_BYTE_ARRAY".parse().unwrap();
+ assert_eq!(encoding, Encoding::DELTA_LENGTH_BYTE_ARRAY);
+ encoding = "DELTA_BYTE_ARRAY".parse().unwrap();
+ assert_eq!(encoding, Encoding::DELTA_BYTE_ARRAY);
+ encoding = "RLE_DICTIONARY".parse().unwrap();
+ assert_eq!(encoding, Encoding::RLE_DICTIONARY);
+ encoding = "BYTE_STREAM_SPLIT".parse().unwrap();
+ assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT);
+
+ // test lowercase
+ encoding = "byte_stream_split".parse().unwrap();
+ assert_eq!(encoding, Encoding::BYTE_STREAM_SPLIT);
+
+ // test unknown string
+ match "plain_xxx".parse::<Encoding>() {
+ Ok(e) => {
+ panic!("Should not be able to parse {:?}", e);
+ }
+ Err(e) => {
+ assert_eq!(e.to_string(), "Parquet error: unknown encoding:
plain_xxx");
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_compression() {
+ let mut compress: Compression = "snappy".parse().unwrap();
+ assert_eq!(compress, Compression::SNAPPY);
+ compress = "lzo".parse().unwrap();
+ assert_eq!(compress, Compression::LZO);
+ compress = "zstd(3)".parse().unwrap();
+ assert_eq!(compress,
Compression::ZSTD(ZstdLevel::try_new(3).unwrap()));
+ compress = "LZ4_RAW".parse().unwrap();
+ assert_eq!(compress, Compression::LZ4_RAW);
+ compress = "uncompressed".parse().unwrap();
+ assert_eq!(compress, Compression::UNCOMPRESSED);
+ compress = "snappy".parse().unwrap();
+ assert_eq!(compress, Compression::SNAPPY);
+ compress = "gzip(9)".parse().unwrap();
+ assert_eq!(compress,
Compression::GZIP(GzipLevel::try_new(9).unwrap()));
+ compress = "lzo".parse().unwrap();
+ assert_eq!(compress, Compression::LZO);
+ compress = "brotli(3)".parse().unwrap();
+ assert_eq!(
+ compress,
+ Compression::BROTLI(BrotliLevel::try_new(3).unwrap())
+ );
+ compress = "lz4".parse().unwrap();
+ assert_eq!(compress, Compression::LZ4);
+
+ // test unknown compression
+ let mut err = "plain_xxx".parse::<Encoding>().unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: unknown encoding: plain_xxx"
+ );
+
+ // test invalid compress level
+ err = "gzip(-10)".parse::<Encoding>().unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: unknown encoding: gzip(-10)"
+ );
+ }
}
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c83fea3f9b..93b034cf4f 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -16,6 +16,7 @@
// under the License.
//! Configuration via [`WriterProperties`] and [`ReaderProperties`]
+use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};
use crate::basic::{Compression, Encoding};
@@ -72,6 +73,18 @@ impl WriterVersion {
}
}
+impl FromStr for WriterVersion {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "PARQUET_1_0" | "parquet_1_0" => Ok(WriterVersion::PARQUET_1_0),
+ "PARQUET_2_0" | "parquet_2_0" => Ok(WriterVersion::PARQUET_2_0),
+ _ => Err(format!("Invalid writer version: {}", s)),
+ }
+ }
+}
+
/// Reference counted writer properties.
pub type WriterPropertiesPtr = Arc<WriterProperties>;
@@ -655,6 +668,19 @@ pub enum EnabledStatistics {
Page,
}
+impl FromStr for EnabledStatistics {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "NONE" | "none" => Ok(EnabledStatistics::None),
+ "CHUNK" | "chunk" => Ok(EnabledStatistics::Chunk),
+ "PAGE" | "page" => Ok(EnabledStatistics::Page),
+ _ => Err(format!("Invalid statistics arg: {}", s)),
+ }
+ }
+}
+
impl Default for EnabledStatistics {
fn default() -> Self {
DEFAULT_STATISTICS_ENABLED
@@ -1182,4 +1208,46 @@ mod tests {
assert_eq!(props.codec_options(), &codec_options);
}
+
+ #[test]
+ fn test_parse_writerversion() {
+ let mut writer_version =
"PARQUET_1_0".parse::<WriterVersion>().unwrap();
+ assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
+ writer_version = "PARQUET_2_0".parse::<WriterVersion>().unwrap();
+ assert_eq!(writer_version, WriterVersion::PARQUET_2_0);
+
+ // test lowercase
+ writer_version = "parquet_1_0".parse::<WriterVersion>().unwrap();
+ assert_eq!(writer_version, WriterVersion::PARQUET_1_0);
+
+ // test invalid version
+ match "PARQUET_-1_0".parse::<WriterVersion>() {
+ Ok(_) => panic!("Should not be able to parse PARQUET_-1_0"),
+ Err(e) => {
+ assert_eq!(e, "Invalid writer version: PARQUET_-1_0");
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_enabledstatistics() {
+ let mut enabled_statistics =
"NONE".parse::<EnabledStatistics>().unwrap();
+ assert_eq!(enabled_statistics, EnabledStatistics::None);
+ enabled_statistics = "CHUNK".parse::<EnabledStatistics>().unwrap();
+ assert_eq!(enabled_statistics, EnabledStatistics::Chunk);
+ enabled_statistics = "PAGE".parse::<EnabledStatistics>().unwrap();
+ assert_eq!(enabled_statistics, EnabledStatistics::Page);
+
+ // test lowercase
+ enabled_statistics = "none".parse::<EnabledStatistics>().unwrap();
+ assert_eq!(enabled_statistics, EnabledStatistics::None);
+
+ //test invalid statistics
+ match "ChunkAndPage".parse::<EnabledStatistics>() {
+ Ok(_) => panic!("Should not be able to parse ChunkAndPage"),
+ Err(e) => {
+ assert_eq!(e, "Invalid statistics arg: ChunkAndPage");
+ }
+ }
+ }
}