This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0c7cb2ac3f Parquet: Do not compress v2 data page when compression quality is bad (#8257)
0c7cb2ac3f is described below
commit 0c7cb2ac3f3132216a08fd557f9b1edc7f90060f
Author: mwish <[email protected]>
AuthorDate: Mon Sep 8 18:53:01 2025 +0800
Parquet: Do not compress v2 data page when compression quality is bad (#8257)
# Which issue does this PR close?
- Closes #8256.
# Rationale for this change
Do not compress the v2 data page when compression quality is bad, i.e. when the compressed size is greater than or equal to the uncompressed size.
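The writer compresses the values into the page buffer first; if the result is no smaller than the input, the compressed bytes are truncated away and the raw values are appended instead. A minimal standalone sketch of that fallback pattern, assuming a closure-based codec (the function `append_maybe_compressed` is illustrative, not the crate's internal API):

```rust
/// Append `raw` to `out`, compressed only if that actually saves space.
/// Returns true when the compressed form was kept (-> is_compressed flag).
fn append_maybe_compressed(
    raw: &[u8],
    out: &mut Vec<u8>,
    compress: impl Fn(&[u8], &mut Vec<u8>),
) -> bool {
    let start = out.len();
    compress(raw, out);
    // Same check as the patch: keep the compressed bytes only if they
    // are strictly smaller than the uncompressed input.
    if raw.len() <= out.len() - start {
        out.truncate(start);
        out.extend_from_slice(raw);
        false
    } else {
        true
    }
}

fn main() {
    // Toy "codec" that prepends a 4-byte length header, so tiny inputs grow.
    let codec = |raw: &[u8], out: &mut Vec<u8>| {
        out.extend_from_slice(&(raw.len() as u32).to_le_bytes());
        out.extend_from_slice(raw);
    };
    let mut page = Vec::new();
    let kept_compressed = append_maybe_compressed(b"a", &mut page, codec);
    assert!(!kept_compressed); // 1 byte grew to 5, so the raw byte is stored
    assert_eq!(page, b"a");
}
```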
# What changes are included in this PR?
Discard the compressed output and fall back to the uncompressed values when compression does not shrink the data, and record that decision in the page header's `is_compressed` flag.
# Are these changes tested?
Covered by existing tests; this PR also adds `test_page_v2_snappy_compression_fallback` to exercise the fallback.
# Are there any user-facing changes?
No
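Readers are unaffected because the v2 page header carries an explicit flag: the Parquet thrift definition gives `DataPageHeaderV2` an `is_compressed` field, and readers only decompress the values section when it is set. A reader-side sketch of that contract (the function `page_values` is illustrative, not arrow-rs code):

```rust
/// Sketch of the reader-side contract: DataPageHeaderV2.is_compressed
/// decides whether the values bytes need decompressing at all.
fn page_values(
    bytes: &[u8],
    is_compressed: bool,
    decompress: impl Fn(&[u8]) -> Vec<u8>,
) -> Vec<u8> {
    if is_compressed {
        decompress(bytes)
    } else {
        bytes.to_vec()
    }
}
```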
---
parquet/src/column/writer/mod.rs | 48 ++++++++++++++++++++++++++++++++++++----
1 file changed, 44 insertions(+), 4 deletions(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 9374e226b8..82b8ba166f 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1104,12 +1104,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
// Data Page v2 compresses values only.
- match self.compressor {
+ let is_compressed = match self.compressor {
Some(ref mut cmpr) => {
+ let buffer_len = buffer.len();
cmpr.compress(&values_data.buf, &mut buffer)?;
+ if uncompressed_size <= buffer.len() - buffer_len {
+ buffer.truncate(buffer_len);
+ buffer.extend_from_slice(&values_data.buf);
+ false
+ } else {
+ true
+ }
}
- None => buffer.extend_from_slice(&values_data.buf),
- }
+ None => {
+ buffer.extend_from_slice(&values_data.buf);
+ false
+ }
+ };
let data_page = Page::DataPageV2 {
buf: buffer.into(),
@@ -1119,7 +1130,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_rows: self.page_metrics.num_buffered_rows,
def_levels_byte_len: def_levels_byte_len as u32,
rep_levels_byte_len: rep_levels_byte_len as u32,
- is_compressed: self.compressor.is_some(),
+ is_compressed,
statistics: page_statistics,
};
@@ -4236,4 +4247,33 @@ mod tests {
.unwrap();
ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
}
+
+ #[test]
+ fn test_page_v2_snappy_compression_fallback() {
+ // Test that PageV2 sets is_compressed to false when Snappy compression increases data size
+ let page_writer = TestPageWriter {};
+
+ // Create WriterProperties with PageV2 and Snappy compression
+ let props = WriterProperties::builder()
+ .set_writer_version(WriterVersion::PARQUET_2_0)
+ // Disable dictionary to ensure data is written directly
+ .set_dictionary_enabled(false)
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ let mut column_writer =
+ get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
+
+ // Create small, simple data that Snappy compression will likely increase in size
+ // due to compression overhead for very small data
+ let values = vec![ByteArray::from("a")];
+
+ column_writer.write_batch(&values, None, None).unwrap();
+
+ let result = column_writer.close().unwrap();
+ assert_eq!(
+ result.metadata.uncompressed_size(),
+ result.metadata.compressed_size()
+ );
+ }
}