This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d2e2cdafed Fix skip_records over-counting when partial record precedes num_rows page skip (#9374)
d2e2cdafed is described below
commit d2e2cdafed93a8e0152fe1d018ec2cef154ccb20
Author: Jonas Dedden <[email protected]>
AuthorDate: Mon Mar 9 21:32:53 2026 +0100
Fix skip_records over-counting when partial record precedes num_rows page skip (#9374)
# Which issue does this PR close?
- Closes #9370.
# Rationale for this change
The bug occurs when using RowSelection with nested types (such as
List<String>) and all of the following hold:
1. A column has multiple pages in a row group
2. The selected rows span across page boundaries
3. The first page is entirely consumed during skip operations
The issue was in `arrow-rs/parquet/src/column/reader.rs:287-382`
(`skip_records` function).
**Root cause:** When `skip_records` completed successfully after
crossing page boundaries, the `has_partial` state in the
`RepetitionLevelDecoder` could incorrectly remain true.
This happened when:
- The skip operation exhausted a page where has_record_delimiter was
false
- The skip found the remaining records on the next page by counting a
delimiter at index 0
- When a subsequent read_records(1) was called, the stale
has_partial=true state caused count_records to misinterpret the first
repetition level (0) at index 0 as ending a "phantom" partial record,
returning (1 record, 0 levels, 0 values) instead of reading the actual
record data.
For a more detailed explanation, see:
https://github.com/apache/arrow-rs/issues/9370#issuecomment-3861143928
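The mechanism above can be sketched with a minimal, self-contained model of the record counter. This is not the arrow-rs implementation: `RecordCounter` and its `count_records` are simplified stand-ins that only capture the `has_partial` bookkeeping described in this section.

```rust
/// Simplified stand-in for the repetition-level record counter.
/// A record starts at each rep level 0; `has_partial` means a record's
/// levels have been seen but its terminating delimiter has not.
struct RecordCounter {
    has_partial: bool,
}

impl RecordCounter {
    fn new() -> Self {
        Self { has_partial: false }
    }

    /// Count up to `max` records in `levels`, returning
    /// (records completed, levels consumed).
    fn count_records(&mut self, levels: &[i16], max: usize) -> (usize, usize) {
        let mut records = 0;
        for (idx, &lvl) in levels.iter().enumerate() {
            if lvl == 0 {
                if self.has_partial {
                    // A rep level of 0 delimits the record in progress.
                    // If `has_partial` is stale from a previous page, this
                    // completes a "phantom" record consuming 0 levels.
                    records += 1;
                    if records == max {
                        self.has_partial = false;
                        return (records, idx);
                    }
                }
                self.has_partial = true; // a new record's levels begin here
            }
        }
        (records, levels.len())
    }
}

fn main() {
    let mut c = RecordCounter::new();

    // Page 1 holds two records: rep levels [0, 1, 0, 1].
    // Skipping 1 record consumes the first record's two levels.
    assert_eq!(c.count_records(&[0, 1, 0, 1], 1), (1, 2));

    // Skipping further exhausts the page's remaining levels [0, 1]
    // without seeing a closing delimiter: 0 records, has_partial = true.
    assert_eq!(c.count_records(&[0, 1], 2), (0, 2));
    assert!(c.has_partial);

    // If the next page is then skipped wholesale (the v2 num_rows path),
    // the counter is never touched and has_partial stays stale. The next
    // read sees rep level 0 at index 0 and "completes" a phantom record:
    // 1 record, 0 levels consumed.
    assert_eq!(c.count_records(&[0, 1], 1), (1, 0));
}
```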
# What changes are included in this PR?
Added code at the end of skip_records to reset the partial record state
when all requested records have been successfully skipped.
This ensures that after skip_records completes, we're at a clean record
boundary with no lingering partial record state, fixing the array length
mismatch in StructArrayReader.
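Why the reset matters at the whole-page-skip shortcut can be sketched as follows. The names here (`RepDecoder`, `decide_page_skip`) are illustrative, not the actual arrow-rs code: a pending partial record must be counted before comparing the page's `num_rows` against the records still to skip, otherwise the skip over-counts by one.

```rust
/// Illustrative stand-in for the repetition-level decoder's partial state.
struct RepDecoder {
    has_partial: bool,
}

impl RepDecoder {
    /// Count a pending partial record as complete, if any.
    fn flush_partial(&mut self) -> usize {
        if self.has_partial {
            self.has_partial = false;
            1
        } else {
            0
        }
    }
}

/// Decide whether the next v2 page (with `num_rows` known) can be skipped
/// wholesale. Returns (records accounted for, whole page skipped).
fn decide_page_skip(dec: &mut RepDecoder, num_rows: usize, remaining: usize) -> (usize, bool) {
    // Flush first: a partial record left open at the previous page
    // boundary ends here, and counts toward the records being skipped.
    let flushed = dec.flush_partial();
    let remaining = remaining.saturating_sub(flushed);
    if num_rows <= remaining {
        (flushed + num_rows, true) // safe to skip the whole page
    } else {
        (flushed, false) // must load the page and skip record by record
    }
}

fn main() {
    // Stale partial from the previous page, 2 records left to skip, next
    // page holds 2 rows: the page must NOT be skipped wholesale, because
    // the flushed partial already accounts for one of the 2 records.
    let mut dec = RepDecoder { has_partial: true };
    assert_eq!(decide_page_skip(&mut dec, 2, 2), (1, false));

    // With no pending partial, the whole-page skip is fine.
    let mut dec = RepDecoder { has_partial: false };
    assert_eq!(decide_page_skip(&mut dec, 2, 2), (2, true));
}
```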
# Are these changes tested?
Commit
https://github.com/apache/arrow-rs/commit/365bd9a4ced7897f391e4533930a0c9683952723
introduces a unit test that exercises this issue with v2 data pages
only. PR https://github.com/apache/arrow-rs/pull/9399 could be used to
showcase the issue end to end.
An earlier, incorrect assumption was that the bug depended on mixing v1
and v2 data pages:
```
In b52e043 I added a test that I validated to fail whenever I remove my fix.
Bug Mechanism
The bug requires three ingredients:
1. Page 1 (DataPage v1): Contains a nested column (with rep levels).
During skip_records, all levels on this page are consumed. count_records sees
no following rep=0 delimiter, so it sets
has_partial=true. Since has_record_delimiter is false (the default
InMemoryPageReader returns false when more pages exist), flush_partial is not
called.
2. Page 2 (DataPage v2): Has num_rows available in its metadata. When
num_rows <= remaining_records, the entire page is skipped via skip_next_page()
— this does not touch the rep level decoder at all,
so has_partial remains stale true from page 1.
3. Page 3 (DataPage v1): When read_records loads this page, the stale
has_partial=true causes the rep=0 at position 0 to be misinterpreted as
completing a "phantom" partial record. This produces (1
record, 0 levels, 0 values) instead of reading the actual record data.
Test Verification
- With fix (flush_partial at end of skip_records): read_records(1)
correctly returns (1, 2, 2) with values [70, 80]
- Without fix: read_records(1) returns (1, 0, 0) — a phantom record with
no data, which is what causes the "Not all children array length are the same!"
error when different sibling columns in a struct
produce different record counts
```
---------
Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/column/page.rs | 9 +-
parquet/src/column/reader.rs | 131 ++++++++++++++++++++++++++
parquet/src/file/serialized_reader.rs | 7 +-
parquet/tests/arrow_reader/row_filter/sync.rs | 2 -
4 files changed, 145 insertions(+), 4 deletions(-)
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index f18b296c1c..4cfc07a028 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -406,7 +406,14 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
/// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
/// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
fn at_record_boundary(&mut self) -> Result<bool> {
- Ok(self.peek_next_page()?.is_none())
+ match self.peek_next_page()? {
+ // Last page in the column chunk - always a record boundary
+ None => Ok(true),
+ // A V2 data page is required by the parquet spec to start at a
+ // record boundary, so the current page ends at one. V2 pages
+ // are identified by having `num_rows` set in their header.
+ Some(metadata) => Ok(metadata.num_rows.is_some()),
+ }
}
}
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 387a0602a6..29cb50185a 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -1361,4 +1361,135 @@ mod tests {
);
}
}
+
+ /// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
+ ///
+ /// Reproduces the production scenario: all DataPage v2 pages for a
+ /// list column (rep_level=1) read without an offset index (i.e.
+ /// `at_record_boundary` returns false for non-last pages).
+ ///
+ /// When a prior operation (here `skip_records(1)`) loads a v2 page,
+ /// and a subsequent `skip_records` exhausts the remaining levels on
+ /// that page, the rep level decoder is left with `has_partial=true`.
+ /// Because `has_record_delimiter` is false, the partial is not
+ /// flushed during level-based processing. When the next v2 page is
+ /// then peeked with `num_rows` available, the whole-page-skip
+ /// shortcut must flush the pending partial first. Otherwise:
+ ///
+ /// 1. The skip over-counts (skips N+1 records instead of N), and
+ /// 2. The stale `has_partial` causes a subsequent `read_records` to
+ /// produce a "phantom" record with 0 values.
+ #[test]
+ fn test_skip_records_v2_page_skip_accounts_for_partial() {
+ use crate::encodings::levels::LevelEncoder;
+
+ let max_rep_level: i16 = 1;
+ let max_def_level: i16 = 1;
+
+ // Column descriptor for a list element column (rep=1, def=1)
+ let primitive_type = SchemaType::primitive_type_builder("element", PhysicalType::INT32)
+ .with_repetition(Repetition::REQUIRED)
+ .build()
+ .unwrap();
+ let desc = Arc::new(ColumnDescriptor::new(
+ Arc::new(primitive_type),
+ max_def_level,
+ max_rep_level,
+ ColumnPath::new(vec!["list".to_string(), "element".to_string()]),
+ ));
+
+ // Helper: build a DataPage v2 for this list column.
+ let make_v2_page =
+ |rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: u32| -> Page {
+ let mut rep_enc = LevelEncoder::v2(max_rep_level, rep_levels.len());
+ rep_enc.put(rep_levels);
+ let rep_bytes = rep_enc.consume();
+
+ let mut def_enc = LevelEncoder::v2(max_def_level, def_levels.len());
+ def_enc.put(def_levels);
+ let def_bytes = def_enc.consume();
+
+ let val_bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
+
+ let mut buf = Vec::new();
+ buf.extend_from_slice(&rep_bytes);
+ buf.extend_from_slice(&def_bytes);
+ buf.extend_from_slice(&val_bytes);
+
+ Page::DataPageV2 {
+ buf: Bytes::from(buf),
+ num_values: rep_levels.len() as u32,
+ encoding: Encoding::PLAIN,
+ num_nulls: 0,
+ num_rows,
+ def_levels_byte_len: def_bytes.len() as u32,
+ rep_levels_byte_len: rep_bytes.len() as u32,
+ is_compressed: false,
+ statistics: None,
+ }
+ };
+
+ // All pages are DataPage v2 (matching the production scenario where
+ // parquet-rs writes only v2 data pages and no offset index is loaded,
+ // so at_record_boundary() returns false for non-last pages).
+
+ // Page 1 (v2): 2 records × 2 elements = [10,20], [30,40]
+ let page1 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[10, 20, 30, 40], 2);
+
+ // Page 2 (v2): 2 records × 2 elements = [50,60], [70,80]
+ let page2 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[50, 60, 70, 80], 2);
+
+ // Page 3 (v2): 1 record × 2 elements = [90,100]
+ let page3 = make_v2_page(&[0, 1], &[1, 1], &[90, 100], 1);
+
+ // 5 records total: [10,20], [30,40], [50,60], [70,80], [90,100]
+ let pages = VecDeque::from(vec![page1, page2, page3]);
+ let page_reader = InMemoryPageReader::new(pages);
+ let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
+ let mut typed_reader = get_typed_column_reader::<Int32Type>(column_reader);
+
+ // Step 1 — skip 1 record:
+ // Peek page 1: num_rows=2, remaining=1 → rows(2) > remaining(1),
+ // so the page is LOADED (not whole-page-skipped).
+ // Level-based skip consumes rep levels [0,1] for record [10,20],
+ // stopping at the 0 that starts record [30,40].
+ let skipped = typed_reader.skip_records(1).unwrap();
+ assert_eq!(skipped, 1);
+
+ // Step 2 — skip 2 more records ([30,40] and [50,60]):
+ // Mid-page in page 1 with 2 remaining levels [0,1] for [30,40].
+ // skip_rep_levels(2, 2): the leading 0 does NOT act as a record
+ // delimiter (has_partial=false, idx==0), so count_records returns
+ // (true, 0, 2) — all levels consumed, has_partial=true, 0 records.
+ //
+ // has_record_delimiter is false → no flush at page boundary.
+ // Page 1 exhausted → peek page 2 (v2, num_rows=2).
+ //
+ // With fix: flush_partial → remaining 2→1, page 2 NOT skipped
+ // (rows=2 > remaining=1). Load page 2, skip 1 record [50,60].
+ //
+ // Without fix: rows(2) <= remaining(2) → page 2 whole-page-skipped,
+ // over-counting by 1. has_partial stays true (stale from page 1).
+ let skipped = typed_reader.skip_records(2).unwrap();
+ assert_eq!(skipped, 2);
+
+ // Step 3 — read 1 record:
+ let mut values = Vec::new();
+ let mut def_levels = Vec::new();
+ let mut rep_levels = Vec::new();
+
+ let (records, values_read, levels_read) = typed_reader
+ .read_records(1, Some(&mut def_levels), Some(&mut rep_levels), &mut values)
+ .unwrap();
+
+ // Without the fix: (1, 0, 0) — phantom record from stale has_partial;
+ // the rep=0 on page 3 "completes" the phantom, yielding 0 values.
+ // With the fix: (1, 2, 2) — correctly reads record [70, 80].
+ assert_eq!(records, 1, "should read exactly 1 record");
+ assert_eq!(levels_read, 2, "should read 2 levels for the record");
+ assert_eq!(values_read, 2, "should read 2 non-null values");
+ assert_eq!(values, vec![70, 80], "should contain 4th record's values");
+ assert_eq!(rep_levels, vec![0, 1], "rep levels for a 2-element list");
+ assert_eq!(def_levels, vec![1, 1], "def levels (all non-null)");
+ }
}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index b3b6383f78..254ccb779a 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -1158,7 +1158,12 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
fn at_record_boundary(&mut self) -> Result<bool> {
match &mut self.state {
- SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
+ SerializedPageReaderState::Values { .. } => match self.peek_next_page()? {
+ None => Ok(true),
+ // V2 data pages must start at record boundaries per the parquet
+ // spec, so the current page ends at one.
+ Some(metadata) => Ok(metadata.num_rows.is_some()),
+ },
SerializedPageReaderState::Pages { .. } => Ok(true),
}
}
diff --git a/parquet/tests/arrow_reader/row_filter/sync.rs b/parquet/tests/arrow_reader/row_filter/sync.rs
index e59fa392cf..77a75220dc 100644
--- a/parquet/tests/arrow_reader/row_filter/sync.rs
+++ b/parquet/tests/arrow_reader/row_filter/sync.rs
@@ -206,7 +206,6 @@ fn test_row_filter_full_page_skip_is_handled() {
/// Without the fix, the list column over-skips by one record, causing
/// struct children to disagree on record counts.
#[test]
-#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
fn test_row_selection_list_column_v2_page_boundary_skip() {
use arrow_array::builder::{Int32Builder, ListBuilder};
@@ -327,7 +326,6 @@ fn test_row_selection_list_column_v2_page_boundary_skip() {
/// bug causes one leaf to over-skip by one record while the other stays
/// correct.
#[test]
-#[should_panic(expected = "Not all children array length are the same!")]
fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
use arrow_array::Array;
use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};