This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new c09945f3f4 parquet: fix panic in DeltaByteArrayDecoder on invalid 
prefix lengths (#9797)
c09945f3f4 is described below

commit c09945f3f487156a0ac0a3ca9947294f1aae5383
Author: pchintar <[email protected]>
AuthorDate: Fri Apr 24 18:44:39 2026 -0400

    parquet: fix panic in DeltaByteArrayDecoder on invalid prefix lengths 
(#9797)
    
    # Which issue does this PR close?
    
    - Closes #9796 .
    
    # Rationale for this change
    
    Currently, `DeltaByteArrayDecoder::get` assumes prefix lengths are
    always valid and directly slices `previous_value`. Invalid prefix
    lengths (negative or exceeding previous value length) can cause a panic
    instead of returning an error.
    
    # What changes are included in this PR?
    
    * Add validation for decoded prefix lengths:
    
      * reject negative values
      * reject values exceeding `previous_value.len()`
    * Return `Err` instead of panicking on invalid input
    * Add a regression test using corrupted encoded data
    
    # Are these changes tested?
    
    Yes.
    
    * Added `test_delta_byte_array_invalid_prefix_len_returns_error`
    * Test:
    
      * encodes valid data
      * corrupts prefix-length stream
      * verifies decoder returns `Err` (previously panicked)
    * All the other existing tests pass
    
    # Are there any user-facing changes?
    
    No.
    
    * No API changes
    * Only improves error handling for invalid input
---
 parquet/src/encodings/decoding.rs | 97 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 96 insertions(+), 1 deletion(-)

diff --git a/parquet/src/encodings/decoding.rs 
b/parquet/src/encodings/decoding.rs
index f7f4d9be47..da07d23eb6 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -1134,7 +1134,21 @@ impl<T: DataType> Decoder<T> for 
DeltaByteArrayDecoder<T> {
                     let suffix = v[0].data();
 
                     // Extract current prefix length, can be 0
-                    let prefix_len = self.prefix_lengths[self.current_idx] as 
usize;
+                    let prefix_len = 
usize::try_from(self.prefix_lengths[self.current_idx])
+                        .map_err(|_| {
+                            general_err!(
+                                "Invalid DELTA_BYTE_ARRAY prefix length {}",
+                                self.prefix_lengths[self.current_idx]
+                            )
+                        })?;
+
+                    if prefix_len > self.previous_value.len() {
+                        return Err(general_err!(
+                            "Invalid DELTA_BYTE_ARRAY prefix length {} exceeds 
previous value length {}",
+                            prefix_len,
+                            self.previous_value.len()
+                        ));
+                    }
 
                     // Concatenate prefix with suffix
                     let mut result = Vec::with_capacity(prefix_len + 
suffix.len());
@@ -1182,6 +1196,87 @@ mod tests {
     use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, 
Type as SchemaType};
     use crate::util::test_common::rand_gen::RandGen;
 
+    #[test]
+    fn test_delta_byte_array_invalid_prefix_len_returns_error() {
+        let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
+
+        let mut encoder =
+            get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, 
&col_descr).unwrap();
+        let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
+        encoder.put(&input).unwrap();
+        let encoded = encoder.flush_buffer().unwrap();
+
+        // First, decode just the prefix-length stream so we know where the 
suffix stream starts.
+        let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        prefix_len_decoder
+            .set_data(encoded.clone(), input.len())
+            .unwrap();
+        let num_prefixes = prefix_len_decoder.values_left();
+        let mut prefix_lengths = vec![0; num_prefixes];
+        prefix_len_decoder.get(&mut prefix_lengths).unwrap();
+
+        // check: valid encoding should produce prefix lengths [0, 1]
+        assert_eq!(prefix_lengths, vec![0, 1]);
+
+        let prefix_stream_end = prefix_len_decoder.get_offset();
+
+        // Corrupt the prefix-length stream itself:
+        // replace it with a valid DELTA_BINARY_PACKED stream for [1, 1],
+        // so the first decoded prefix length becomes impossible because 
previous_value is empty.
+        let mut prefix_encoder = get_encoder::<Int32Type>(
+            Encoding::DELTA_BINARY_PACKED,
+            &create_test_col_desc_ptr(-1, Type::INT32),
+        )
+        .unwrap();
+        prefix_encoder.put(&[1i32, 1i32]).unwrap();
+        let corrupted_prefix = prefix_encoder.flush_buffer().unwrap();
+
+        let mut corrupted = Vec::new();
+        corrupted.extend_from_slice(corrupted_prefix.as_ref());
+        corrupted.extend_from_slice(&encoded[prefix_stream_end..]);
+
+        let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
+        decoder
+            .set_data(Bytes::from(corrupted), input.len())
+            .unwrap();
+
+        let mut out = vec![ByteArray::new(); input.len()];
+
+        let err = decoder.get(&mut out).unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
+            "{}",
+            err
+        );
+    }
+
+    #[test]
+    fn test_delta_byte_array_negative_prefix_len_returns_error() {
+        let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
+
+        let mut encoder =
+            get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, 
&col_descr).unwrap();
+        let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
+        encoder.put(&input).unwrap();
+        let encoded = encoder.flush_buffer().unwrap();
+
+        let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
+        decoder.set_data(encoded, input.len()).unwrap();
+
+        // Force a negative prefix length after decoder initialization
+        decoder.prefix_lengths[0] = -1;
+        let mut out = vec![ByteArray::new(); input.len()];
+
+        let err = decoder.get(&mut out).unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
+            "{}",
+            err
+        );
+    }
+
     #[test]
     fn test_get_decoders() {
         // supported encodings

Reply via email to