This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 86cab0c58f fix(parquet): fix CDC panic on nested ListArrays with null 
entries  (#9644)
86cab0c58f is described below

commit 86cab0c58f013a5c68bcf47163a1d279ff7018d5
Author: Krisztián Szűcs <[email protected]>
AuthorDate: Tue Apr 7 20:13:49 2026 +0200

    fix(parquet): fix CDC panic on nested ListArrays with null entries  (#9644)
    
    The CDC chunker's value_offset diverged from actual leaf array positions
    when null list entries had non-empty child offset ranges (valid per the
    Arrow columnar format spec). This caused slice_for_chunk to produce
    incorrect non_null_indices, leading to an out-of-bounds panic in
    write_mini_batch.
    
    Track non-null value counts (nni) separately from leaf slot counts in
    the chunker, and use them in slice_for_chunk to correctly index into
    non_null_indices regardless of gaps in the leaf array.
    
    # Which issue does this PR close?
    
    Closes https://github.com/apache/arrow-rs/issues/9637
    
    # Rationale for this change
    
    The Arrow spec allows null list entries to own non-empty child segments.
    When such arrays were written with CDC enabled, `CdcChunk::value_offset`
    diverged from the actual index into `non_null_indices`, causing an
    out-of-bounds panic.
    
    # What changes are included in this PR?
    
    - Redefine `CdcChunk::value_offset`/`num_values` as index/count into
    `non_null_indices` instead of leaf array positions.
    - Introduce `leaf_offset` in the nested branch to track leaf array
    position for hashing separately from `value_offset`.
    - Rewrite `slice_for_chunk` to directly index `non_null_indices`.
    
    # Are these changes tested?
    
    Yes. Unit tests for `slice_for_chunk` (nested nulls, all-null chunks)
    and end-to-end roundtrip tests for lists with non-empty null segments.
    
    # Are there any user-facing changes?
    
    No.
---
 parquet/src/arrow/arrow_writer/levels.rs | 203 ++++++++++++++-------------
 parquet/src/column/chunker/cdc.rs        | 234 +++++++++++++++++++++++++++----
 parquet/src/column/chunker/mod.rs        |   6 +-
 3 files changed, 318 insertions(+), 125 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/levels.rs 
b/parquet/src/arrow/arrow_writer/levels.rs
index d1da24872c..8374e905e1 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -805,37 +805,29 @@ impl ArrayLevels {
 
     /// Create a sliced view of this `ArrayLevels` for a CDC chunk.
     ///
-    /// Note: `def_levels`, `rep_levels`, and `non_null_indices` are copied 
(not zero-copy),
-    /// while `array` is sliced without copying.
+    /// The chunk's `value_offset`/`num_values` select the relevant slice of
+    /// `non_null_indices`. The array is sliced to the range covered by
+    /// those indices, and they are shifted to be relative to the slice.
     pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
-        let level_offset = chunk.level_offset;
-        let num_levels = chunk.num_levels;
-        let value_offset = chunk.value_offset;
-        let num_values = chunk.num_values;
-        let def_levels = self
-            .def_levels
-            .as_ref()
-            .map(|levels| levels[level_offset..level_offset + 
num_levels].to_vec());
-        let rep_levels = self
-            .rep_levels
-            .as_ref()
-            .map(|levels| levels[level_offset..level_offset + 
num_levels].to_vec());
-
-        // Filter non_null_indices to [value_offset, value_offset + num_values)
-        // and shift by -value_offset. Use binary search since the slice is 
sorted.
-        let value_end = value_offset + num_values;
-        let start = self
-            .non_null_indices
-            .partition_point(|&idx| idx < value_offset);
-        let end = self
-            .non_null_indices
-            .partition_point(|&idx| idx < value_end);
-        let non_null_indices: Vec<usize> = self.non_null_indices[start..end]
-            .iter()
-            .map(|&idx| idx - value_offset)
-            .collect();
+        let def_levels = self.def_levels.as_ref().map(|levels| {
+            levels[chunk.level_offset..chunk.level_offset + 
chunk.num_levels].to_vec()
+        });
+        let rep_levels = self.rep_levels.as_ref().map(|levels| {
+            levels[chunk.level_offset..chunk.level_offset + 
chunk.num_levels].to_vec()
+        });
 
-        let array = self.array.slice(value_offset, num_values);
+        // Select the non-null indices for this chunk.
+        let nni = 
&self.non_null_indices[chunk.value_offset..chunk.value_offset + 
chunk.num_values];
+        // Compute the array range spanned by the non-null indices.
+        // When nni is empty (all-null chunk), start=0, end=0 → zero-length
+        // array slice; write_batch_internal will process only the def/rep
+        // levels and write no values.
+        let start = nni.first().copied().unwrap_or(0);
+        let end = nni.last().map_or(0, |&i| i + 1);
+        // Shift indices to be relative to the sliced array.
+        let non_null_indices = nni.iter().map(|&idx| idx - start).collect();
+        // Slice the array to the computed range.
+        let array = self.array.slice(start, end - start);
         let logical_nulls = array.logical_nulls();
 
         Self {
@@ -2149,9 +2141,8 @@ mod tests {
     fn test_slice_for_chunk_flat() {
         // Case 1: required field (max_def_level=0, no def/rep levels stored).
         // Array has 6 values; all are non-null so non_null_indices covers 
every position.
-        // The chunk selects value_offset=2, num_values=3 → the sub-array [3, 
4, 5].
-        // Since there are no levels, num_levels=0 and level_offset are 
irrelevant.
-        // non_null_indices [0,1,2,3,4,5] filtered to [2,4) and shifted by -2 
→ [0,1,2].
+        // value_offset=2, num_values=3 → non_null_indices[2..5] = [2,3,4].
+        // Array is sliced (no def_levels → write_batch_internal uses 
values.len()).
         let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 
6]));
         let logical_nulls = array.logical_nulls();
         let levels = ArrayLevels {
@@ -2176,14 +2167,9 @@ mod tests {
 
         // Case 2: optional field (max_def_level=1, def levels present, no rep 
levels).
         // Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
-        // def_levels: [1, 0, 1, 0, 1, 1]  (1=non-null, 0=null)
-        // non_null_indices: [0, 2, 4, 5]  (array positions of the four 
non-null values)
-        //
-        // The chunk selects level_offset=1, num_levels=3, value_offset=1, 
num_values=3:
-        //   - def_levels[1..4] = [0, 1, 0]  → null, non-null, null
-        //   - sub-array slice(1, 3) = [None, Some(3), None]
-        //   - non_null_indices filtered to [value_offset=1, value_end=4): 
only index 2 qualifies,
-        //     shifted by -1 → [1]  (position of Some(3) within the sliced 
sub-array)
+        // non_null_indices: [0, 2, 4, 5]
+        // value_offset=1, num_values=1 → non_null_indices[1..2] = [2].
+        // Array is sliced to [2..3); num_levels comes from def_levels.len().
         let array: ArrayRef = Arc::new(Int32Array::from(vec![
             Some(1),
             None,
@@ -2206,90 +2192,111 @@ mod tests {
             level_offset: 1,
             num_levels: 3,
             value_offset: 1,
-            num_values: 3,
+            num_values: 1,
         });
         assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
         assert!(sliced.rep_levels.is_none());
-        assert_eq!(sliced.non_null_indices, vec![1]);
-        assert_eq!(sliced.array.len(), 3);
+        assert_eq!(sliced.non_null_indices, vec![0]); // [2] shifted by -2 
(nni[0])
+        assert_eq!(sliced.array.len(), 1);
     }
 
     #[test]
-    fn test_slice_for_chunk_nested() {
-        // [[1,2],[3],[4,5]]: def=[2,2,2,2,2], rep=[0,1,0,0,1]
-        // Slice levels 2..5 (def=[2,2,2], rep=[0,0,1]), values 2..5
-        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+    fn test_slice_for_chunk_nested_with_nulls() {
+        // Regression test for https://github.com/apache/arrow-rs/issues/9637
+        //
+        // Simulates a List<Int32?> where null list entries have non-zero child
+        // ranges (valid per Arrow spec: "a null value may correspond to a
+        // non-empty segment in the child array"). This creates gaps in the
+        // leaf array that don't correspond to any levels.
+        //
+        // 5 rows with 2 null list entries owning non-empty child ranges:
+        //   row 0: [1]       → leaf[0]
+        //   row 1: null list → owns leaf[1..3] (gap of 2)
+        //   row 2: [2, null] → leaf[3], leaf[4]=null element
+        //   row 3: null list → owns leaf[5..8] (gap of 3)
+        //   row 4: [4, 5]   → leaf[8], leaf[9]
+        //
+        // def_levels: [3,  0,  3, 2,  0,  3, 3]
+        // rep_levels: [0,  0,  0, 1,  0,  0, 1]
+        // non_null_indices: [0, 3, 8, 9]
+        //   gaps in array: 0→3 (skip 1,2), 3→8 (skip null element 4, gap 5,6,7)
+        let array: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(1), // 0: row 0
+            None,    // 1: gap (null list row 1)
+            None,    // 2: gap (null list row 1)
+            Some(2), // 3: row 2
+            None,    // 4: row 2, null element
+            None,    // 5: gap (null list row 3)
+            None,    // 6: gap (null list row 3)
+            None,    // 7: gap (null list row 3)
+            Some(4), // 8: row 4
+            Some(5), // 9: row 4
+        ]));
         let logical_nulls = array.logical_nulls();
         let levels = ArrayLevels {
-            def_levels: Some(vec![2, 2, 2, 2, 2]),
-            rep_levels: Some(vec![0, 1, 0, 0, 1]),
-            non_null_indices: vec![0, 1, 2, 3, 4],
-            max_def_level: 2,
+            def_levels: Some(vec![3, 0, 3, 2, 0, 3, 3]),
+            rep_levels: Some(vec![0, 0, 0, 1, 0, 0, 1]),
+            non_null_indices: vec![0, 3, 8, 9],
+            max_def_level: 3,
             max_rep_level: 1,
             array,
             logical_nulls,
         };
-        let sliced = levels.slice_for_chunk(&CdcChunk {
+
+        // Chunk 0: rows 0-1, nni=[0] → array sliced to [0..1]
+        let chunk0 = levels.slice_for_chunk(&CdcChunk {
+            level_offset: 0,
+            num_levels: 2,
+            value_offset: 0,
+            num_values: 1,
+        });
+        assert_eq!(chunk0.non_null_indices, vec![0]);
+        assert_eq!(chunk0.array.len(), 1);
+
+        // Chunk 1: rows 2-3, nni=[3] → array sliced to [3..4]
+        let chunk1 = levels.slice_for_chunk(&CdcChunk {
             level_offset: 2,
             num_levels: 3,
+            value_offset: 1,
+            num_values: 1,
+        });
+        assert_eq!(chunk1.non_null_indices, vec![0]);
+        assert_eq!(chunk1.array.len(), 1);
+
+        // Chunk 2: row 4, nni=[8, 9] → array sliced to [8..10]
+        let chunk2 = levels.slice_for_chunk(&CdcChunk {
+            level_offset: 5,
+            num_levels: 2,
             value_offset: 2,
-            num_values: 3,
+            num_values: 2,
         });
-        assert_eq!(sliced.def_levels, Some(vec![2, 2, 2]));
-        assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1]));
-        // [0,1,2,3,4] filtered to [2,5) → [2,3,4] → shifted -2 → [0,1,2]
-        assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
-        assert_eq!(sliced.array.len(), 3);
+        assert_eq!(chunk2.non_null_indices, vec![0, 1]);
+        assert_eq!(chunk2.array.len(), 2);
     }
 
     #[test]
-    fn test_slice_for_chunk_non_null_indices_boundary() {
-        // [1, null, 3]: non_null_indices=[0, 2]; test inclusive lower / 
exclusive upper bounds
-        let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, 
Some(3)]));
+    fn test_slice_for_chunk_all_null() {
+        // All-null chunk: num_values=0 → empty nni slice → zero-length array.
+        let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, 
None, Some(4)]));
         let logical_nulls = array.logical_nulls();
         let levels = ArrayLevels {
-            def_levels: Some(vec![1, 0, 1]),
+            def_levels: Some(vec![1, 0, 0, 1]),
             rep_levels: None,
-            non_null_indices: vec![0, 2],
+            non_null_indices: vec![0, 3],
             max_def_level: 1,
             max_rep_level: 0,
             array,
             logical_nulls,
         };
-        assert_eq!(
-            levels
-                .slice_for_chunk(&CdcChunk {
-                    level_offset: 0,
-                    num_levels: 1,
-                    value_offset: 0,
-                    num_values: 1
-                })
-                .non_null_indices,
-            vec![0]
-        );
-        // idx 2 in range [1,3), shifted -1 → 1
-        assert_eq!(
-            levels
-                .slice_for_chunk(&CdcChunk {
-                    level_offset: 1,
-                    num_levels: 2,
-                    value_offset: 1,
-                    num_values: 2
-                })
-                .non_null_indices,
-            vec![1]
-        );
-        // idx 2 excluded from [1,2)
-        assert_eq!(
-            levels
-                .slice_for_chunk(&CdcChunk {
-                    level_offset: 1,
-                    num_levels: 1,
-                    value_offset: 1,
-                    num_values: 1
-                })
-                .non_null_indices,
-            Vec::<usize>::new()
-        );
+        // Chunk covering only the two null rows (levels 1..3), zero non-null 
values.
+        let sliced = levels.slice_for_chunk(&CdcChunk {
+            level_offset: 1,
+            num_levels: 2,
+            value_offset: 1,
+            num_values: 0,
+        });
+        assert_eq!(sliced.def_levels, Some(vec![0, 0]));
+        assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
+        assert_eq!(sliced.array.len(), 0);
     }
 }
diff --git a/parquet/src/column/chunker/cdc.rs 
b/parquet/src/column/chunker/cdc.rs
index f21f58780a..cb4328536d 100644
--- a/parquet/src/column/chunker/cdc.rs
+++ b/parquet/src/column/chunker/cdc.rs
@@ -289,52 +289,100 @@ impl ContentDefinedChunker {
         let mut chunks = Vec::new();
         let mut prev_offset: usize = 0;
         let mut prev_value_offset: usize = 0;
-        // Total number of values seen; for non-nested data this equals 
num_levels.
-        let mut total_values: usize = num_levels;
+        let mut value_offset: usize = 0;
 
         if !has_rep_levels && !has_def_levels {
             // Fastest path: non-nested, non-null data.
+            // Every level corresponds to exactly one non-null value, so
+            // value_offset == level_offset and num_values == num_levels.
+            //
+            // Example: required Int32, array = [10, 20, 30]
+            //   level:         0   1   2
+            //   value_offset:  0   1   2
             for offset in 0..num_levels {
                 roll_value(self, offset);
                 if self.need_new_chunk() {
                     chunks.push(CdcChunk {
                         level_offset: prev_offset,
-                        value_offset: prev_offset,
                         num_levels: offset - prev_offset,
+                        value_offset: prev_offset,
                         num_values: offset - prev_offset,
                     });
                     prev_offset = offset;
                 }
             }
-            // Set the previous value offset to add the last chunk.
             prev_value_offset = prev_offset;
+            value_offset = num_levels;
         } else if !has_rep_levels {
-            // Non-nested data with nulls.
+            // Non-nested data with nulls. value_offset only increments for
+            // non-null values (def == max_def), so it diverges from the
+            // level offset when nulls are present.
+            //
+            // Example: optional Int32, array = [1, null, 2, null, 3]
+            //   def_levels:    [1, 0, 1, 0, 1]
+            //   level:          0  1  2  3  4
+            //   value_offset:   0     1     2  (only increments on def==1)
             let def_levels = def_levels.expect("def_levels required when 
max_def_level > 0");
             #[allow(clippy::needless_range_loop)]
             for offset in 0..num_levels {
                 let def_level = def_levels[offset];
                 self.roll_level(def_level);
                 if def_level == self.max_def_level {
+                    // For non-nested data, the leaf array has one slot per
+                    // level (nulls are array elements), so `offset` (the
+                    // level index) is the correct array index for hashing.
                     roll_value(self, offset);
                 }
+                // Check boundary before incrementing value_offset so that
+                // num_values reflects only entries in the completed chunk.
                 if self.need_new_chunk() {
                     chunks.push(CdcChunk {
                         level_offset: prev_offset,
-                        value_offset: prev_offset,
                         num_levels: offset - prev_offset,
-                        num_values: offset - prev_offset,
+                        value_offset: prev_value_offset,
+                        num_values: value_offset - prev_value_offset,
                     });
                     prev_offset = offset;
+                    prev_value_offset = value_offset;
+                }
+                if def_level == self.max_def_level {
+                    value_offset += 1;
                 }
             }
-            // Set the previous value offset to add the last chunk.
-            prev_value_offset = prev_offset;
         } else {
-            // Nested data with nulls.
+            // Nested data with nulls. Two counters are needed:
+            //
+            //   leaf_offset: index into the leaf values array for hashing,
+            //     incremented for all leaf slots (def >= 
repeated_ancestor_def_level),
+            //     including null elements.
+            //
+            //   value_offset: index into non_null_indices for chunk 
boundaries,
+            //     incremented only for non-null leaf values (def == 
max_def_level).
+            //
+            // These diverge when nullable elements exist inside lists.
+            //
+            // Example: List<Int32?> with repeated_ancestor_def_level=2, 
max_def=3
+            //   row 0: [1, null, 2]   (3 leaf slots, 2 non-null)
+            //   row 1: [3]            (1 leaf slot, 1 non-null)
+            //
+            //   leaf array:    [1, null, 2, 3]
+            //   def_levels:    [3,  2,   3, 3]
+            //   rep_levels:    [0,  1,   1, 0]
+            //
+            //   level  def  leaf_offset  value_offset  action
+            //   ─────  ───  ───────────  ────────────  
──────────────────────────
+            //     0     3       0             0        roll_value(0), 
value++, leaf++
+            //     1     2       1             1        leaf++ only (null 
element)
+            //     2     3       2             1        roll_value(2), 
value++, leaf++
+            //     3     3       3             2        roll_value(3), 
value++, leaf++
+            //
+            // roll_value(2) correctly indexes leaf array position 2 (value 
"2").
+            // Using value_offset=1 would index position 1 (the null slot).
+            //
+            // Hence this branch passes leaf_offset, not value_offset, to roll_value.
             let def_levels = def_levels.expect("def_levels required for nested 
data");
             let rep_levels = rep_levels.expect("rep_levels required for nested 
data");
-            let mut value_offset: usize = 0;
+            let mut leaf_offset: usize = 0;
 
             for offset in 0..num_levels {
                 let def_level = def_levels[offset];
@@ -343,43 +391,45 @@ impl ContentDefinedChunker {
                 self.roll_level(def_level);
                 self.roll_level(rep_level);
                 if def_level == self.max_def_level {
-                    roll_value(self, value_offset);
+                    roll_value(self, leaf_offset);
                 }
 
+                // Check boundary before incrementing value_offset so that
+                // num_values reflects only entries in the completed chunk.
                 if rep_level == 0 && self.need_new_chunk() {
-                    // If we are at a record boundary and need a new chunk, 
create one.
                     let levels_to_write = offset - prev_offset;
                     if levels_to_write > 0 {
                         chunks.push(CdcChunk {
                             level_offset: prev_offset,
-                            value_offset: prev_value_offset,
                             num_levels: levels_to_write,
+                            value_offset: prev_value_offset,
                             num_values: value_offset - prev_value_offset,
                         });
                         prev_offset = offset;
                         prev_value_offset = value_offset;
                     }
                 }
-                if def_level >= self.repeated_ancestor_def_level {
-                    // We only increment the value offset if we have a leaf 
value.
+                if def_level == self.max_def_level {
                     value_offset += 1;
                 }
+                if def_level >= self.repeated_ancestor_def_level {
+                    leaf_offset += 1;
+                }
             }
-            total_values = value_offset;
         }
 
         // Add the last chunk if we have any levels left.
         if prev_offset < num_levels {
             chunks.push(CdcChunk {
                 level_offset: prev_offset,
-                value_offset: prev_value_offset,
                 num_levels: num_levels - prev_offset,
-                num_values: total_values - prev_value_offset,
+                value_offset: prev_value_offset,
+                num_values: value_offset - prev_value_offset,
             });
         }
 
         #[cfg(debug_assertions)]
-        self.validate_chunks(&chunks, num_levels, total_values);
+        self.validate_chunks(&chunks, num_levels, value_offset);
 
         chunks
     }
@@ -626,8 +676,9 @@ mod tests {
         assert_eq!(chunks1.len(), chunks2.len());
         for (a, b) in chunks1.iter().zip(chunks2.iter()) {
             assert_eq!(a.level_offset, b.level_offset);
-            assert_eq!(a.value_offset, b.value_offset);
             assert_eq!(a.num_levels, b.num_levels);
+            assert_eq!(a.value_offset, b.value_offset);
+            assert_eq!(a.num_values, b.num_values);
         }
     }
 
@@ -642,17 +693,23 @@ mod tests {
 
         let num_levels = 20;
         // def_level=1 means non-null, def_level=0 means null
+        // Pattern: null at indices 0, 3, 6, 9, 12, 15, 18 → 7 nulls, 13 
non-null
         let def_levels: Vec<i16> = (0..num_levels)
             .map(|i| if i % 3 == 0 { 0 } else { 1 })
             .collect();
+        let expected_non_null: usize = def_levels.iter().filter(|&&d| d == 
1).count();
 
         let chunks = chunker.calculate(Some(&def_levels), None, num_levels, 
|c, i| {
             c.roll_fixed::<4>(&(i as i32).to_le_bytes());
         });
 
         assert!(!chunks.is_empty());
-        let total: usize = chunks.iter().map(|c| c.num_levels).sum();
-        assert_eq!(total, num_levels);
+        let total_levels: usize = chunks.iter().map(|c| c.num_levels).sum();
+        let total_values: usize = chunks.iter().map(|c| c.num_values).sum();
+        assert_eq!(total_levels, num_levels);
+        assert_eq!(total_values, expected_non_null);
+        // With nulls present, total_values < total_levels
+        assert!(total_values < total_levels);
     }
 }
 
@@ -663,9 +720,12 @@ mod arrow_tests {
     use std::borrow::Borrow;
     use std::sync::Arc;
 
+    use arrow::util::data_gen::create_random_batch;
     use arrow_array::cast::AsArray;
     use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, RecordBatch};
-    use arrow_schema::{DataType, Field, Schema};
+    use arrow_buffer::Buffer;
+    use arrow_data::ArrayData;
+    use arrow_schema::{DataType, Field, Fields, Schema};
 
     use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
     use crate::arrow::arrow_writer::ArrowWriter;
@@ -2153,4 +2213,130 @@ mod arrow_tests {
             "all chunks after the first must be identical"
         );
     }
+
+    /// Regression test for <https://github.com/apache/arrow-rs/issues/9637>
+    ///
+    /// Writing nested list data with CDC enabled panicked with an 
out-of-bounds
+    /// slice access when null list entries had non-zero child ranges.
+    #[test]
+    fn test_cdc_list_roundtrip() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "_1",
+                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, 
true))),
+                true,
+            ),
+            Field::new(
+                "_2",
+                
DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
+                true,
+            ),
+            Field::new(
+                "_3",
+                
DataType::LargeList(Arc::new(Field::new_list_field(DataType::Utf8, true))),
+                true,
+            ),
+        ]));
+        let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
+        write_with_cdc_options(
+            &[&batch],
+            CDC_MIN_CHUNK_SIZE,
+            CDC_MAX_CHUNK_SIZE,
+            None,
+            true,
+        );
+    }
+
+    /// Test CDC with deeply nested types: List<List<Int32>>, 
List<Struct<List<Int32>>>
+    #[test]
+    fn test_cdc_deeply_nested_roundtrip() {
+        let inner_field = Field::new_list_field(DataType::Int32, true);
+        let inner_type = DataType::List(Arc::new(inner_field));
+        let outer_field = Field::new_list_field(inner_type.clone(), true);
+        let list_list_type = DataType::List(Arc::new(outer_field));
+
+        let struct_inner_field = Field::new_list_field(DataType::Int32, true);
+        let struct_inner_type = DataType::List(Arc::new(struct_inner_field));
+        let struct_fields = Fields::from(vec![Field::new("a", 
struct_inner_type, true)]);
+        let struct_type = DataType::Struct(struct_fields);
+        let struct_list_field = Field::new_list_field(struct_type, true);
+        let list_struct_type = DataType::List(Arc::new(struct_list_field));
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("list_list", list_list_type, true),
+            Field::new("list_struct_list", list_struct_type, true),
+        ]));
+        let batch = create_random_batch(schema, 10_000, 0.25, 0.75).unwrap();
+        write_with_cdc_options(
+            &[&batch],
+            CDC_MIN_CHUNK_SIZE,
+            CDC_MAX_CHUNK_SIZE,
+            None,
+            true,
+        );
+    }
+
+    /// Test CDC with list arrays that have non-empty null segments.
+    ///
+    /// Per the Arrow columnar format spec: "a null value may correspond to a
+    /// non-empty segment in the child array". This test constructs such arrays
+    /// manually and verifies the CDC writer handles them correctly.
+    #[test]
+    fn test_cdc_list_non_empty_null_segments() {
+        // Build List<Int32> where null entries own non-zero child ranges:
+        //   row 0: [1, 2]     offsets[0..2]  valid
+        //   row 1: null        offsets[2..5]  null, but owns 3 child values
+        //   row 2: [6, 7]     offsets[5..7]  valid
+        //   row 3: null        offsets[7..9]  null, but owns 2 child values
+        //   row 4: [10]        offsets[9..10] valid
+        let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+        let offsets = Buffer::from_iter([0_i32, 2, 5, 7, 9, 10]);
+        let null_bitmap = Buffer::from([0b00010101]); // rows 0, 2, 4 valid
+
+        let list_type = 
DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
+        let list_data = unsafe {
+            ArrayData::new_unchecked(
+                list_type.clone(),
+                5,
+                None,
+                Some(null_bitmap),
+                0,
+                vec![offsets],
+                vec![values.to_data()],
+            )
+        };
+        let list_array = arrow_array::make_array(list_data);
+
+        let schema = Arc::new(Schema::new(vec![Field::new("col", list_type, 
true)]));
+        let batch = RecordBatch::try_new(schema, vec![list_array]).unwrap();
+
+        let buf = write_with_cdc_options(
+            &[&batch],
+            CDC_MIN_CHUNK_SIZE,
+            CDC_MAX_CHUNK_SIZE,
+            None,
+            true,
+        );
+        let read = concat_batches(read_batches(&buf));
+        let read_list = read.column(0).as_list::<i32>();
+        assert_eq!(read_list.len(), 5);
+        assert!(read_list.is_valid(0));
+        assert!(read_list.is_null(1));
+        assert!(read_list.is_valid(2));
+        assert!(read_list.is_null(3));
+        assert!(read_list.is_valid(4));
+
+        let get_vals = |i: usize| -> Vec<i32> {
+            read_list
+                .value(i)
+                .as_primitive::<arrow_array::types::Int32Type>()
+                .values()
+                .iter()
+                .copied()
+                .collect()
+        };
+        assert_eq!(get_vals(0), vec![1, 2]);
+        assert_eq!(get_vals(2), vec![6, 7]);
+        assert_eq!(get_vals(4), vec![10]);
+    }
 }
diff --git a/parquet/src/column/chunker/mod.rs 
b/parquet/src/column/chunker/mod.rs
index c4caf18af6..42631e026d 100644
--- a/parquet/src/column/chunker/mod.rs
+++ b/parquet/src/column/chunker/mod.rs
@@ -31,10 +31,10 @@ pub(crate) use cdc::ContentDefinedChunker;
 pub(crate) struct CdcChunk {
     /// The start offset of this chunk inside the given levels.
     pub level_offset: usize,
-    /// The start offset of this chunk inside the given values array.
-    pub value_offset: usize,
     /// The number of levels in this chunk.
     pub num_levels: usize,
-    /// The number of values (Arrow array elements) in this chunk.
+    /// The start index into `non_null_indices` for this chunk.
+    pub value_offset: usize,
+    /// The number of `non_null_indices` entries in this chunk.
     pub num_values: usize,
 }

Reply via email to