This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c09945f3f4 parquet: fix panic in DeltaByteArrayDecoder on invalid
prefix lengths (#9797)
c09945f3f4 is described below
commit c09945f3f487156a0ac0a3ca9947294f1aae5383
Author: pchintar <[email protected]>
AuthorDate: Fri Apr 24 18:44:39 2026 -0400
parquet: fix panic in DeltaByteArrayDecoder on invalid prefix lengths
(#9797)
# Which issue does this PR close?
- Closes #9796.
# Rationale for this change
Currently, `DeltaByteArrayDecoder::get` assumes prefix lengths are
always valid and directly slices `previous_value`. Invalid prefix
lengths (negative or exceeding previous value length) can cause a panic
instead of returning an error.
# What changes are included in this PR?
* Add validation for decoded prefix lengths:
* reject negative values
* reject values exceeding `previous_value.len()`
* Return `Err` instead of panicking on invalid input
* Add a regression test using corrupted encoded data
# Are these changes tested?
Yes.
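* Added `test_delta_byte_array_invalid_prefix_len_returns_error`, which:
* encodes valid data
* replaces the prefix-length stream with a valid DELTA_BINARY_PACKED stream whose first prefix length exceeds the (empty) previous value
* verifies the decoder returns `Err` (previously panicked)
* Added `test_delta_byte_array_negative_prefix_len_returns_error`, which forces a negative prefix length after decoder initialization and verifies the decoder returns `Err`
* All other existing tests pass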
# Are there any user-facing changes?
No.
* No API changes
* Only improves error handling for invalid input
---
parquet/src/encodings/decoding.rs | 97 ++++++++++++++++++++++++++++++++++++++-
1 file changed, 96 insertions(+), 1 deletion(-)
diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs
index f7f4d9be47..da07d23eb6 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -1134,7 +1134,21 @@ impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
let suffix = v[0].data();
// Extract current prefix length, can be 0
- let prefix_len = self.prefix_lengths[self.current_idx] as usize;
+ let prefix_len = usize::try_from(self.prefix_lengths[self.current_idx])
+ .map_err(|_| {
+ general_err!(
+ "Invalid DELTA_BYTE_ARRAY prefix length {}",
+ self.prefix_lengths[self.current_idx]
+ )
+ })?;
+
+ if prefix_len > self.previous_value.len() {
+ return Err(general_err!(
+ "Invalid DELTA_BYTE_ARRAY prefix length {} exceeds previous value length {}",
+ prefix_len,
+ self.previous_value.len()
+ ));
+ }
// Concatenate prefix with suffix
let mut result = Vec::with_capacity(prefix_len + suffix.len());
@@ -1182,6 +1196,87 @@ mod tests {
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
use crate::util::test_common::rand_gen::RandGen;
+ #[test]
+ fn test_delta_byte_array_invalid_prefix_len_returns_error() {
+ let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
+
+ let mut encoder =
+ get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
+ let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
+ encoder.put(&input).unwrap();
+ let encoded = encoder.flush_buffer().unwrap();
+
+ // First, decode just the prefix-length stream so we know where the suffix stream starts.
+ let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+ prefix_len_decoder
+ .set_data(encoded.clone(), input.len())
+ .unwrap();
+ let num_prefixes = prefix_len_decoder.values_left();
+ let mut prefix_lengths = vec![0; num_prefixes];
+ prefix_len_decoder.get(&mut prefix_lengths).unwrap();
+
+ // check: valid encoding should produce prefix lengths [0, 1]
+ assert_eq!(prefix_lengths, vec![0, 1]);
+
+ let prefix_stream_end = prefix_len_decoder.get_offset();
+
+ // Corrupt the prefix-length stream itself:
+ // replace it with a valid DELTA_BINARY_PACKED stream for [1, 1],
+ // so the first decoded prefix length becomes impossible because previous_value is empty.
+ let mut prefix_encoder = get_encoder::<Int32Type>(
+ Encoding::DELTA_BINARY_PACKED,
+ &create_test_col_desc_ptr(-1, Type::INT32),
+ )
+ .unwrap();
+ prefix_encoder.put(&[1i32, 1i32]).unwrap();
+ let corrupted_prefix = prefix_encoder.flush_buffer().unwrap();
+
+ let mut corrupted = Vec::new();
+ corrupted.extend_from_slice(corrupted_prefix.as_ref());
+ corrupted.extend_from_slice(&encoded[prefix_stream_end..]);
+
+ let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
+ decoder
+ .set_data(Bytes::from(corrupted), input.len())
+ .unwrap();
+
+ let mut out = vec![ByteArray::new(); input.len()];
+
+ let err = decoder.get(&mut out).unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
+ "{}",
+ err
+ );
+ }
+
+ #[test]
+ fn test_delta_byte_array_negative_prefix_len_returns_error() {
+ let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
+
+ let mut encoder =
+ get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
+ let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
+ encoder.put(&input).unwrap();
+ let encoded = encoder.flush_buffer().unwrap();
+
+ let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
+ decoder.set_data(encoded, input.len()).unwrap();
+
+ // Force a negative prefix length after decoder initialization
+ decoder.prefix_lengths[0] = -1;
+ let mut out = vec![ByteArray::new(); input.len()];
+
+ let err = decoder.get(&mut out).unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
+ "{}",
+ err
+ );
+ }
+
#[test]
fn test_get_decoders() {
// supported encodings