This is an automated email from the ASF dual-hosted git repository.

etseidl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new f725bc9b95 configurable data page v2 compression threshold (#9826)
f725bc9b95 is described below

commit f725bc9b955f23772a6a6d8a38c99a8b3f359116
Author: Leonardo Yvens <[email protected]>
AuthorDate: Fri May 1 14:01:03 2026 -0300

    configurable data page v2 compression threshold (#9826)
    
    # Which issue does this PR close?
    
    - Closes #9827.
    - Contributes to #8358
    
    # Rationale for this change
    
    Currently, V2 data pages keep the compressed values even if compression
    saves only a single byte. This adds a `WriterProperties` setting so the
    user can pick the compression ratio below which the compressed page is
    preferred.
    
    # Are these changes tested?
    
    Yes, unit tests cover the column writer behavior and the new
    `WriterProperties` setting.
    
    # Are there any user-facing changes?
    
    Yes, this adds a configuration option to `WriterProperties`. The default
    behavior is unchanged, though maybe the default threshold should be lower?
---
 parquet/src/column/writer/mod.rs |  56 ++++++++++++++++++-
 parquet/src/file/properties.rs   | 117 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 172 insertions(+), 1 deletion(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 0c4e40b7ac..f755beed55 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1115,7 +1115,12 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
                     Some(ref mut cmpr) => {
                         let buffer_len = buffer.len();
                         cmpr.compress(&values_data.buf, &mut buffer)?;
-                        if uncompressed_size <= buffer.len() - buffer_len {
+                        let compressed_values_size = buffer.len() - buffer_len;
+                        let threshold = self
+                            .props
+                            
.column_data_page_v2_compression_ratio_threshold(self.descr.path());
+                        if (compressed_values_size as f64) >= 
(uncompressed_size as f64) * threshold
+                        {
                             buffer.truncate(buffer_len);
                             buffer.extend_from_slice(&values_data.buf);
                             false
@@ -2418,6 +2423,55 @@ mod tests {
         column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 
10, 10);
     }
 
+    #[test]
+    fn test_column_writer_v2_compression_ratio_threshold() {
+        fn write_v2_page(threshold: f64) -> bool {
+            let mut buf = Vec::with_capacity(4096);
+            let mut write = TrackedWrite::new(&mut buf);
+            let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+            let props = Arc::new(
+                WriterProperties::builder()
+                    .set_writer_version(WriterVersion::PARQUET_2_0)
+                    .set_compression(Compression::SNAPPY)
+                    .set_dictionary_enabled(false)
+                    .set_data_page_v2_compression_ratio_threshold(threshold)
+                    .build(),
+            );
+
+            let mut writer = get_test_column_writer::<Int32Type>(page_writer, 
0, 0, props);
+            let values: Vec<i32> = vec![42; 4096];
+            writer.write_batch(&values, None, None).unwrap();
+            let r = writer.close().unwrap();
+            drop(write);
+
+            let reader_props = ReaderProperties::builder()
+                .set_backward_compatible_lz4(false)
+                .build();
+            let reader = SerializedPageReader::new_with_properties(
+                Arc::new(Bytes::from(buf)),
+                &r.metadata,
+                r.rows_written as usize,
+                None,
+                Arc::new(reader_props),
+            )
+            .unwrap();
+            let pages = reader.collect::<Result<Vec<_>>>().unwrap();
+            let data_page = pages
+                .iter()
+                .find(|p| p.page_type() == PageType::DATA_PAGE_V2)
+                .expect("expected a v2 data page");
+            match data_page {
+                Page::DataPageV2 { is_compressed, .. } => *is_compressed,
+                _ => unreachable!(),
+            }
+        }
+
+        // Default threshold keeps the compressed buffer for constant data.
+        assert!(write_v2_page(1.0));
+        // A strict threshold (require >1000x reduction) discards it.
+        assert!(!write_v2_page(0.001));
+    }
+
     #[test]
     fn test_column_writer_add_data_pages_with_dict() {
         // ARROW-5129: Test verifies that we add data page in case of 
dictionary encoding
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 2f7a16a32d..197c5d5c72 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -67,6 +67,8 @@ pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = 
Some(64);
 pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
 /// Default values for [`WriterProperties::coerce_types`]
 pub const DEFAULT_COERCE_TYPES: bool = false;
+/// Default value for 
[`WriterProperties::data_page_v2_compression_ratio_threshold`]
+pub const DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD: f64 = 1.0;
 /// Default minimum chunk size for content-defined chunking: 256 KiB.
 pub const DEFAULT_CDC_MIN_CHUNK_SIZE: usize = 256 * 1024;
 /// Default maximum chunk size for content-defined chunking: 1024 KiB.
@@ -442,6 +444,30 @@ impl WriterProperties {
         self.content_defined_chunking.as_ref()
     }
 
+    /// Returns the compression ratio threshold at or above which a Data Page 
v2's
+    /// compressed values are discarded in favor of writing the values 
uncompressed.
+    ///
+    /// For more details see 
[`WriterPropertiesBuilder::set_data_page_v2_compression_ratio_threshold`]
+    pub fn data_page_v2_compression_ratio_threshold(&self) -> f64 {
+        self.default_column_properties
+            .data_page_v2_compression_ratio_threshold()
+            .unwrap_or(DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD)
+    }
+
+    /// Returns the Data Page v2 compression ratio threshold for a specific 
column.
+    ///
+    /// Takes precedence over 
[`Self::data_page_v2_compression_ratio_threshold`].
+    pub fn column_data_page_v2_compression_ratio_threshold(&self, col: 
&ColumnPath) -> f64 {
+        self.column_properties
+            .get(col)
+            .and_then(|c| c.data_page_v2_compression_ratio_threshold())
+            .or_else(|| {
+                self.default_column_properties
+                    .data_page_v2_compression_ratio_threshold()
+            })
+            .unwrap_or(DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD)
+    }
+
     /// Returns encoding for a data page, when dictionary encoding is enabled.
     ///
     /// This is not configurable.
@@ -890,6 +916,35 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    /// Sets the default compression ratio threshold at or above which a Data 
Page
+    /// v2's compressed values are discarded in favor of writing the values
+    /// uncompressed, for all columns (defaults to `1.0` via
+    /// [`DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD`]).
+    ///
+    /// When writing a Data Page v2 with a configured compression codec, the 
writer
+    /// first compresses the values and then compares the compressed size to 
the
+    /// uncompressed size. If `compressed_size >= uncompressed_size * 
threshold`, the
+    /// compressed buffer is discarded and the values are written uncompressed 
for
+    /// that page (the page's `is_compressed` flag is set to `false`).
+    ///
+    /// The default of `1.0` preserves the historical behavior of only keeping
+    /// compression when it strictly reduces the size. Setting a value below 
`1.0`
+    /// requires a minimum amount of size reduction to keep the compressed 
page —
+    /// for example `0.9` requires at least a 10% reduction. Setting a value 
above
+    /// `1.0` keeps the compressed buffer even if it's somewhat larger than the
+    /// uncompressed values.
+    ///
+    /// This setting only affects Data Page v2; Data Page v1 always stores the
+    /// compressor's output regardless of the resulting size.
+    ///
+    /// # Panics
+    /// If `value` is not finite or is not strictly positive.
+    pub fn set_data_page_v2_compression_ratio_threshold(mut self, value: f64) 
-> Self {
+        self.default_column_properties
+            .set_data_page_v2_compression_ratio_threshold(value);
+        self
+    }
+
     /// Sets FileEncryptionProperties (defaults to `None`)
     #[cfg(feature = "encryption")]
     pub fn with_file_encryption_properties(
@@ -1158,6 +1213,22 @@ impl WriterPropertiesBuilder {
         self.get_mut_props(col).set_bloom_filter_ndv(value);
         self
     }
+
+    /// Sets the Data Page v2 compression ratio threshold for a specific 
column.
+    ///
+    /// Takes precedence over 
[`Self::set_data_page_v2_compression_ratio_threshold`].
+    ///
+    /// # Panics
+    /// If `value` is not finite or is not strictly positive.
+    pub fn set_column_data_page_v2_compression_ratio_threshold(
+        mut self,
+        col: ColumnPath,
+        value: f64,
+    ) -> Self {
+        self.get_mut_props(col)
+            .set_data_page_v2_compression_ratio_threshold(value);
+        self
+    }
 }
 
 impl From<WriterProperties> for WriterPropertiesBuilder {
@@ -1309,6 +1380,7 @@ struct ColumnProperties {
     bloom_filter_properties: Option<BloomFilterProperties>,
     /// Whether the bloom filter NDV was explicitly set by the user
     bloom_filter_ndv_is_set: bool,
+    data_page_v2_compression_ratio_threshold: Option<f64>,
 }
 
 impl ColumnProperties {
@@ -1395,6 +1467,18 @@ impl ColumnProperties {
         self.bloom_filter_ndv_is_set = true;
     }
 
+    /// Sets the Data Page v2 compression ratio threshold for this column.
+    ///
+    /// # Panics
+    /// If `value` is not finite or is not strictly positive.
+    fn set_data_page_v2_compression_ratio_threshold(&mut self, value: f64) {
+        assert!(
+            value.is_finite() && value > 0.0,
+            "data_page_v2_compression_ratio_threshold must be a positive 
finite number, got {value}"
+        );
+        self.data_page_v2_compression_ratio_threshold = Some(value);
+    }
+
     /// Returns optional encoding for this column.
     fn encoding(&self) -> Option<Encoding> {
         self.encoding
@@ -1441,6 +1525,11 @@ impl ColumnProperties {
         self.bloom_filter_properties.as_ref()
     }
 
+    /// Returns optional Data Page v2 compression ratio threshold for this 
column.
+    fn data_page_v2_compression_ratio_threshold(&self) -> Option<f64> {
+        self.data_page_v2_compression_ratio_threshold
+    }
+
     /// If bloom filter is enabled and NDV was not explicitly set, resolve it 
to the
     /// given `default_ndv` (typically derived from `max_row_group_row_count`).
     fn resolve_bloom_filter_ndv(&mut self, default_ndv: u64) {
@@ -1840,6 +1929,34 @@ mod tests {
         );
     }
 
+    #[test]
+    fn 
test_writer_properties_column_data_page_v2_compression_ratio_threshold() {
+        let props = WriterProperties::builder()
+            .set_data_page_v2_compression_ratio_threshold(0.5)
+            
.set_column_data_page_v2_compression_ratio_threshold(ColumnPath::from("col"), 
0.1)
+            .build();
+
+        assert_eq!(props.data_page_v2_compression_ratio_threshold(), 0.5);
+        assert_eq!(
+            
props.column_data_page_v2_compression_ratio_threshold(&ColumnPath::from("col")),
+            0.1
+        );
+        assert_eq!(
+            
props.column_data_page_v2_compression_ratio_threshold(&ColumnPath::from("other")),
+            0.5
+        );
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "data_page_v2_compression_ratio_threshold must be a 
positive finite number"
+    )]
+    fn 
test_writer_properties_panic_on_invalid_data_page_v2_compression_ratio_threshold()
 {
+        WriterProperties::builder()
+            .set_data_page_v2_compression_ratio_threshold(0.0)
+            .build();
+    }
+
     #[test]
     fn test_writer_properties_column_dictionary_page_size_limit() {
         let props = WriterProperties::builder()

Reply via email to