This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b3e047f59a Fix Invalid offset in sparse column chunk data error for 
multiple predicates (#9509)
b3e047f59a is described below

commit b3e047f59a562020a0fd50e7c68c4e6cbd53687d
Author: Peter L <[email protected]>
AuthorDate: Thu Mar 12 05:15:07 2026 +1030

    Fix Invalid offset in sparse column chunk data error for multiple 
predicates (#9509)
    
    # Which issue does this PR close?
    
    An issue was raised for this at
    https://github.com/apache/arrow-rs/issues/9516
    
    Same issue as https://github.com/apache/arrow-rs/issues/9239 but
    extended to another scenario
    
    # Rationale for this change
    
    When there are multiple predicates being evaluated, we need to reset the
    row selection policy before overriding the strategy.
    
    Scenario:
    - Dense initial RowSelection (alternating select/skip) covers all pages
    → Auto resolves to Mask
    - Predicate 1 evaluates on column A, narrows selection to skip middle
    pages
    - Predicate 2's column B is fetched sparsely with the narrowed selection
    (missing middle pages)
    - Without the fix, the override for predicate 2 returns early
    (policy=Mask, not Auto), so Mask is used and tries to read missing pages
    → "Invalid offset" error
    
    # What changes are included in this PR?
    
    This is a one line change to reset the selection policy in the
    `RowGroupDecoderState::WaitingOnFilterData` arm
    
    # Are these changes tested?
    
    Yes, a new test was added that currently fails on `main`, but as you
    can see it's a doozy to set up.
    
    # Are there any user-facing changes?
    
    Nope
---
 .../src/arrow/push_decoder/reader_builder/mod.rs   |   7 ++
 parquet/tests/arrow_reader/row_filter/async.rs     | 111 ++++++++++++++++++++-
 2 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs 
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 8fa299be88..d3d78ca7c2 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -437,6 +437,13 @@ impl RowGroupReaderBuilder {
                     .with_parquet_metadata(&self.metadata)
                     .build_array_reader(self.fields.as_deref(), 
predicate.projection())?;
 
+                // Reset to original policy before each predicate so the 
override
+                // can detect page skipping for THIS predicate's columns.
+                // Without this reset, a prior predicate's override (e.g. Mask)
+                // carries forward and the check returns early, missing 
unfetched
+                // pages for subsequent predicates.
+                plan_builder = 
plan_builder.with_row_selection_policy(self.row_selection_policy);
+
                 // Prepare to evaluate the filter.
                 // Note: first update the selection strategy to properly 
handle any pages
                 // pruned during fetch
diff --git a/parquet/tests/arrow_reader/row_filter/async.rs 
b/parquet/tests/arrow_reader/row_filter/async.rs
index 6fa616d714..66840bb814 100644
--- a/parquet/tests/arrow_reader/row_filter/async.rs
+++ b/parquet/tests/arrow_reader/row_filter/async.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 use arrow::{
     array::AsArray,
     compute::{concat_batches, kernels::cmp::eq, or},
-    datatypes::TimestampNanosecondType,
+    datatypes::{Int32Type, TimestampNanosecondType},
 };
 use arrow_array::{
     ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatch, 
Scalar, StringArray,
@@ -525,3 +525,112 @@ async fn test_predicate_pushdown_with_skipped_pages() {
         assert_eq!(batch.column(0).as_string(), &expected);
     }
 }
+
+/// Regression test: when multiple predicates are used, the first predicate's
+/// override of the selection strategy (to Mask) must NOT carry forward to
+/// subsequent predicates. Each predicate must get a fresh Auto policy so the
+/// override can detect page skipping for that predicate's specific columns.
+///
+/// Scenario:
+/// - Dense initial RowSelection (alternating select/skip) covers all pages → 
Auto resolves to Mask
+/// - Predicate 1 evaluates on column A, narrows selection to skip middle pages
+/// - Predicate 2's column B is fetched sparsely with the narrowed selection 
(missing middle pages)
+/// - Without the fix, the override for predicate 2 returns early 
(policy=Mask, not Auto),
+///   so Mask is used and tries to read missing pages → "Invalid offset" error
+#[tokio::test]
+async fn test_multi_predicate_mask_policy_carryover() {
+    // 300 rows, 1 row group, 100 rows per page (3 pages)
+    let num_rows = 300usize;
+    let rows_per_page = 100;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("filter_col", DataType::Int32, false),
+        Field::new("value_col", DataType::Int32, false),
+    ]));
+
+    let props = WriterProperties::builder()
+        .set_max_row_group_row_count(Some(num_rows))
+        .set_data_page_row_count_limit(rows_per_page)
+        .set_write_batch_size(rows_per_page)
+        .set_dictionary_enabled(false)
+        .build();
+
+    // filter_col: 0 for first and last 100 rows, 1 for middle 100 rows
+    // value_col: just row index
+    let filter_values: Vec<i32> = (0..num_rows as i32)
+        .map(|i| if (100..200).contains(&i) { 1 } else { 0 })
+        .collect();
+    let value_values: Vec<i32> = (0..num_rows as i32).collect();
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(filter_values)) as ArrayRef,
+            Arc::new(Int32Array::from(value_values)) as ArrayRef,
+        ],
+    )
+    .unwrap();
+
+    let mut buffer = Vec::new();
+    let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), 
Some(props)).unwrap();
+    writer.write(&batch).unwrap();
+    writer.close().unwrap();
+    let buffer = Bytes::from(buffer);
+
+    let reader = TestReader::new(buffer);
+    let options = 
ArrowReaderOptions::default().with_page_index_policy(PageIndexPolicy::Required);
+    let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, 
options)
+        .await
+        .unwrap();
+
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+    // Dense initial selection: Select(1), Skip(1) repeated → triggers Mask 
strategy
+    // Covers all pages since every page has selected rows
+    let selectors: Vec<RowSelector> = (0..num_rows / 2)
+        .flat_map(|_| vec![RowSelector::select(1), RowSelector::skip(1)])
+        .collect();
+    let selection = RowSelection::from(selectors);
+
+    // Predicate 1 on filter_col: keeps only rows where filter_col == 0
+    // (first 100 and last 100 rows). After this, middle page is excluded.
+    let pred1 = ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, 
[0]), |batch| {
+        let col = batch.column(0).as_primitive::<Int32Type>();
+        Ok(BooleanArray::from_iter(
+            col.iter().map(|v| v.map(|val| val == 0)),
+        ))
+    });
+
+    // Predicate 2 on value_col: keeps rows where value_col < 250
+    // This column is fetched AFTER predicate 1 narrows the selection.
+    // Its sparse data will be missing the middle page.
+    let pred2 = ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, 
[1]), |batch| {
+        let col = batch.column(0).as_primitive::<Int32Type>();
+        Ok(BooleanArray::from_iter(
+            col.iter().map(|v| v.map(|val| val < 250)),
+        ))
+    });
+
+    let row_filter = RowFilter::new(vec![Box::new(pred1), Box::new(pred2)]);
+
+    // Output projection: both columns
+    let projection = ProjectionMask::roots(&schema_descr, [0, 1]);
+
+    let stream = builder
+        .with_row_filter(row_filter)
+        .with_row_selection(selection)
+        .with_projection(projection)
+        .with_max_predicate_cache_size(0)
+        .build()
+        .unwrap();
+
+    // Without the fix, this panics with:
+    // "Invalid offset in sparse column chunk data: ..., no matching page 
found."
+    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+    let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
+
+    // Verify results: rows where filter_col==0 AND value_col<250 AND original 
alternating selection
+    // That's even-indexed rows in [0,100) with value<250 → rows 0,2,4,...,98 
(50 rows)
+    // Plus even-indexed rows in [200,250) with value<250 → rows 
200,202,...,248 (25 rows)
+    assert_eq!(batch.num_rows(), 75);
+}

Reply via email to