This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 86cab0c58f fix(parquet): fix CDC panic on nested ListArrays with null
entries (#9644)
86cab0c58f is described below
commit 86cab0c58f013a5c68bcf47163a1d279ff7018d5
Author: Krisztián Szűcs <[email protected]>
AuthorDate: Tue Apr 7 20:13:49 2026 +0200
fix(parquet): fix CDC panic on nested ListArrays with null entries (#9644)
The CDC chunker's value_offset diverged from actual leaf array positions
when null list entries had non-empty child offset ranges (valid per the
Arrow columnar format spec). This caused slice_for_chunk to produce
incorrect non_null_indices, leading to an out-of-bounds panic in
write_mini_batch.
Track non-null value counts separately from leaf slot counts in
the chunker, and use them in slice_for_chunk to index directly into
non_null_indices (nni), regardless of gaps in the leaf array.
# Which issue does this PR close?
Closes https://github.com/apache/arrow-rs/issues/9637
# Rationale for this change
The Arrow spec allows null list entries to own non-empty child segments.
When such arrays were written with CDC enabled, `CdcChunk::value_offset`
diverged from the actual index into `non_null_indices`, causing an
out-of-bounds panic.
# What changes are included in this PR?
- Redefine `CdcChunk::value_offset`/`num_values` as index/count into
`non_null_indices` instead of leaf array positions.
- Introduce `leaf_offset` in the nested branch to track leaf array
position for hashing separately from `value_offset`.
- Rewrite `slice_for_chunk` to directly index `non_null_indices`.
# Are these changes tested?
Yes. Unit tests for `slice_for_chunk` (nested nulls, all-null chunks)
and end-to-end roundtrip tests for lists with non-empty null segments.
# Are there any user-facing changes?
No.
---
parquet/src/arrow/arrow_writer/levels.rs | 203 ++++++++++++++-------------
parquet/src/column/chunker/cdc.rs | 234 +++++++++++++++++++++++++++----
parquet/src/column/chunker/mod.rs | 6 +-
3 files changed, 318 insertions(+), 125 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/levels.rs
b/parquet/src/arrow/arrow_writer/levels.rs
index d1da24872c..8374e905e1 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -805,37 +805,29 @@ impl ArrayLevels {
/// Create a sliced view of this `ArrayLevels` for a CDC chunk.
///
- /// Note: `def_levels`, `rep_levels`, and `non_null_indices` are copied
(not zero-copy),
- /// while `array` is sliced without copying.
+ /// The chunk's `value_offset`/`num_values` select the relevant slice of
+ /// `non_null_indices`. The array is sliced to the range covered by
+ /// those indices, and they are shifted to be relative to the slice.
pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
- let level_offset = chunk.level_offset;
- let num_levels = chunk.num_levels;
- let value_offset = chunk.value_offset;
- let num_values = chunk.num_values;
- let def_levels = self
- .def_levels
- .as_ref()
- .map(|levels| levels[level_offset..level_offset +
num_levels].to_vec());
- let rep_levels = self
- .rep_levels
- .as_ref()
- .map(|levels| levels[level_offset..level_offset +
num_levels].to_vec());
-
- // Filter non_null_indices to [value_offset, value_offset + num_values)
- // and shift by -value_offset. Use binary search since the slice is
sorted.
- let value_end = value_offset + num_values;
- let start = self
- .non_null_indices
- .partition_point(|&idx| idx < value_offset);
- let end = self
- .non_null_indices
- .partition_point(|&idx| idx < value_end);
- let non_null_indices: Vec<usize> = self.non_null_indices[start..end]
- .iter()
- .map(|&idx| idx - value_offset)
- .collect();
+ let def_levels = self.def_levels.as_ref().map(|levels| {
+ levels[chunk.level_offset..chunk.level_offset +
chunk.num_levels].to_vec()
+ });
+ let rep_levels = self.rep_levels.as_ref().map(|levels| {
+ levels[chunk.level_offset..chunk.level_offset +
chunk.num_levels].to_vec()
+ });
- let array = self.array.slice(value_offset, num_values);
+ // Select the non-null indices for this chunk.
+ let nni =
&self.non_null_indices[chunk.value_offset..chunk.value_offset +
chunk.num_values];
+ // Compute the array range spanned by the non-null indices.
+ // When nni is empty (all-null chunk), start=0, end=0 → zero-length
+ // array slice; write_batch_internal will process only the def/rep
+ // levels and write no values.
+ let start = nni.first().copied().unwrap_or(0);
+ let end = nni.last().map_or(0, |&i| i + 1);
+ // Shift indices to be relative to the sliced array.
+ let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
+ // Slice the array to the computed range.
+ let array = self.array.slice(start, end - start);
let logical_nulls = array.logical_nulls();
Self {
@@ -2149,9 +2141,8 @@ mod tests {
fn test_slice_for_chunk_flat() {
// Case 1: required field (max_def_level=0, no def/rep levels stored).
// Array has 6 values; all are non-null so non_null_indices covers
every position.
- // The chunk selects value_offset=2, num_values=3 → the sub-array [3,
4, 5].
- // Since there are no levels, num_levels=0 and level_offset are
irrelevant.
- // non_null_indices [0,1,2,3,4,5] filtered to [2,4) and shifted by -2
→ [0,1,2].
+ // value_offset=2, num_values=3 → non_null_indices[2..5] = [2,3,4].
+ // Array is sliced (no def_levels → write_batch_internal uses
values.len()).
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5,
6]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
@@ -2176,14 +2167,9 @@ mod tests {
// Case 2: optional field (max_def_level=1, def levels present, no rep
levels).
// Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
- // def_levels: [1, 0, 1, 0, 1, 1] (1=non-null, 0=null)
- // non_null_indices: [0, 2, 4, 5] (array positions of the four
non-null values)
- //
- // The chunk selects level_offset=1, num_levels=3, value_offset=1,
num_values=3:
- // - def_levels[1..4] = [0, 1, 0] → null, non-null, null
- // - sub-array slice(1, 3) = [None, Some(3), None]
- // - non_null_indices filtered to [value_offset=1, value_end=4):
only index 2 qualifies,
- // shifted by -1 → [1] (position of Some(3) within the sliced
sub-array)
+ // non_null_indices: [0, 2, 4, 5]
+ // value_offset=1, num_values=1 → non_null_indices[1..2] = [2].
+ // Array is sliced to the span of those indices ([2..3), length 1);
num_levels still comes from def_levels.len().
let array: ArrayRef = Arc::new(Int32Array::from(vec![
Some(1),
None,
@@ -2206,90 +2192,111 @@ mod tests {
level_offset: 1,
num_levels: 3,
value_offset: 1,
- num_values: 3,
+ num_values: 1,
});
assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
assert!(sliced.rep_levels.is_none());
- assert_eq!(sliced.non_null_indices, vec![1]);
- assert_eq!(sliced.array.len(), 3);
+ assert_eq!(sliced.non_null_indices, vec![0]); // [2] shifted by -2
(nni[0])
+ assert_eq!(sliced.array.len(), 1);
}
#[test]
- fn test_slice_for_chunk_nested() {
- // [[1,2],[3],[4,5]]: def=[2,2,2,2,2], rep=[0,1,0,0,1]
- // Slice levels 2..5 (def=[2,2,2], rep=[0,0,1]), values 2..5
- let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+ fn test_slice_for_chunk_nested_with_nulls() {
+ // Regression test for https://github.com/apache/arrow-rs/issues/9637
+ //
+ // Simulates a List<Int32?> where null list entries have non-zero child
+ // ranges (valid per Arrow spec: "a null value may correspond to a
+ // non-empty segment in the child array"). This creates gaps in the
+ // leaf array that don't correspond to any levels.
+ //
+ // 5 rows with 2 null list entries owning non-empty child ranges:
+ // row 0: [1] → leaf[0]
+ // row 1: null list → owns leaf[1..3] (gap of 2)
+ // row 2: [2, null] → leaf[3], leaf[4]=null element
+ // row 3: null list → owns leaf[5..8] (gap of 3)
+ // row 4: [4, 5] → leaf[8], leaf[9]
+ //
+ // def_levels: [3, 0, 3, 2, 0, 3, 3]
+ // rep_levels: [0, 0, 0, 1, 0, 0, 1]
+ // non_null_indices: [0, 3, 8, 9]
+ // gaps in array: 0→3 (skip 1,2), 3→8 (skip 5,6,7)
+ let array: ArrayRef = Arc::new(Int32Array::from(vec![
+ Some(1), // 0: row 0
+ None, // 1: gap (null list row 1)
+ None, // 2: gap (null list row 1)
+ Some(2), // 3: row 2
+ None, // 4: row 2, null element
+ None, // 5: gap (null list row 3)
+ None, // 6: gap (null list row 3)
+ None, // 7: gap (null list row 3)
+ Some(4), // 8: row 4
+ Some(5), // 9: row 4
+ ]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
- def_levels: Some(vec![2, 2, 2, 2, 2]),
- rep_levels: Some(vec![0, 1, 0, 0, 1]),
- non_null_indices: vec![0, 1, 2, 3, 4],
- max_def_level: 2,
+ def_levels: Some(vec![3, 0, 3, 2, 0, 3, 3]),
+ rep_levels: Some(vec![0, 0, 0, 1, 0, 0, 1]),
+ non_null_indices: vec![0, 3, 8, 9],
+ max_def_level: 3,
max_rep_level: 1,
array,
logical_nulls,
};
- let sliced = levels.slice_for_chunk(&CdcChunk {
+
+ // Chunk 0: rows 0-1, nni=[0] → array sliced to [0..1]
+ let chunk0 = levels.slice_for_chunk(&CdcChunk {
+ level_offset: 0,
+ num_levels: 2,
+ value_offset: 0,
+ num_values: 1,
+ });
+ assert_eq!(chunk0.non_null_indices, vec![0]);
+ assert_eq!(chunk0.array.len(), 1);
+
+ // Chunk 1: rows 2-3, nni=[3] → array sliced to [3..4]
+ let chunk1 = levels.slice_for_chunk(&CdcChunk {
level_offset: 2,
num_levels: 3,
+ value_offset: 1,
+ num_values: 1,
+ });
+ assert_eq!(chunk1.non_null_indices, vec![0]);
+ assert_eq!(chunk1.array.len(), 1);
+
+ // Chunk 2: row 4, nni=[8, 9] → array sliced to [8..10]
+ let chunk2 = levels.slice_for_chunk(&CdcChunk {
+ level_offset: 5,
+ num_levels: 2,
value_offset: 2,
- num_values: 3,
+ num_values: 2,
});
- assert_eq!(sliced.def_levels, Some(vec![2, 2, 2]));
- assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1]));
- // [0,1,2,3,4] filtered to [2,5) → [2,3,4] → shifted -2 → [0,1,2]
- assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
- assert_eq!(sliced.array.len(), 3);
+ assert_eq!(chunk2.non_null_indices, vec![0, 1]);
+ assert_eq!(chunk2.array.len(), 2);
}
#[test]
- fn test_slice_for_chunk_non_null_indices_boundary() {
- // [1, null, 3]: non_null_indices=[0, 2]; test inclusive lower /
exclusive upper bounds
- let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None,
Some(3)]));
+ fn test_slice_for_chunk_all_null() {
+ // All-null chunk: num_values=0 → empty nni slice → zero-length array.
+ let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None,
None, Some(4)]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
- def_levels: Some(vec![1, 0, 1]),
+ def_levels: Some(vec![1, 0, 0, 1]),
rep_levels: None,
- non_null_indices: vec![0, 2],
+ non_null_indices: vec![0, 3],
max_def_level: 1,
max_rep_level: 0,
array,
logical_nulls,
};
- assert_eq!(
- levels
- .slice_for_chunk(&CdcChunk {
- level_offset: 0,
- num_levels: 1,
- value_offset: 0,
- num_values: 1
- })
- .non_null_indices,
- vec![0]
- );
- // idx 2 in range [1,3), shifted -1 → 1
- assert_eq!(
- levels
- .slice_for_chunk(&CdcChunk {
- level_offset: 1,
- num_levels: 2,
- value_offset: 1,
- num_values: 2
- })
- .non_null_indices,
- vec![1]
- );
- // idx 2 excluded from [1,2)
- assert_eq!(
- levels
- .slice_for_chunk(&CdcChunk {
- level_offset: 1,
- num_levels: 1,
- value_offset: 1,
- num_values: 1
- })
- .non_null_indices,
- Vec::<usize>::new()
- );
+ // Chunk covering only the two null rows (levels 1..3), zero non-null
values.
+ let sliced = levels.slice_for_chunk(&CdcChunk {
+ level_offset: 1,
+ num_levels: 2,
+ value_offset: 1,
+ num_values: 0,
+ });
+ assert_eq!(sliced.def_levels, Some(vec![0, 0]));
+ assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
+ assert_eq!(sliced.array.len(), 0);
}
}
diff --git a/parquet/src/column/chunker/cdc.rs
b/parquet/src/column/chunker/cdc.rs
index f21f58780a..cb4328536d 100644
--- a/parquet/src/column/chunker/cdc.rs
+++ b/parquet/src/column/chunker/cdc.rs
@@ -289,52 +289,100 @@ impl ContentDefinedChunker {
let mut chunks = Vec::new();
let mut prev_offset: usize = 0;
let mut prev_value_offset: usize = 0;
- // Total number of values seen; for non-nested data this equals
num_levels.
- let mut total_values: usize = num_levels;
+ let mut value_offset: usize = 0;
if !has_rep_levels && !has_def_levels {
// Fastest path: non-nested, non-null data.
+ // Every level corresponds to exactly one non-null value, so
+ // value_offset == level_offset and num_values == num_levels.
+ //
+ // Example: required Int32, array = [10, 20, 30]
+ // level: 0 1 2
+ // value_offset: 0 1 2
for offset in 0..num_levels {
roll_value(self, offset);
if self.need_new_chunk() {
chunks.push(CdcChunk {
level_offset: prev_offset,
- value_offset: prev_offset,
num_levels: offset - prev_offset,
+ value_offset: prev_offset,
num_values: offset - prev_offset,
});
prev_offset = offset;
}
}
- // Set the previous value offset to add the last chunk.
prev_value_offset = prev_offset;
+ value_offset = num_levels;
} else if !has_rep_levels {
- // Non-nested data with nulls.
+ // Non-nested data with nulls. value_offset only increments for
+ // non-null values (def == max_def), so it diverges from the
+ // level offset when nulls are present.
+ //
+ // Example: optional Int32, array = [1, null, 2, null, 3]
+ // def_levels: [1, 0, 1, 0, 1]
+ // level: 0 1 2 3 4
+ // value_offset: 0 - 1 - 2 (only increments on def==1; "-" marks a null level)
let def_levels = def_levels.expect("def_levels required when
max_def_level > 0");
#[allow(clippy::needless_range_loop)]
for offset in 0..num_levels {
let def_level = def_levels[offset];
self.roll_level(def_level);
if def_level == self.max_def_level {
+ // For non-nested data, the leaf array has one slot per
+ // level (nulls are array elements), so `offset` (the
+ // level index) is the correct array index for hashing.
roll_value(self, offset);
}
+ // Check boundary before incrementing value_offset so that
+ // num_values reflects only entries in the completed chunk.
if self.need_new_chunk() {
chunks.push(CdcChunk {
level_offset: prev_offset,
- value_offset: prev_offset,
num_levels: offset - prev_offset,
- num_values: offset - prev_offset,
+ value_offset: prev_value_offset,
+ num_values: value_offset - prev_value_offset,
});
prev_offset = offset;
+ prev_value_offset = value_offset;
+ }
+ if def_level == self.max_def_level {
+ value_offset += 1;
}
}
- // Set the previous value offset to add the last chunk.
- prev_value_offset = prev_offset;
} else {
- // Nested data with nulls.
+ // Nested data with nulls. Two counters are needed:
+ //
+ // leaf_offset: index into the leaf values array for hashing,
+ // incremented for all leaf slots (def >=
repeated_ancestor_def_level),
+ // including null elements.
+ //
+ // value_offset: index into non_null_indices for chunk
boundaries,
+ // incremented only for non-null leaf values (def ==
max_def_level).
+ //
+ // These diverge when nullable elements exist inside lists.
+ //
+ // Example: List<Int32?> with repeated_ancestor_def_level=2,
max_def=3
+ // row 0: [1, null, 2] (3 leaf slots, 2 non-null)
+ // row 1: [3] (1 leaf slot, 1 non-null)
+ //
+ // leaf array: [1, null, 2, 3]
+ // def_levels: [3, 2, 3, 3]
+ // rep_levels: [0, 1, 1, 0]
+ //
+ // level  def  leaf_offset  value_offset  action
+ // ─────  ───  ───────────  ────────────  ──────────────────────────────
+ // 0      3    0            0             roll_value(0), value++, leaf++
+ // 1      2    1            1             leaf++ only (null element)
+ // 2      3    2            1             roll_value(2), value++, leaf++
+ // 3      3    3            2             roll_value(3), value++, leaf++
+ //
+ // roll_value(2) correctly indexes leaf array position 2 (value
"2").
+ // Using value_offset=1 would index position 1 (the null slot).
+ //
+ // Using value_offset for roll_value would hash the wrong array
slot.
let def_levels = def_levels.expect("def_levels required for nested
data");
let rep_levels = rep_levels.expect("rep_levels required for nested
data");
- let mut value_offset: usize = 0;
+ let mut leaf_offset: usize = 0;
for offset in 0..num_levels {
let def_level = def_levels[offset];
@@ -343,43 +391,45 @@ impl ContentDefinedChunker {
self.roll_level(def_level);
self.roll_level(rep_level);
if def_level == self.max_def_level {
- roll_value(self, value_offset);
+ roll_value(self, leaf_offset);
}
+ // Check boundary before incrementing value_offset so that
+ // num_values reflects only entries in the completed chunk.
if rep_level == 0 && self.need_new_chunk() {
- // If we are at a record boundary and need a new chunk,
create one.
let levels_to_write = offset - prev_offset;
if levels_to_write > 0 {
chunks.push(CdcChunk {
level_offset: prev_offset,
- value_offset: prev_value_offset,
num_levels: levels_to_write,
+ value_offset: prev_value_offset,
num_values: value_offset - prev_value_offset,
});
prev_offset = offset;
prev_value_offset = value_offset;
}
}
- if def_level >= self.repeated_ancestor_def_level {
- // We only increment the value offset if we have a leaf
value.
+ if def_level == self.max_def_level {
value_offset += 1;
}
+ if def_level >= self.repeated_ancestor_def_level {
+ leaf_offset += 1;
+ }
}
- total_values = value_offset;
}
// Add the last chunk if we have any levels left.
if prev_offset < num_levels {
chunks.push(CdcChunk {
level_offset: prev_offset,
- value_offset: prev_value_offset,
num_levels: num_levels - prev_offset,
- num_values: total_values - prev_value_offset,
+ value_offset: prev_value_offset,
+ num_values: value_offset - prev_value_offset,
});
}
#[cfg(debug_assertions)]
- self.validate_chunks(&chunks, num_levels, total_values);
+ self.validate_chunks(&chunks, num_levels, value_offset);
chunks
}
@@ -626,8 +676,9 @@ mod tests {
assert_eq!(chunks1.len(), chunks2.len());
for (a, b) in chunks1.iter().zip(chunks2.iter()) {
assert_eq!(a.level_offset, b.level_offset);
- assert_eq!(a.value_offset, b.value_offset);
assert_eq!(a.num_levels, b.num_levels);
+ assert_eq!(a.value_offset, b.value_offset);
+ assert_eq!(a.num_values, b.num_values);
}
}
@@ -642,17 +693,23 @@ mod tests {
let num_levels = 20;
// def_level=1 means non-null, def_level=0 means null
+ // Pattern: null at indices 0, 3, 6, 9, 12, 15, 18 → 7 nulls, 13
non-null
let def_levels: Vec<i16> = (0..num_levels)
.map(|i| if i % 3 == 0 { 0 } else { 1 })
.collect();
+ let expected_non_null: usize = def_levels.iter().filter(|&&d| d ==
1).count();
let chunks = chunker.calculate(Some(&def_levels), None, num_levels,
|c, i| {
c.roll_fixed::<4>(&(i as i32).to_le_bytes());
});
assert!(!chunks.is_empty());
- let total: usize = chunks.iter().map(|c| c.num_levels).sum();
- assert_eq!(total, num_levels);
+ let total_levels: usize = chunks.iter().map(|c| c.num_levels).sum();
+ let total_values: usize = chunks.iter().map(|c| c.num_values).sum();
+ assert_eq!(total_levels, num_levels);
+ assert_eq!(total_values, expected_non_null);
+ // With nulls present, total_values < total_levels
+ assert!(total_values < total_levels);
}
}
@@ -663,9 +720,12 @@ mod arrow_tests {
use std::borrow::Borrow;
use std::sync::Arc;
+ use arrow::util::data_gen::create_random_batch;
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, RecordBatch};
- use arrow_schema::{DataType, Field, Schema};
+ use arrow_buffer::Buffer;
+ use arrow_data::ArrayData;
+ use arrow_schema::{DataType, Field, Fields, Schema};
use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use crate::arrow::arrow_writer::ArrowWriter;
@@ -2153,4 +2213,130 @@ mod arrow_tests {
"all chunks after the first must be identical"
);
}
+
+ /// Regression test for <https://github.com/apache/arrow-rs/issues/9637>
+ ///
+ /// Writing nested list data with CDC enabled panicked with an
out-of-bounds
+ /// slice access when null list entries had non-zero child ranges.
+ #[test]
+ fn test_cdc_list_roundtrip() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "_1",
+ DataType::List(Arc::new(Field::new_list_field(DataType::Int32,
true))),
+ true,
+ ),
+ Field::new(
+ "_2",
+
DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
+ true,
+ ),
+ Field::new(
+ "_3",
+
DataType::LargeList(Arc::new(Field::new_list_field(DataType::Utf8, true))),
+ true,
+ ),
+ ]));
+ let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
+ write_with_cdc_options(
+ &[&batch],
+ CDC_MIN_CHUNK_SIZE,
+ CDC_MAX_CHUNK_SIZE,
+ None,
+ true,
+ );
+ }
+
+ /// Test CDC with deeply nested types: List<List<Int32>>,
List<Struct<List<Int32>>>
+ #[test]
+ fn test_cdc_deeply_nested_roundtrip() {
+ let inner_field = Field::new_list_field(DataType::Int32, true);
+ let inner_type = DataType::List(Arc::new(inner_field));
+ let outer_field = Field::new_list_field(inner_type.clone(), true);
+ let list_list_type = DataType::List(Arc::new(outer_field));
+
+ let struct_inner_field = Field::new_list_field(DataType::Int32, true);
+ let struct_inner_type = DataType::List(Arc::new(struct_inner_field));
+ let struct_fields = Fields::from(vec![Field::new("a",
struct_inner_type, true)]);
+ let struct_type = DataType::Struct(struct_fields);
+ let struct_list_field = Field::new_list_field(struct_type, true);
+ let list_struct_type = DataType::List(Arc::new(struct_list_field));
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("list_list", list_list_type, true),
+ Field::new("list_struct_list", list_struct_type, true),
+ ]));
+ let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
+ write_with_cdc_options(
+ &[&batch],
+ CDC_MIN_CHUNK_SIZE,
+ CDC_MAX_CHUNK_SIZE,
+ None,
+ true,
+ );
+ }
+
+ /// Test CDC with list arrays that have non-empty null segments.
+ ///
+ /// Per the Arrow columnar format spec: "a null value may correspond to a
+ /// non-empty segment in the child array". This test constructs such arrays
+ /// manually and verifies the CDC writer handles them correctly.
+ #[test]
+ fn test_cdc_list_non_empty_null_segments() {
+ // Build List<Int32> where null entries own non-zero child ranges:
+ // row 0: [1, 2] offsets[0..2] valid
+ // row 1: null offsets[2..5] null, but owns 3 child values
+ // row 2: [6, 7] offsets[5..7] valid
+ // row 3: null offsets[7..9] null, but owns 2 child values
+ // row 4: [10] offsets[9..10] valid
+ let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+ let offsets = Buffer::from_iter([0_i32, 2, 5, 7, 9, 10]);
+ let null_bitmap = Buffer::from([0b00010101]); // rows 0, 2, 4 valid
+
+ let list_type =
DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
+ let list_data = unsafe {
+ ArrayData::new_unchecked(
+ list_type.clone(),
+ 5,
+ None,
+ Some(null_bitmap),
+ 0,
+ vec![offsets],
+ vec![values.to_data()],
+ )
+ };
+ let list_array = arrow_array::make_array(list_data);
+
+ let schema = Arc::new(Schema::new(vec![Field::new("col", list_type,
true)]));
+ let batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();
+
+ let buf = write_with_cdc_options(
+ &[&batch],
+ CDC_MIN_CHUNK_SIZE,
+ CDC_MAX_CHUNK_SIZE,
+ None,
+ true,
+ );
+ let read = concat_batches(read_batches(&buf));
+ let read_list = read.column(0).as_list::<i32>();
+ assert_eq!(read_list.len(), 5);
+ assert!(read_list.is_valid(0));
+ assert!(read_list.is_null(1));
+ assert!(read_list.is_valid(2));
+ assert!(read_list.is_null(3));
+ assert!(read_list.is_valid(4));
+
+ let get_vals = |i: usize| -> Vec<i32> {
+ read_list
+ .value(i)
+ .as_primitive::<arrow_array::types::Int32Type>()
+ .values()
+ .iter()
+ .copied()
+ .collect()
+ };
+ assert_eq!(get_vals(0), vec![1, 2]);
+ assert_eq!(get_vals(2), vec![6, 7]);
+ assert_eq!(get_vals(4), vec![10]);
+ }
}
diff --git a/parquet/src/column/chunker/mod.rs
b/parquet/src/column/chunker/mod.rs
index c4caf18af6..42631e026d 100644
--- a/parquet/src/column/chunker/mod.rs
+++ b/parquet/src/column/chunker/mod.rs
@@ -31,10 +31,10 @@ pub(crate) use cdc::ContentDefinedChunker;
pub(crate) struct CdcChunk {
/// The start offset of this chunk inside the given levels.
pub level_offset: usize,
- /// The start offset of this chunk inside the given values array.
- pub value_offset: usize,
/// The number of levels in this chunk.
pub num_levels: usize,
- /// The number of values (Arrow array elements) in this chunk.
+ /// The start index into `non_null_indices` for this chunk.
+ pub value_offset: usize,
+ /// The number of `non_null_indices` entries in this chunk.
pub num_values: usize,
}