This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b3e047f59a Fix Invalid offset in sparse column chunk data error for
multiple predicates (#9509)
b3e047f59a is described below
commit b3e047f59a562020a0fd50e7c68c4e6cbd53687d
Author: Peter L <[email protected]>
AuthorDate: Thu Mar 12 05:15:07 2026 +1030
Fix Invalid offset in sparse column chunk data error for multiple
predicates (#9509)
# Which issue does this PR close?
Raised an issue at https://github.com/apache/arrow-rs/issues/9516 for
this one
Same issue as https://github.com/apache/arrow-rs/issues/9239 but
extended to another scenario
# Rationale for this change
When there are multiple predicates being evaluated, we need to reset the
row selection policy before overriding the strategy.
Scenario:
- Dense initial RowSelection (alternating select/skip) covers all pages
→ Auto resolves to Mask
- Predicate 1 evaluates on column A, narrows selection to skip middle
pages
- Predicate 2's column B is fetched sparsely with the narrowed selection
(missing middle pages)
- Without the fix, the override for predicate 2 returns early
(policy=Mask, not Auto), so Mask is used and tries to read missing pages
→ "Invalid offset" error
# What changes are included in this PR?
This is a one line change to reset the selection policy in the
`RowGroupDecoderState::WaitingOnFilterData` arm
# Are these changes tested?
Yes — a new test was added that currently fails on `main`, but as you can see
it's a doozy to set up.
# Are there any user-facing changes?
Nope
---
.../src/arrow/push_decoder/reader_builder/mod.rs | 7 ++
parquet/tests/arrow_reader/row_filter/async.rs | 111 ++++++++++++++++++++-
2 files changed, 117 insertions(+), 1 deletion(-)
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 8fa299be88..d3d78ca7c2 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -437,6 +437,13 @@ impl RowGroupReaderBuilder {
.with_parquet_metadata(&self.metadata)
.build_array_reader(self.fields.as_deref(),
predicate.projection())?;
+ // Reset to original policy before each predicate so the override
+ // can detect page skipping for THIS predicate's columns.
+ // Without this reset, a prior predicate's override (e.g. Mask)
+ // carries forward and the check returns early, missing unfetched
+ // pages for subsequent predicates.
+ plan_builder = plan_builder.with_row_selection_policy(self.row_selection_policy);
+
// Prepare to evaluate the filter.
// Note: first update the selection strategy to properly
handle any pages
// pruned during fetch
diff --git a/parquet/tests/arrow_reader/row_filter/async.rs b/parquet/tests/arrow_reader/row_filter/async.rs
index 6fa616d714..66840bb814 100644
--- a/parquet/tests/arrow_reader/row_filter/async.rs
+++ b/parquet/tests/arrow_reader/row_filter/async.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use arrow::{
array::AsArray,
compute::{concat_batches, kernels::cmp::eq, or},
- datatypes::TimestampNanosecondType,
+ datatypes::{Int32Type, TimestampNanosecondType},
};
use arrow_array::{
ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatch,
Scalar, StringArray,
@@ -525,3 +525,112 @@ async fn test_predicate_pushdown_with_skipped_pages() {
assert_eq!(batch.column(0).as_string(), &expected);
}
}
+
+/// Regression test: when multiple predicates are used, the first predicate's
+/// override of the selection strategy (to Mask) must NOT carry forward to
+/// subsequent predicates. Each predicate must get a fresh Auto policy so the
+/// override can detect page skipping for that predicate's specific columns.
+///
+/// Scenario:
+/// - Dense initial RowSelection (alternating select/skip) covers all pages → Auto resolves to Mask
+/// - Predicate 1 evaluates on column A, narrows selection to skip middle pages
+/// - Predicate 2's column B is fetched sparsely with the narrowed selection (missing middle pages)
+/// - Without the fix, the override for predicate 2 returns early (policy=Mask, not Auto),
+/// so Mask is used and tries to read missing pages → "Invalid offset" error
+#[tokio::test]
+async fn test_multi_predicate_mask_policy_carryover() {
+ // 300 rows, 1 row group, 100 rows per page (3 pages)
+ let num_rows = 300usize;
+ let rows_per_page = 100;
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("filter_col", DataType::Int32, false),
+ Field::new("value_col", DataType::Int32, false),
+ ]));
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(Some(num_rows))
+ .set_data_page_row_count_limit(rows_per_page)
+ .set_write_batch_size(rows_per_page)
+ .set_dictionary_enabled(false)
+ .build();
+
+ // filter_col: 0 for first and last 100 rows, 1 for middle 100 rows
+ // value_col: just row index
+ let filter_values: Vec<i32> = (0..num_rows as i32)
+ .map(|i| if (100..200).contains(&i) { 1 } else { 0 })
+ .collect();
+ let value_values: Vec<i32> = (0..num_rows as i32).collect();
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(filter_values)) as ArrayRef,
+ Arc::new(Int32Array::from(value_values)) as ArrayRef,
+ ],
+ )
+ .unwrap();
+
+ let mut buffer = Vec::new();
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+ let buffer = Bytes::from(buffer);
+
+ let reader = TestReader::new(buffer);
+ let options = ArrowReaderOptions::default().with_page_index_policy(PageIndexPolicy::Required);
+ let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
+ .await
+ .unwrap();
+
+ let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+ // Dense initial selection: Select(1), Skip(1) repeated → triggers Mask strategy
+ // Covers all pages since every page has selected rows
+ let selectors: Vec<RowSelector> = (0..num_rows / 2)
+ .flat_map(|_| vec![RowSelector::select(1), RowSelector::skip(1)])
+ .collect();
+ let selection = RowSelection::from(selectors);
+
+ // Predicate 1 on filter_col: keeps only rows where filter_col == 0
+ // (first 100 and last 100 rows). After this, middle page is excluded.
+ let pred1 = ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
+ let col = batch.column(0).as_primitive::<Int32Type>();
+ Ok(BooleanArray::from_iter(
+ col.iter().map(|v| v.map(|val| val == 0)),
+ ))
+ });
+
+ // Predicate 2 on value_col: keeps rows where value_col < 250
+ // This column is fetched AFTER predicate 1 narrows the selection.
+ // Its sparse data will be missing the middle page.
+ let pred2 = ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [1]), |batch| {
+ let col = batch.column(0).as_primitive::<Int32Type>();
+ Ok(BooleanArray::from_iter(
+ col.iter().map(|v| v.map(|val| val < 250)),
+ ))
+ });
+
+ let row_filter = RowFilter::new(vec![Box::new(pred1), Box::new(pred2)]);
+
+ // Output projection: both columns
+ let projection = ProjectionMask::roots(&schema_descr, [0, 1]);
+
+ let stream = builder
+ .with_row_filter(row_filter)
+ .with_row_selection(selection)
+ .with_projection(projection)
+ .with_max_predicate_cache_size(0)
+ .build()
+ .unwrap();
+
+ // Without the fix, this panics with:
+ // "Invalid offset in sparse column chunk data: ..., no matching page found."
+ let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+ let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
+
+ // Verify results: rows where filter_col==0 AND value_col<250 AND original alternating selection
+ // That's even-indexed rows in [0,100) with value<250 → rows 0,2,4,...,98 (50 rows)
+ // Plus even-indexed rows in [200,250) with value<250 → rows 200,202,...,248 (25 rows)
+ assert_eq!(batch.num_rows(), 75);
+}