This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 5d464b58f9 Add `CompressionCodec` Thrift enum for Parquet metadata
(#9864)
5d464b58f9 is described below
commit 5d464b58f9fc183141e4501df6759e17d4c71507
Author: Ed Seidl <[email protected]>
AuthorDate: Thu May 7 10:56:58 2026 -0700
Add `CompressionCodec` Thrift enum for Parquet metadata (#9864)
# Which issue does this PR close?
- Part of #9863.
# Rationale for this change
See issue for more, but the idea is to separate Parquet metadata
structures from those used for configuration. This can reduce memory
used by the metadata, and also allows use of the thrift macros, reducing
maintenance burden.
# What changes are included in this PR?
This adds a new `CompressionCodec` enum for use in the Parquet metadata,
and means to convert between `CompressionCodec` and `Compression`.
# Are these changes tested?
Should be covered by existing tests, but a new test of the interchange has
also been added.
# Are there any user-facing changes?
No
---
parquet/benches/metadata.rs | 2 +-
parquet/src/basic.rs | 155 ++++++++++++++++++++++----------
parquet/src/bin/parquet-layout.rs | 22 ++---
parquet/src/file/metadata/memory.rs | 4 +-
parquet/src/file/metadata/mod.rs | 63 +++++++------
parquet/src/file/metadata/thrift/mod.rs | 6 +-
parquet/src/file/writer.rs | 4 +-
parquet/src/schema/printer.rs | 6 +-
8 files changed, 167 insertions(+), 95 deletions(-)
diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs
index 750e33a2d3..b6e04580c1 100644
--- a/parquet/benches/metadata.rs
+++ b/parquet/benches/metadata.rs
@@ -90,7 +90,7 @@ fn encoded_meta(is_nullable: bool, has_lists: bool,
write_path_in_schema: bool)
.map(|j| {
ColumnChunkMetaData::builder(column_desc_ptrs[j].clone())
.set_encodings(vec![Encoding::PLAIN,
Encoding::RLE_DICTIONARY])
-
.set_compression(parquet::basic::Compression::UNCOMPRESSED)
+
.set_compression_codec(parquet::basic::CompressionCodec::UNCOMPRESSED)
.set_num_values(rng.random_range(1..1000000))
.set_total_compressed_size(rng.random_range(50000..5000000))
.set_data_page_offset(rng.random_range(4..2000000000))
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index ba8ffc2e92..cc7a16eca0 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -798,6 +798,33 @@ fn i32_to_encoding(val: i32) -> Encoding {
// ----------------------------------------------------------------------
// Mirrors thrift enum `CompressionCodec`
+thrift_enum!(
+/// Supported compression algorithms.
+///
+/// Codecs added in format version X.Y can be read by readers based on X.Y and
later.
+/// Codec support may vary between readers based on the format version and
+/// libraries available at runtime.
+///
+/// See [Compression.md] for a detailed specification of these algorithms.
+///
+/// [Compression.md]:
https://github.com/apache/parquet-format/blob/master/Compression.md
+enum CompressionCodec {
+ UNCOMPRESSED = 0;
+ SNAPPY = 1;
+ GZIP = 2;
+ LZO = 3;
+ BROTLI = 4; // Added in 2.4
+ LZ4 = 5; // DEPRECATED (Added in 2.4)
+ ZSTD = 6; // Added in 2.4
+ LZ4_RAW = 7; // Added in 2.9
+}
+);
+
+// NOTE: The `Compression` enum below likely belongs in file::properties now, but moving it
+// there would be a breaking API change that's probably not worth the pain. If a new codec is
+// added to the Parquet specification, or any other breaking changes are made to this enum,
+// this can be revisited.
+
/// Supported block compression algorithms.
///
/// Block compression can yield non-trivial improvements to storage efficiency
at the expense
@@ -834,51 +861,33 @@ pub enum Compression {
LZ4_RAW,
}
-impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for Compression {
- fn read_thrift(prot: &mut R) -> Result<Self> {
- let val = prot.read_i32()?;
- Ok(match val {
- 0 => Self::UNCOMPRESSED,
- 1 => Self::SNAPPY,
- 2 => Self::GZIP(Default::default()),
- 3 => Self::LZO,
- 4 => Self::BROTLI(Default::default()),
- 5 => Self::LZ4,
- 6 => Self::ZSTD(Default::default()),
- 7 => Self::LZ4_RAW,
- _ => return Err(general_err!("Unexpected CompressionCodec {}",
val)),
- })
- }
-}
-
-// TODO(ets): explore replacing this with a thrift_enum!(ThriftCompression)
for the serialization
-// and then provide `From` impls to convert back and forth. This is necessary
due to the addition
-// of compression level to some variants.
-impl WriteThrift for Compression {
- const ELEMENT_TYPE: ElementType = ElementType::I32;
-
- fn write_thrift<W: Write>(&self, writer: &mut
ThriftCompactOutputProtocol<W>) -> Result<()> {
- let id: i32 = match *self {
- Self::UNCOMPRESSED => 0,
- Self::SNAPPY => 1,
- Self::GZIP(_) => 2,
- Self::LZO => 3,
- Self::BROTLI(_) => 4,
- Self::LZ4 => 5,
- Self::ZSTD(_) => 6,
- Self::LZ4_RAW => 7,
- };
- writer.write_i32(id)
+impl From<CompressionCodec> for Compression {
+ fn from(value: CompressionCodec) -> Self {
+ match value {
+ CompressionCodec::UNCOMPRESSED => Compression::UNCOMPRESSED,
+ CompressionCodec::SNAPPY => Compression::SNAPPY,
+ CompressionCodec::GZIP => Compression::GZIP(Default::default()),
+ CompressionCodec::LZO => Compression::LZO,
+ CompressionCodec::BROTLI =>
Compression::BROTLI(Default::default()),
+ CompressionCodec::LZ4 => Compression::LZ4,
+ CompressionCodec::ZSTD => Compression::ZSTD(Default::default()),
+ CompressionCodec::LZ4_RAW => Compression::LZ4_RAW,
+ }
}
}
-write_thrift_field!(Compression, FieldType::I32);
-
-impl Compression {
- /// Returns the codec type of this compression setting as a string,
without the compression
- /// level.
- pub(crate) fn codec_to_string(self) -> String {
- format!("{self:?}").split('(').next().unwrap().to_owned()
+impl From<Compression> for CompressionCodec {
+ fn from(value: Compression) -> Self {
+ match value {
+ Compression::UNCOMPRESSED => CompressionCodec::UNCOMPRESSED,
+ Compression::SNAPPY => CompressionCodec::SNAPPY,
+ Compression::GZIP(_) => CompressionCodec::GZIP,
+ Compression::LZO => CompressionCodec::LZO,
+ Compression::BROTLI(_) => CompressionCodec::BROTLI,
+ Compression::LZ4 => CompressionCodec::LZ4,
+ Compression::ZSTD(_) => CompressionCodec::ZSTD,
+ Compression::LZ4_RAW => CompressionCodec::LZ4_RAW,
+ }
}
}
@@ -2140,11 +2149,65 @@ mod tests {
}
#[test]
- fn test_compression_codec_to_string() {
- assert_eq!(Compression::UNCOMPRESSED.codec_to_string(),
"UNCOMPRESSED");
+ fn test_compression_conversion() {
+ assert_eq!(
+ CompressionCodec::from(Compression::UNCOMPRESSED),
+ CompressionCodec::UNCOMPRESSED
+ );
+ assert_eq!(
+ CompressionCodec::from(Compression::SNAPPY),
+ CompressionCodec::SNAPPY
+ );
+ assert_eq!(
+ CompressionCodec::from(Compression::GZIP(Default::default())),
+ CompressionCodec::GZIP
+ );
+ assert_eq!(
+ CompressionCodec::from(Compression::LZO),
+ CompressionCodec::LZO
+ );
+ assert_eq!(
+ CompressionCodec::from(Compression::BROTLI(Default::default())),
+ CompressionCodec::BROTLI
+ );
+ assert_eq!(
+ CompressionCodec::from(Compression::LZ4),
+ CompressionCodec::LZ4
+ );
+ assert_eq!(
+ CompressionCodec::from(Compression::ZSTD(Default::default())),
+ CompressionCodec::ZSTD
+ );
+ assert_eq!(
+ CompressionCodec::from(Compression::LZ4_RAW),
+ CompressionCodec::LZ4_RAW
+ );
+
+ assert_eq!(
+ Compression::from(CompressionCodec::UNCOMPRESSED),
+ Compression::UNCOMPRESSED
+ );
+ assert_eq!(
+ Compression::from(CompressionCodec::SNAPPY),
+ Compression::SNAPPY
+ );
+ assert_eq!(
+ Compression::from(CompressionCodec::GZIP),
+ Compression::GZIP(Default::default())
+ );
+ assert_eq!(Compression::from(CompressionCodec::LZO), Compression::LZO);
+ assert_eq!(
+ Compression::from(CompressionCodec::BROTLI),
+ Compression::BROTLI(Default::default())
+ );
+ assert_eq!(Compression::from(CompressionCodec::LZ4), Compression::LZ4);
+ assert_eq!(
+ Compression::from(CompressionCodec::ZSTD),
+ Compression::ZSTD(Default::default())
+ );
assert_eq!(
- Compression::ZSTD(ZstdLevel::default()).codec_to_string(),
- "ZSTD"
+ Compression::from(CompressionCodec::LZ4_RAW),
+ Compression::LZ4_RAW
);
}
diff --git a/parquet/src/bin/parquet-layout.rs
b/parquet/src/bin/parquet-layout.rs
index 007f93517d..735a966513 100644
--- a/parquet/src/bin/parquet-layout.rs
+++ b/parquet/src/bin/parquet-layout.rs
@@ -48,7 +48,7 @@ use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;
-use parquet::basic::Compression;
+use parquet::basic::CompressionCodec;
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
#[allow(deprecated)]
@@ -118,7 +118,7 @@ fn do_layout<C: ChunkReader>(reader: &C) ->
Result<ParquetFile> {
.iter()
.zip(schema.columns())
.map(|(column, column_schema)| {
- let compression = compression(column.compression());
+ let compression = compression(column.compression_codec());
let mut pages = vec![];
let mut start = column
@@ -225,16 +225,16 @@ fn read_page_header<C: ChunkReader>(reader: &C, offset:
u64) -> Result<(usize, P
}
/// Returns a string representation for a given compression
-fn compression(compression: Compression) -> Option<&'static str> {
+fn compression(compression: CompressionCodec) -> Option<&'static str> {
match compression {
- Compression::UNCOMPRESSED => None,
- Compression::SNAPPY => Some("snappy"),
- Compression::GZIP(_) => Some("gzip"),
- Compression::LZO => Some("lzo"),
- Compression::BROTLI(_) => Some("brotli"),
- Compression::LZ4 => Some("lz4"),
- Compression::ZSTD(_) => Some("zstd"),
- Compression::LZ4_RAW => Some("lz4_raw"),
+ CompressionCodec::UNCOMPRESSED => None,
+ CompressionCodec::SNAPPY => Some("snappy"),
+ CompressionCodec::GZIP => Some("gzip"),
+ CompressionCodec::LZO => Some("lzo"),
+ CompressionCodec::BROTLI => Some("brotli"),
+ CompressionCodec::LZ4 => Some("lz4"),
+ CompressionCodec::ZSTD => Some("zstd"),
+ CompressionCodec::LZ4_RAW => Some("lz4_raw"),
}
}
diff --git a/parquet/src/file/metadata/memory.rs
b/parquet/src/file/metadata/memory.rs
index 30c10e7f22..8a5937c43c 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -18,7 +18,7 @@
//! Memory calculations for [`ParquetMetadata::memory_size`]
//!
//! [`ParquetMetadata::memory_size`]:
crate::file::metadata::ParquetMetaData::memory_size
-use crate::basic::{BoundaryOrder, ColumnOrder, Compression, Encoding,
PageType};
+use crate::basic::{BoundaryOrder, ColumnOrder, CompressionCodec, Encoding,
PageType};
use crate::data_type::private::ParquetValueType;
use crate::file::metadata::{
ColumnChunkMetaData, FileMetaData, KeyValue, PageEncodingStats,
ParquetPageEncodingStats,
@@ -206,7 +206,7 @@ impl HeapSize for SortingColumn {
0 // no heap allocations
}
}
-impl HeapSize for Compression {
+impl HeapSize for CompressionCodec {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 156385ddaa..646438d2e9 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -95,9 +95,13 @@ pub(crate) mod reader;
pub(crate) mod thrift;
mod writer;
-use crate::basic::{EncodingMask, PageType};
+use crate::basic::{
+ BoundaryOrder, ColumnOrder, Compression, CompressionCodec, Encoding,
EncodingMask, PageType,
+ Type,
+};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptor;
+use crate::errors::{ParquetError, Result};
#[cfg(feature = "encryption")]
use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
pub(crate) use crate::file::metadata::memory::HeapSize;
@@ -107,22 +111,15 @@ use
crate::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColum
use crate::file::page_index::{column_index::ColumnIndexMetaData,
offset_index::PageLocation};
use crate::file::statistics::Statistics;
use crate::geospatial::statistics as geo_statistics;
+use crate::parquet_thrift::{
+ ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol,
+ WriteThrift, WriteThriftField,
+};
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr,
SchemaDescriptor,
Type as SchemaType,
};
use crate::thrift_struct;
-use crate::{
- basic::BoundaryOrder,
- errors::{ParquetError, Result},
-};
-use crate::{
- basic::{ColumnOrder, Compression, Encoding, Type},
- parquet_thrift::{
- ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
- ThriftCompactOutputProtocol, WriteThrift, WriteThriftField,
- },
-};
use crate::{
data_type::private::ParquetValueType,
file::page_index::offset_index::OffsetIndexMetaData,
};
@@ -814,7 +811,7 @@ pub struct ColumnChunkMetaData {
file_path: Option<String>,
file_offset: i64,
num_values: i64,
- compression: Compression,
+ compression: CompressionCodec,
total_compressed_size: i64,
total_uncompressed_size: i64,
data_page_offset: i64,
@@ -1016,8 +1013,18 @@ impl ColumnChunkMetaData {
self.num_values
}
- /// Compression for this column.
+ /// [`Compression`] for this column.
+ ///
+ /// The returned value is built from the `codec` field of the Parquet `ColumnMetaData`.
+ /// Codecs that take a compression level (GZIP, BROTLI, ZSTD) are returned with their
+ /// default level, so the value is suitable for passing to
+ /// [`WriterPropertiesBuilder::set_compression`].
+ ///
+ /// [`WriterPropertiesBuilder::set_compression`]: crate::file::properties::WriterPropertiesBuilder::set_compression
pub fn compression(&self) -> Compression {
+ self.compression.into()
+ }
+
+ /// Returns the compression codec used when writing this column.
+ pub fn compression_codec(&self) -> CompressionCodec {
self.compression
}
@@ -1234,7 +1241,7 @@ impl ColumnChunkMetaDataBuilder {
file_path: None,
file_offset: 0,
num_values: 0,
- compression: Compression::UNCOMPRESSED,
+ compression: CompressionCodec::UNCOMPRESSED,
total_compressed_size: 0,
total_uncompressed_size: 0,
data_page_offset: 0,
@@ -1285,8 +1292,14 @@ impl ColumnChunkMetaDataBuilder {
self
}
- /// Sets compression.
+ /// Sets compression codec given a [`Compression`] configuration value.
pub fn set_compression(mut self, value: Compression) -> Self {
+ self.0.compression = value.into();
+ self
+ }
+
+ /// Sets compression codec.
+ pub fn set_compression_codec(mut self, value: CompressionCodec) -> Self {
self.0.compression = value;
self
}
@@ -1820,7 +1833,7 @@ mod tests {
))
.set_file_path("file_path".to_owned())
.set_num_values(1000)
- .set_compression(Compression::SNAPPY)
+ .set_compression_codec(CompressionCodec::SNAPPY)
.set_total_compressed_size(2000)
.set_total_uncompressed_size(3000)
.set_data_page_offset(4000)
@@ -1860,7 +1873,7 @@ mod tests {
))
.set_file_path("file_path".to_owned())
.set_num_values(1000)
- .set_compression(Compression::SNAPPY)
+ .set_compression_codec(CompressionCodec::SNAPPY)
.set_total_compressed_size(2000)
.set_total_uncompressed_size(3000)
.set_data_page_offset(4000)
@@ -1903,7 +1916,7 @@ mod tests {
[Encoding::PLAIN, Encoding::RLE].iter(),
))
.set_num_values(1000)
- .set_compression(Compression::SNAPPY)
+ .set_compression_codec(CompressionCodec::SNAPPY)
.set_total_compressed_size(2000)
.set_total_uncompressed_size(3000)
.set_data_page_offset(4000)
@@ -2034,9 +2047,9 @@ mod tests {
.build();
#[cfg(not(feature = "encryption"))]
- let base_expected_size = 2766;
+ let base_expected_size = 2734;
#[cfg(feature = "encryption")]
- let base_expected_size = 2934;
+ let base_expected_size = 2902;
assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -2065,9 +2078,9 @@ mod tests {
.build();
#[cfg(not(feature = "encryption"))]
- let bigger_expected_size = 3192;
+ let bigger_expected_size = 3160;
#[cfg(feature = "encryption")]
- let bigger_expected_size = 3360;
+ let bigger_expected_size = 3328;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
@@ -2114,7 +2127,7 @@ mod tests {
.set_row_groups(row_group_meta.clone())
.build();
- let base_expected_size = 2058;
+ let base_expected_size = 2042;
assert_eq!(parquet_meta_data.memory_size(), base_expected_size);
let footer_key = "0123456789012345".as_bytes();
@@ -2140,7 +2153,7 @@ mod tests {
.set_file_decryptor(Some(decryptor))
.build();
- let expected_size_with_decryptor = 3072;
+ let expected_size_with_decryptor = 3056;
assert!(expected_size_with_decryptor > base_expected_size);
assert_eq!(
diff --git a/parquet/src/file/metadata/thrift/mod.rs
b/parquet/src/file/metadata/thrift/mod.rs
index 8c1147ff2c..b68af0c485 100644
--- a/parquet/src/file/metadata/thrift/mod.rs
+++ b/parquet/src/file/metadata/thrift/mod.rs
@@ -35,8 +35,8 @@ use crate::file::{
};
use crate::{
basic::{
- ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask,
LogicalType, PageType,
- Repetition, Type,
+ ColumnOrder, CompressionCodec, ConvertedType, Encoding, EncodingMask,
LogicalType,
+ PageType, Repetition, Type,
},
data_type::{ByteArray, FixedLenByteArray, Int96},
errors::{ParquetError, Result},
@@ -460,7 +460,7 @@ fn read_column_metadata<'a>(
}
// 3: path_in_schema is redundant
4 => {
- column.compression = Compression::read_thrift(&mut *prot)?;
+ column.compression = CompressionCodec::read_thrift(&mut
*prot)?;
seen_mask |= COL_META_CODEC;
}
5 => {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index beb83b71ce..ec4d02a47d 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -718,7 +718,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
let map_offset = |x| x - src_offset + write_offset as i64;
let mut builder =
ColumnChunkMetaData::builder(metadata.column_descr_ptr())
- .set_compression(metadata.compression())
+ .set_compression_codec(metadata.compression_codec())
.set_encodings_mask(*metadata.encodings_mask())
.set_total_compressed_size(metadata.compressed_size())
.set_total_uncompressed_size(metadata.uncompressed_size())
@@ -1597,7 +1597,7 @@ mod tests {
let desc = ColumnDescriptor::new(Arc::new(t), 0, 0,
ColumnPath::new(vec![]));
let meta = ColumnChunkMetaData::builder(Arc::new(desc))
- .set_compression(codec)
+ .set_compression_codec(codec.into())
.set_total_compressed_size(reader.len() as i64)
.set_num_values(total_num_values)
.build()
diff --git a/parquet/src/schema/printer.rs b/parquet/src/schema/printer.rs
index 4ad3b6b93e..68398005b6 100644
--- a/parquet/src/schema/printer.rs
+++ b/parquet/src/schema/printer.rs
@@ -177,11 +177,7 @@ fn print_column_chunk_metadata(out: &mut dyn io::Write,
cc_metadata: &ColumnChun
writeln!(out, "file path: {file_path_str}");
writeln!(out, "file offset: {}", cc_metadata.file_offset());
writeln!(out, "num of values: {}", cc_metadata.num_values());
- writeln!(
- out,
- "compression: {}",
- cc_metadata.compression().codec_to_string()
- );
+ writeln!(out, "compression: {}", cc_metadata.compression_codec());
writeln!(
out,
"total compressed size (in bytes): {}",