This is an automated email from the ASF dual-hosted git repository.
Jefffrey pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 43d22d82c1 fix(parquet): return error instead of panicking in
pad_nulls on corrupt input (#10108)
43d22d82c1 is described below
commit 43d22d82c1571f9442925c07e83d6b54279a1457
Author: Eric Li <[email protected]>
AuthorDate: Thu Jun 18 20:45:04 2026 -0700
fix(parquet): return error instead of panicking in pad_nulls on corrupt
input (#10108)
# Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
-->
- Closes #10107.
# Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
`ValuesBuffer::pad_nulls` asserted internal invariants that corrupt Parquet
data can violate, so malformed input caused a panic instead of a decode error.
# What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
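Change `ValuesBuffer::pad_nulls` to return `Result<()>`, convert the assertions
in `OffsetBuffer` and `FixedLenByteArrayBuffer` into `ParquetError::General`
errors, and propagate the error with `?` at the record reader call site. The
remaining `ValuesBuffer` implementations (`DictionaryBuffer`, `ViewBuffer`,
`Vec<T>`) are updated to the new signature.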
# Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
If this PR claims a performance improvement, please include evidence
such as benchmark results.
-->
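Adds a regression test (`test_pad_nulls_corrupt_input_returns_err` in
`offset_buffer.rs`) and updates the existing tests to unwrap the new `Result`.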
# Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
If there are any breaking changes to public APIs, please call them out.
-->
No breaking changes to the public API; corrupt input now produces a decode
error (`Err`) instead of a panic.
---
parquet/src/arrow/array_reader/byte_array.rs | 8 ++-
.../arrow/array_reader/byte_array_dictionary.rs | 12 +++--
parquet/src/arrow/array_reader/byte_view_array.rs | 4 +-
.../src/arrow/array_reader/fixed_len_byte_array.rs | 11 ++++-
parquet/src/arrow/buffer/dictionary_buffer.rs | 10 ++--
parquet/src/arrow/buffer/offset_buffer.rs | 57 +++++++++++++++++++---
parquet/src/arrow/buffer/view_buffer.rs | 9 ++--
parquet/src/arrow/record_reader/buffer.rs | 8 ++-
parquet/src/arrow/record_reader/mod.rs | 2 +-
9 files changed, 95 insertions(+), 26 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index cf40d0576d..db50313046 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -652,7 +652,9 @@ mod tests {
let valid = [false, false, true, true, false, true, true, false,
false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
- output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
+ output
+ .pad_nulls(0, 4, valid.len(), valid_buffer.as_slice())
+ .unwrap();
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings =
array.as_any().downcast_ref::<StringArray>().unwrap();
@@ -706,7 +708,9 @@ mod tests {
let valid = [false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
- output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
+ output
+ .pad_nulls(0, 2, valid.len(), valid_buffer.as_slice())
+ .unwrap();
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings =
array.as_any().downcast_ref::<StringArray>().unwrap();
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 01dd9bcf8b..ba14b4d6b0 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -433,7 +433,9 @@ mod tests {
let mut valid = vec![false, false, true, true, false, true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
- output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());
+ output
+ .pad_nulls(0, 3, valid.len(), valid_buffer.as_slice())
+ .unwrap();
assert!(matches!(output, DictionaryBuffer::Dict { .. }));
@@ -441,7 +443,7 @@ mod tests {
valid.extend_from_slice(&[false, false, true, true, false, true, true,
false]);
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
- output.pad_nulls(6, 4, 8, valid_buffer.as_slice());
+ output.pad_nulls(6, 4, 8, valid_buffer.as_slice()).unwrap();
assert!(matches!(output, DictionaryBuffer::Dict { .. }));
@@ -512,7 +514,7 @@ mod tests {
let valid = [true, true, true, true, true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
- output.pad_nulls(0, 5, 5, valid_buffer.as_slice());
+ output.pad_nulls(0, 5, 5, valid_buffer.as_slice()).unwrap();
assert!(matches!(output, DictionaryBuffer::Dict { .. }));
@@ -656,7 +658,7 @@ mod tests {
decoder.set_data(encoding, page, 8, None).unwrap();
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
- output.pad_nulls(0, 0, 8, &[0]);
+ output.pad_nulls(0, 0, 8, &[0]).unwrap();
let array = output
.into_array(Some(Buffer::from(&[0])), &data_type)
.unwrap();
@@ -671,7 +673,7 @@ mod tests {
decoder.set_data(encoding, page, 8, None).unwrap();
assert_eq!(decoder.skip_values(1024).unwrap(), 0);
- output.pad_nulls(0, 0, 8, &[0]);
+ output.pad_nulls(0, 0, 8, &[0]).unwrap();
let array = output
.into_array(Some(Buffer::from(&[0])), &data_type)
.unwrap();
diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs
b/parquet/src/arrow/array_reader/byte_view_array.rs
index c134261609..ba78502145 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -794,7 +794,9 @@ mod tests {
let valid = [false, false, true, true, false, true, true, false,
false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
- output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
+ output
+ .pad_nulls(0, 4, valid.len(), valid_buffer.as_slice())
+ .unwrap();
let array = output.into_array(Some(valid_buffer),
&ArrowType::Utf8View);
let strings =
array.as_any().downcast_ref::<StringViewArray>().unwrap();
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index f7e83510cf..f48a219160 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -310,10 +310,16 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
- ) {
+ ) -> Result<()> {
let byte_length = self.byte_length.unwrap_or_default();
- assert_eq!(self.buffer.len(), (read_offset + values_read) *
byte_length);
+ if self.buffer.len() != (read_offset + values_read) * byte_length {
+ return Err(general_err!(
+ "found inconsistent buffer length while padding nulls:
expected {} bytes, got {}",
+ (read_offset + values_read) * byte_length,
+ self.buffer.len()
+ ));
+ }
self.buffer
.resize((read_offset + levels_read) * byte_length, 0);
@@ -339,6 +345,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
};
move_values(&mut self.buffer, byte_length, values_range,
valid_mask, op);
}
+ Ok(())
}
}
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs
b/parquet/src/arrow/buffer/dictionary_buffer.rs
index c3cd5744d2..abf7653029 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -206,7 +206,7 @@ impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer
for DictionaryBuffer<K
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
- ) {
+ ) -> Result<()> {
match self {
Self::Dict { keys, .. } => {
keys.resize(read_offset + levels_read, K::default());
@@ -294,7 +294,9 @@ mod tests {
let mut valid = vec![false, false, true, true, false, true, true,
true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
- buffer.pad_nulls(0, values.len(), valid.len(),
valid_buffer.as_slice());
+ buffer
+ .pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice())
+ .unwrap();
// Read some data not preserving the dictionary
@@ -305,7 +307,9 @@ mod tests {
valid.extend_from_slice(&[false, false, true, false, true]);
let null_buffer = Buffer::from_iter(valid.iter().cloned());
- buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
+ buffer
+ .pad_nulls(read_offset, 2, 5, null_buffer.as_slice())
+ .unwrap();
assert_eq!(buffer.len(), 13);
let split = std::mem::replace(&mut buffer,
DictionaryBuffer::with_capacity(0));
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs
b/parquet/src/arrow/buffer/offset_buffer.rs
index 2d9d1c9b6b..3090842679 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -157,8 +157,14 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
- ) {
- assert_eq!(self.offsets.len(), read_offset + values_read + 1);
+ ) -> Result<()> {
+ if self.offsets.len() != read_offset + values_read + 1 {
+ return Err(general_err!(
+ "found inconsistent offsets while padding nulls: expected {}
offsets, got {}",
+ read_offset + values_read + 1,
+ self.offsets.len()
+ ));
+ }
self.offsets
.resize(read_offset + levels_read + 1, I::default());
@@ -173,8 +179,9 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
.rev()
.zip(iter_set_bits_rev(valid_mask))
{
- assert!(level_pos >= value_pos);
- assert!(level_pos < last_pos);
+ if level_pos < value_pos || level_pos >= last_pos {
+ return Err(general_err!("found corrupt level data while
padding nulls"));
+ }
let end_offset = offsets[value_pos + 1];
let start_offset = offsets[value_pos];
@@ -185,7 +192,7 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
}
if level_pos == value_pos {
- return;
+ return Ok(());
}
offsets[level_pos] = start_offset;
@@ -197,6 +204,7 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
for x in &mut offsets[values_range.start + 1..last_pos] {
*x = last_start_offset
}
+ Ok(())
}
}
@@ -268,7 +276,9 @@ mod tests {
let valid_mask = Buffer::from_iter(valid.iter().copied());
// Both trailing and leading nulls
- buffer.pad_nulls(1, values.len() - 1, valid.len() - 1,
valid_mask.as_slice());
+ buffer
+ .pad_nulls(1, values.len() - 1, valid.len() - 1,
valid_mask.as_slice())
+ .unwrap();
let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
@@ -330,11 +340,44 @@ mod tests {
buffer.check_valid_utf8(12).unwrap_err();
}
+ #[test]
+ fn test_pad_nulls_corrupt_input_returns_err() {
+ // Corrupt input must produce a decode error rather than panicking.
+
+ // Offsets inconsistent with `values_read`: only one value was pushed,
+ // but three are claimed to have been read.
+ let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
+ buffer.try_push("a".as_bytes(), false).unwrap();
+ let valid_mask = Buffer::from_iter([true, false, false]);
+ let err = buffer
+ .pad_nulls(0, 3, 3, valid_mask.as_slice())
+ .unwrap_err();
+ assert!(
+ err.to_string().contains("inconsistent offsets"),
+ "unexpected error: {err}"
+ );
+
+ // Valid mask has fewer set bits than `values_read`, which previously
+ // tripped an assertion in the null-padding loop.
+ let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
+ for v in ["a", "b", "c"] {
+ buffer.try_push(v.as_bytes(), false).unwrap();
+ }
+ let valid_mask = Buffer::from_iter([true, false, false]);
+ let err = buffer
+ .pad_nulls(0, 3, 3, valid_mask.as_slice())
+ .unwrap_err();
+ assert!(
+ err.to_string().contains("corrupt level data"),
+ "unexpected error: {err}"
+ );
+ }
+
#[test]
fn test_pad_nulls_empty() {
let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9));
- buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());
+ buffer.pad_nulls(0, 0, 9, valid_mask.as_slice()).unwrap();
let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
diff --git a/parquet/src/arrow/buffer/view_buffer.rs
b/parquet/src/arrow/buffer/view_buffer.rs
index b1bdeb64e5..f20ca116e9 100644
--- a/parquet/src/arrow/buffer/view_buffer.rs
+++ b/parquet/src/arrow/buffer/view_buffer.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::arrow::record_reader::buffer::ValuesBuffer;
+use crate::errors::Result;
use arrow_array::{ArrayRef, BinaryViewArray, StringViewArray};
use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_schema::DataType as ArrowType;
@@ -81,9 +82,9 @@ impl ValuesBuffer for ViewBuffer {
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
- ) {
+ ) -> Result<()> {
self.views
- .pad_nulls(read_offset, values_read, levels_read, valid_mask);
+ .pad_nulls(read_offset, values_read, levels_read, valid_mask)
}
}
@@ -146,7 +147,9 @@ mod tests {
let valid = [true, false, false, true, false, false, true];
let valid_mask = Buffer::from_iter(valid.iter().copied());
- buffer.pad_nulls(1, 2, valid.len() - 1, valid_mask.as_slice());
+ buffer
+ .pad_nulls(1, 2, valid.len() - 1, valid_mask.as_slice())
+ .unwrap();
let array = buffer.into_array(Some(valid_mask), &ArrowType::Utf8View);
let strings = array
diff --git a/parquet/src/arrow/record_reader/buffer.rs
b/parquet/src/arrow/record_reader/buffer.rs
index 6e0855dda3..40fb598e59 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
+use crate::errors::Result;
/// A buffer that supports padding with nulls
pub trait ValuesBuffer {
@@ -39,13 +40,15 @@ pub trait ValuesBuffer {
/// - `levels_read` - the number of levels read
/// - `valid_mask` - a packed mask of valid levels
///
+ /// Returns an error if the inputs are inconsistent, for example because
the
+ /// decoded data was corrupt. This must not panic on such input.
fn pad_nulls(
&mut self,
read_offset: usize,
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
- );
+ ) -> Result<()>;
}
impl<T: Copy + Default> ValuesBuffer for Vec<T> {
@@ -59,7 +62,7 @@ impl<T: Copy + Default> ValuesBuffer for Vec<T> {
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
- ) {
+ ) -> Result<()> {
self.resize(read_offset + levels_read, T::default());
let values_range = read_offset..read_offset + values_read;
@@ -70,5 +73,6 @@ impl<T: Copy + Default> ValuesBuffer for Vec<T> {
}
self[level_pos] = self[value_pos];
}
+ Ok(())
}
}
diff --git a/parquet/src/arrow/record_reader/mod.rs
b/parquet/src/arrow/record_reader/mod.rs
index d2d1326239..625aa674b2 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -251,7 +251,7 @@ where
values_read,
levels_read,
def_levels.nulls().as_slice(),
- );
+ )?;
}
self.num_records += records_read;