This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new f725bc9b95 configurable data page v2 compression threshold (#9826)
f725bc9b95 is described below
commit f725bc9b955f23772a6a6d8a38c99a8b3f359116
Author: Leonardo Yvens <[email protected]>
AuthorDate: Fri May 1 14:01:03 2026 -0300
configurable data page v2 compression threshold (#9826)
# Which issue does this PR close?
- Closes #9827.
- Contributes to #8358
# Rationale for this change
Currently, V2 data pages keep the compressed values even if compression
saves only a single byte. This adds a `WriterProperties` configuration so
the user can pick the compression ratio at which the compressed page
should be preferred.
# Are these changes tested?
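Yes, there are tests: a column writer test that checks the `is_compressed`
flag of a V2 data page under different thresholds, and `WriterProperties`
tests for the per-column override and for rejecting an invalid threshold.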
# Are there any user-facing changes?
Yes, this adds a configuration option to `WriterProperties`. The default
threshold (`1.0`) preserves the existing behavior, though maybe the default
should be lower?
---
parquet/src/column/writer/mod.rs | 56 ++++++++++++++++++-
parquet/src/file/properties.rs | 117 +++++++++++++++++++++++++++++++++++++++
2 files changed, 172 insertions(+), 1 deletion(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 0c4e40b7ac..f755beed55 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1115,7 +1115,12 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
Some(ref mut cmpr) => {
let buffer_len = buffer.len();
cmpr.compress(&values_data.buf, &mut buffer)?;
- if uncompressed_size <= buffer.len() - buffer_len {
+ let compressed_values_size = buffer.len() - buffer_len;
+ let threshold = self
+ .props
+
.column_data_page_v2_compression_ratio_threshold(self.descr.path());
+ if (compressed_values_size as f64) >=
(uncompressed_size as f64) * threshold
+ {
buffer.truncate(buffer_len);
buffer.extend_from_slice(&values_data.buf);
false
@@ -2418,6 +2423,55 @@ mod tests {
column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX,
10, 10);
}
+ #[test]
+ fn test_column_writer_v2_compression_ratio_threshold() {
+ fn write_v2_page(threshold: f64) -> bool {
+ let mut buf = Vec::with_capacity(4096);
+ let mut write = TrackedWrite::new(&mut buf);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_writer_version(WriterVersion::PARQUET_2_0)
+ .set_compression(Compression::SNAPPY)
+ .set_dictionary_enabled(false)
+ .set_data_page_v2_compression_ratio_threshold(threshold)
+ .build(),
+ );
+
+ let mut writer = get_test_column_writer::<Int32Type>(page_writer,
0, 0, props);
+ let values: Vec<i32> = vec![42; 4096];
+ writer.write_batch(&values, None, None).unwrap();
+ let r = writer.close().unwrap();
+ drop(write);
+
+ let reader_props = ReaderProperties::builder()
+ .set_backward_compatible_lz4(false)
+ .build();
+ let reader = SerializedPageReader::new_with_properties(
+ Arc::new(Bytes::from(buf)),
+ &r.metadata,
+ r.rows_written as usize,
+ None,
+ Arc::new(reader_props),
+ )
+ .unwrap();
+ let pages = reader.collect::<Result<Vec<_>>>().unwrap();
+ let data_page = pages
+ .iter()
+ .find(|p| p.page_type() == PageType::DATA_PAGE_V2)
+ .expect("expected a v2 data page");
+ match data_page {
+ Page::DataPageV2 { is_compressed, .. } => *is_compressed,
+ _ => unreachable!(),
+ }
+ }
+
+ // Default threshold keeps the compressed buffer for constant data.
+ assert!(write_v2_page(1.0));
+ // A strict threshold (require >1000x reduction) discards it.
+ assert!(!write_v2_page(0.001));
+ }
+
#[test]
fn test_column_writer_add_data_pages_with_dict() {
// ARROW-5129: Test verifies that we add data page in case of
dictionary encoding
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 2f7a16a32d..197c5d5c72 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -67,6 +67,8 @@ pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> =
Some(64);
pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
/// Default values for [`WriterProperties::coerce_types`]
pub const DEFAULT_COERCE_TYPES: bool = false;
+/// Default value for
[`WriterProperties::data_page_v2_compression_ratio_threshold`]
+pub const DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD: f64 = 1.0;
/// Default minimum chunk size for content-defined chunking: 256 KiB.
pub const DEFAULT_CDC_MIN_CHUNK_SIZE: usize = 256 * 1024;
/// Default maximum chunk size for content-defined chunking: 1024 KiB.
@@ -442,6 +444,30 @@ impl WriterProperties {
self.content_defined_chunking.as_ref()
}
+ /// Returns the compression ratio threshold at or above which a Data Page
v2's
+ /// compressed values are discarded in favor of writing the values
uncompressed.
+ ///
+ /// For more details see
[`WriterPropertiesBuilder::set_data_page_v2_compression_ratio_threshold`]
+ pub fn data_page_v2_compression_ratio_threshold(&self) -> f64 {
+ self.default_column_properties
+ .data_page_v2_compression_ratio_threshold()
+ .unwrap_or(DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD)
+ }
+
+ /// Returns the Data Page v2 compression ratio threshold for a specific
column.
+ ///
+ /// Takes precedence over
[`Self::data_page_v2_compression_ratio_threshold`].
+ pub fn column_data_page_v2_compression_ratio_threshold(&self, col:
&ColumnPath) -> f64 {
+ self.column_properties
+ .get(col)
+ .and_then(|c| c.data_page_v2_compression_ratio_threshold())
+ .or_else(|| {
+ self.default_column_properties
+ .data_page_v2_compression_ratio_threshold()
+ })
+ .unwrap_or(DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD)
+ }
+
/// Returns encoding for a data page, when dictionary encoding is enabled.
///
/// This is not configurable.
@@ -890,6 +916,35 @@ impl WriterPropertiesBuilder {
self
}
+ /// Sets the default compression ratio threshold at or above which a Data
Page
+ /// v2's compressed values are discarded in favor of writing the values
+ /// uncompressed, for all columns (defaults to `1.0` via
+ /// [`DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD`]).
+ ///
+ /// When writing a Data Page v2 with a configured compression codec, the
writer
+ /// first compresses the values and then compares the compressed size to
the
+ /// uncompressed size. If `compressed_size >= uncompressed_size *
threshold`, the
+ /// compressed buffer is discarded and the values are written uncompressed
for
+ /// that page (the page's `is_compressed` flag is set to `false`).
+ ///
+ /// The default of `1.0` preserves the historical behavior of only keeping
+ /// compression when it strictly reduces the size. Setting a value below
`1.0`
+ /// requires a minimum amount of size reduction to keep the compressed
page —
+ /// for example `0.9` requires at least a 10% reduction. Setting a value
above
+ /// `1.0` keeps the compressed buffer even if it's somewhat larger than the
+ /// uncompressed values.
+ ///
+ /// This setting only affects Data Page v2; Data Page v1 always stores the
+ /// compressor's output regardless of the resulting size.
+ ///
+ /// # Panics
+ /// If `value` is not finite or is not strictly positive.
+ pub fn set_data_page_v2_compression_ratio_threshold(mut self, value: f64)
-> Self {
+ self.default_column_properties
+ .set_data_page_v2_compression_ratio_threshold(value);
+ self
+ }
+
/// Sets FileEncryptionProperties (defaults to `None`)
#[cfg(feature = "encryption")]
pub fn with_file_encryption_properties(
@@ -1158,6 +1213,22 @@ impl WriterPropertiesBuilder {
self.get_mut_props(col).set_bloom_filter_ndv(value);
self
}
+
+ /// Sets the Data Page v2 compression ratio threshold for a specific
column.
+ ///
+ /// Takes precedence over
[`Self::set_data_page_v2_compression_ratio_threshold`].
+ ///
+ /// # Panics
+ /// If `value` is not finite or is not strictly positive.
+ pub fn set_column_data_page_v2_compression_ratio_threshold(
+ mut self,
+ col: ColumnPath,
+ value: f64,
+ ) -> Self {
+ self.get_mut_props(col)
+ .set_data_page_v2_compression_ratio_threshold(value);
+ self
+ }
}
impl From<WriterProperties> for WriterPropertiesBuilder {
@@ -1309,6 +1380,7 @@ struct ColumnProperties {
bloom_filter_properties: Option<BloomFilterProperties>,
/// Whether the bloom filter NDV was explicitly set by the user
bloom_filter_ndv_is_set: bool,
+ data_page_v2_compression_ratio_threshold: Option<f64>,
}
impl ColumnProperties {
@@ -1395,6 +1467,18 @@ impl ColumnProperties {
self.bloom_filter_ndv_is_set = true;
}
+ /// Sets the Data Page v2 compression ratio threshold for this column.
+ ///
+ /// # Panics
+ /// If `value` is not finite or is not strictly positive.
+ fn set_data_page_v2_compression_ratio_threshold(&mut self, value: f64) {
+ assert!(
+ value.is_finite() && value > 0.0,
+ "data_page_v2_compression_ratio_threshold must be a positive
finite number, got {value}"
+ );
+ self.data_page_v2_compression_ratio_threshold = Some(value);
+ }
+
/// Returns optional encoding for this column.
fn encoding(&self) -> Option<Encoding> {
self.encoding
@@ -1441,6 +1525,11 @@ impl ColumnProperties {
self.bloom_filter_properties.as_ref()
}
+ /// Returns optional Data Page v2 compression ratio threshold for this
column.
+ fn data_page_v2_compression_ratio_threshold(&self) -> Option<f64> {
+ self.data_page_v2_compression_ratio_threshold
+ }
+
/// If bloom filter is enabled and NDV was not explicitly set, resolve it
to the
/// given `default_ndv` (typically derived from `max_row_group_row_count`).
fn resolve_bloom_filter_ndv(&mut self, default_ndv: u64) {
@@ -1840,6 +1929,34 @@ mod tests {
);
}
+ #[test]
+ fn
test_writer_properties_column_data_page_v2_compression_ratio_threshold() {
+ let props = WriterProperties::builder()
+ .set_data_page_v2_compression_ratio_threshold(0.5)
+
.set_column_data_page_v2_compression_ratio_threshold(ColumnPath::from("col"),
0.1)
+ .build();
+
+ assert_eq!(props.data_page_v2_compression_ratio_threshold(), 0.5);
+ assert_eq!(
+
props.column_data_page_v2_compression_ratio_threshold(&ColumnPath::from("col")),
+ 0.1
+ );
+ assert_eq!(
+
props.column_data_page_v2_compression_ratio_threshold(&ColumnPath::from("other")),
+ 0.5
+ );
+ }
+
+ #[test]
+ #[should_panic(
+ expected = "data_page_v2_compression_ratio_threshold must be a
positive finite number"
+ )]
+ fn
test_writer_properties_panic_on_invalid_data_page_v2_compression_ratio_threshold()
{
+ WriterProperties::builder()
+ .set_data_page_v2_compression_ratio_threshold(0.0)
+ .build();
+ }
+
#[test]
fn test_writer_properties_column_dictionary_page_size_limit() {
let props = WriterProperties::builder()