This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c7cb2ac3f Parquet: Do not compress v2 data page when compression quality is bad (#8257)
0c7cb2ac3f is described below

commit 0c7cb2ac3f3132216a08fd557f9b1edc7f90060f
Author: mwish <[email protected]>
AuthorDate: Mon Sep 8 18:53:01 2025 +0800

    Parquet: Do not compress v2 data page when compression quality is bad (#8257)
    
    # Which issue does this PR close?
    
    - Closes #8256 .
    
    # Rationale for this change
    
    Do not compress the v2 data page when compression quality is bad
    (the compressed size is greater than or equal to the uncompressed size).
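
    As a minimal sketch of this decision (a hypothetical standalone helper
    for illustration only; the actual change lives in `GenericColumnWriter`
    and is shown in the diff below), with the compressor modeled as a
    closure that appends compressed bytes to `buffer`:

    ```rust
    /// Append `values` to `buffer`, compressed only if that actually helps,
    /// and return the value for the page's `is_compressed` flag.
    fn append_values_for_page_v2(
        values: &[u8],
        uncompressed_size: usize, // levels + values, as in the patch
        buffer: &mut Vec<u8>,
        compressor: Option<&mut dyn FnMut(&[u8], &mut Vec<u8>)>,
    ) -> bool {
        match compressor {
            Some(compress) => {
                let start = buffer.len();
                compress(values, buffer);
                // Compressed length is whatever was appended past `start`.
                if uncompressed_size <= buffer.len() - start {
                    // Compression was counterproductive: revert to raw values.
                    buffer.truncate(start);
                    buffer.extend_from_slice(values);
                    false
                } else {
                    true
                }
            }
            None => {
                buffer.extend_from_slice(values);
                false
            }
        }
    }
    ```

    When the flag ends up `false`, the Data Page v2 values section is stored
    as-is and readers skip decompression for it.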
    
    # What changes are included in this PR?
    
    Discard the compressed output when it is at least as large as the raw
    values and write the page uncompressed instead.
    
    # Are these changes tested?
    
    Covered by existing tests, plus a new test
    (`test_page_v2_snappy_compression_fallback`) added in this PR.
    
    # Are there any user-facing changes?
    
    No
---
 parquet/src/column/writer/mod.rs | 48 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 44 insertions(+), 4 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 9374e226b8..82b8ba166f 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1104,12 +1104,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                     rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
 
                 // Data Page v2 compresses values only.
-                match self.compressor {
+                let is_compressed = match self.compressor {
                     Some(ref mut cmpr) => {
+                        let buffer_len = buffer.len();
                         cmpr.compress(&values_data.buf, &mut buffer)?;
+                        if uncompressed_size <= buffer.len() - buffer_len {
+                            buffer.truncate(buffer_len);
+                            buffer.extend_from_slice(&values_data.buf);
+                            false
+                        } else {
+                            true
+                        }
                     }
-                    None => buffer.extend_from_slice(&values_data.buf),
-                }
+                    None => {
+                        buffer.extend_from_slice(&values_data.buf);
+                        false
+                    }
+                };
 
                 let data_page = Page::DataPageV2 {
                     buf: buffer.into(),
@@ -1119,7 +1130,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                     num_rows: self.page_metrics.num_buffered_rows,
                     def_levels_byte_len: def_levels_byte_len as u32,
                     rep_levels_byte_len: rep_levels_byte_len as u32,
-                    is_compressed: self.compressor.is_some(),
+                    is_compressed,
                     statistics: page_statistics,
                 };
 
@@ -4236,4 +4247,33 @@ mod tests {
             .unwrap();
         ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
     }
+
+    #[test]
+    fn test_page_v2_snappy_compression_fallback() {
+        // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
+        let page_writer = TestPageWriter {};
+
+        // Create WriterProperties with PageV2 and Snappy compression
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_2_0)
+            // Disable dictionary to ensure data is written directly
+            .set_dictionary_enabled(false)
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut column_writer =
+            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
+
+        // Create small, simple data that Snappy compression will likely increase in size
+        // due to compression overhead for very small data
+        let values = vec![ByteArray::from("a")];
+
+        column_writer.write_batch(&values, None, None).unwrap();
+
+        let result = column_writer.close().unwrap();
+        assert_eq!(
+            result.metadata.uncompressed_size(),
+            result.metadata.compressed_size()
+        );
+    }
 }
