This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 420b4e2f Table Scan: Add Row Selection Filtering (#565)
420b4e2f is described below

commit 420b4e2fb476b621862327f7eb02031fe292d755
Author: Scott Donnelly <sc...@donnel.ly>
AuthorDate: Tue Sep 24 08:20:23 2024 +0100

    Table Scan: Add Row Selection Filtering (#565)
    
    * feat(scan): add row selection capability via PageIndexEvaluator
    
    * test(row-selection): add first few row selection tests
    
    * feat(scan): add more tests, fix bug where min/max args swapped
    
    * fix: ad test and fix for logic bug in PageIndexEvaluator in-clause handler
    
    * feat: changes suggested from PR review
---
 crates/iceberg/Cargo.toml                          |    1 +
 crates/iceberg/src/arrow/reader.rs                 |   95 +-
 crates/iceberg/src/expr/visitors/mod.rs            |    1 +
 .../src/expr/visitors/page_index_evaluator.rs      | 1498 ++++++++++++++++++++
 crates/iceberg/src/scan.rs                         |   26 +-
 5 files changed, 1617 insertions(+), 4 deletions(-)

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 6166d360..4d016094 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -67,6 +67,7 @@ opendal = { workspace = true }
 ordered-float = { workspace = true }
 parquet = { workspace = true, features = ["async"] }
 paste = { workspace = true }
+rand = { workspace = true }
 reqwest = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
diff --git a/crates/iceberg/src/arrow/reader.rs 
b/crates/iceberg/src/arrow/reader.rs
index 59294554..10f97153 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -32,7 +32,7 @@ use fnv::FnvHashSet;
 use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
 use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
-use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, 
RowFilter};
+use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, 
RowFilter, RowSelection};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, 
PARQUET_FIELD_ID_META_KEY};
 use parquet::file::metadata::ParquetMetaData;
@@ -41,6 +41,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as 
ParquetType};
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, 
BoundPredicateVisitor};
+use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
 use 
crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
@@ -56,6 +57,7 @@ pub struct ArrowReaderBuilder {
     file_io: FileIO,
     concurrency_limit_data_files: usize,
     row_group_filtering_enabled: bool,
+    row_selection_enabled: bool,
 }
 
 impl ArrowReaderBuilder {
@@ -68,6 +70,7 @@ impl ArrowReaderBuilder {
             file_io,
             concurrency_limit_data_files: num_cpus,
             row_group_filtering_enabled: true,
+            row_selection_enabled: false,
         }
     }
 
@@ -90,6 +93,12 @@ impl ArrowReaderBuilder {
         self
     }
 
+    /// Determines whether to enable row selection.
+    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) 
-> Self {
+        self.row_selection_enabled = row_selection_enabled;
+        self
+    }
+
     /// Build the ArrowReader.
     pub fn build(self) -> ArrowReader {
         ArrowReader {
@@ -97,6 +106,7 @@ impl ArrowReaderBuilder {
             file_io: self.file_io,
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
+            row_selection_enabled: self.row_selection_enabled,
         }
     }
 }
@@ -111,6 +121,7 @@ pub struct ArrowReader {
     concurrency_limit_data_files: usize,
 
     row_group_filtering_enabled: bool,
+    row_selection_enabled: bool,
 }
 
 impl ArrowReader {
@@ -121,6 +132,7 @@ impl ArrowReader {
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
+        let row_selection_enabled = self.row_selection_enabled;
 
         let (tx, rx) = channel(concurrency_limit_data_files);
         let mut channel_for_error = tx.clone();
@@ -142,6 +154,7 @@ impl ArrowReader {
                                         file_io,
                                         tx,
                                         row_group_filtering_enabled,
+                                        row_selection_enabled,
                                     )
                                     .await
                                 })
@@ -168,6 +181,7 @@ impl ArrowReader {
         file_io: FileIO,
         mut tx: Sender<Result<RecordBatch>>,
         row_group_filtering_enabled: bool,
+        row_selection_enabled: bool,
     ) -> Result<()> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
@@ -176,11 +190,12 @@ impl ArrowReader {
             try_join!(parquet_file.metadata(), parquet_file.reader())?;
         let parquet_file_reader = ArrowFileReader::new(parquet_metadata, 
parquet_reader);
 
+        let should_load_page_index = row_selection_enabled && 
task.predicate().is_some();
+
         // Start creating the record batch stream, which wraps the parquet 
file reader
         let mut record_batch_stream_builder = 
ParquetRecordBatchStreamBuilder::new_with_options(
             parquet_file_reader,
-            // Page index will be required in upcoming row selection PR
-            ArrowReaderOptions::new().with_page_index(false),
+            ArrowReaderOptions::new().with_page_index(should_load_page_index),
         )
         .await?;
 
@@ -224,6 +239,19 @@ impl ArrowReader {
                 selected_row_groups = Some(result);
             }
 
+            if row_selection_enabled {
+                let row_selection = Self::get_row_selection(
+                    predicate,
+                    record_batch_stream_builder.metadata(),
+                    &selected_row_groups,
+                    &field_id_map,
+                    task.schema(),
+                )?;
+
+                record_batch_stream_builder =
+                    
record_batch_stream_builder.with_row_selection(row_selection);
+            }
+
             if let Some(selected_row_groups) = selected_row_groups {
                 record_batch_stream_builder =
                     
record_batch_stream_builder.with_row_groups(selected_row_groups);
@@ -377,6 +405,67 @@ impl ArrowReader {
 
         Ok(results)
     }
+
+    fn get_row_selection(
+        predicate: &BoundPredicate,
+        parquet_metadata: &Arc<ParquetMetaData>,
+        selected_row_groups: &Option<Vec<usize>>,
+        field_id_map: &HashMap<i32, usize>,
+        snapshot_schema: &Schema,
+    ) -> Result<RowSelection> {
+        let Some(column_index) = parquet_metadata.column_index() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Parquet file metadata does not contain a column index",
+            ));
+        };
+
+        let Some(offset_index) = parquet_metadata.offset_index() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Parquet file metadata does not contain an offset index",
+            ));
+        };
+
+        let mut selected_row_groups_idx = 0;
+
+        let page_index = column_index
+            .iter()
+            .enumerate()
+            .zip(offset_index)
+            .zip(parquet_metadata.row_groups());
+
+        let mut results = Vec::new();
+        for (((idx, column_index), offset_index), row_group_metadata) in 
page_index {
+            if let Some(selected_row_groups) = selected_row_groups {
+                // skip row groups that aren't present in selected_row_groups
+                if idx == selected_row_groups[selected_row_groups_idx] {
+                    selected_row_groups_idx += 1;
+                } else {
+                    continue;
+                }
+            }
+
+            let selections_for_page = PageIndexEvaluator::eval(
+                predicate,
+                column_index,
+                offset_index,
+                row_group_metadata,
+                field_id_map,
+                snapshot_schema,
+            )?;
+
+            results.push(selections_for_page);
+
+            if let Some(selected_row_groups) = selected_row_groups {
+                if selected_row_groups_idx == selected_row_groups.len() {
+                    break;
+                }
+            }
+        }
+
+        Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
+    }
 }
 
 /// Build the map of parquet field id to Parquet column index in the schema.
diff --git a/crates/iceberg/src/expr/visitors/mod.rs 
b/crates/iceberg/src/expr/visitors/mod.rs
index 06bfd8cd..69ddf4bb 100644
--- a/crates/iceberg/src/expr/visitors/mod.rs
+++ b/crates/iceberg/src/expr/visitors/mod.rs
@@ -20,4 +20,5 @@ pub(crate) mod expression_evaluator;
 pub(crate) mod inclusive_metrics_evaluator;
 pub(crate) mod inclusive_projection;
 pub(crate) mod manifest_evaluator;
+pub(crate) mod page_index_evaluator;
 pub(crate) mod row_group_metrics_evaluator;
diff --git a/crates/iceberg/src/expr/visitors/page_index_evaluator.rs 
b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
new file mode 100644
index 00000000..e8c1849a
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/page_index_evaluator.rs
@@ -0,0 +1,1498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Evaluates predicates against a Parquet Page Index
+
+use std::collections::HashMap;
+
+use fnv::FnvHashSet;
+use ordered_float::OrderedFloat;
+use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+use parquet::file::metadata::RowGroupMetaData;
+use parquet::file::page_index::index::Index;
+use parquet::format::PageLocation;
+
+use crate::expr::visitors::bound_predicate_visitor::{visit, 
BoundPredicateVisitor};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
+use crate::{Error, ErrorKind, Result};
+
+type OffsetIndex = Vec<Vec<PageLocation>>;
+
+const IN_PREDICATE_LIMIT: usize = 200;
+
+enum MissingColBehavior {
+    CantMatch,
+    MightMatch,
+}
+
+enum PageNullCount {
+    AllNull,
+    NoneNull,
+    SomeNull,
+    Unknown,
+}
+
+impl PageNullCount {
+    fn from_row_and_null_counts(num_rows: usize, null_count: Option<i64>) -> 
Self {
+        match (num_rows, null_count) {
+            (x, Some(y)) if x == y as usize => PageNullCount::AllNull,
+            (_, Some(0)) => PageNullCount::NoneNull,
+            (_, Some(_)) => PageNullCount::SomeNull,
+            _ => PageNullCount::Unknown,
+        }
+    }
+}
+
+pub(crate) struct PageIndexEvaluator<'a> {
+    column_index: &'a [Index],
+    offset_index: &'a OffsetIndex,
+    row_group_metadata: &'a RowGroupMetaData,
+    iceberg_field_id_to_parquet_column_index: &'a HashMap<i32, usize>,
+    snapshot_schema: &'a Schema,
+}
+
+impl<'a> PageIndexEvaluator<'a> {
+    pub(crate) fn new(
+        column_index: &'a [Index],
+        offset_index: &'a OffsetIndex,
+        row_group_metadata: &'a RowGroupMetaData,
+        field_id_map: &'a HashMap<i32, usize>,
+        snapshot_schema: &'a Schema,
+    ) -> Self {
+        Self {
+            column_index,
+            offset_index,
+            row_group_metadata,
+            iceberg_field_id_to_parquet_column_index: field_id_map,
+            snapshot_schema,
+        }
+    }
+
+    /// Evaluate this `PageIndexEvaluator`'s filter predicate against a
+    /// specific page's column index entry in a parquet file's page index.
+    /// [`ArrowReader`] uses the resulting [`RowSelection`] to reject
+    /// pages within a parquet file's row group that cannot contain rows
+    /// matching the filter predicate.
+    pub(crate) fn eval(
+        filter: &'a BoundPredicate,
+        column_index: &'a [Index],
+        offset_index: &'a OffsetIndex,
+        row_group_metadata: &'a RowGroupMetaData,
+        field_id_map: &'a HashMap<i32, usize>,
+        snapshot_schema: &'a Schema,
+    ) -> Result<Vec<RowSelector>> {
+        if row_group_metadata.num_rows() == 0 {
+            return Ok(vec![]);
+        }
+
+        let mut evaluator = Self::new(
+            column_index,
+            offset_index,
+            row_group_metadata,
+            field_id_map,
+            snapshot_schema,
+        );
+
+        Ok(visit(&mut evaluator, filter)?.iter().copied().collect())
+    }
+
+    fn select_all_rows(&self) -> Result<RowSelection> {
+        Ok(vec![RowSelector::select(
+            self.row_group_metadata.num_rows() as usize
+        )]
+        .into())
+    }
+
+    fn skip_all_rows(&self) -> Result<RowSelection> {
+        Ok(vec![RowSelector::skip(
+            self.row_group_metadata.num_rows() as usize
+        )]
+        .into())
+    }
+
+    fn calc_row_selection<F>(
+        &self,
+        field_id: i32,
+        predicate: F,
+        missing_col_behavior: MissingColBehavior,
+    ) -> Result<RowSelection>
+    where
+        F: Fn(Option<Datum>, Option<Datum>, PageNullCount) -> Result<bool>,
+    {
+        let Some(&parquet_column_index) =
+            self.iceberg_field_id_to_parquet_column_index.get(&field_id)
+        else {
+            // if the snapshot's column is not present in the row group,
+            // exit early
+            return match missing_col_behavior {
+                MissingColBehavior::CantMatch => self.skip_all_rows(),
+                MissingColBehavior::MightMatch => self.select_all_rows(),
+            };
+        };
+
+        let Some(field) = self.snapshot_schema.field_by_id(field_id) else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Field with id {} missing from snapshot schema", 
field_id),
+            ));
+        };
+
+        let Some(field_type) = field.field_type.as_primitive_type() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!(
+                    "Field with id {} not convertible to primitive type",
+                    field_id
+                ),
+            ));
+        };
+
+        let Some(column_index) = self.column_index.get(parquet_column_index) 
else {
+            // This should not happen, but we fail soft anyway so that the 
scan is still
+            // successful, just a bit slower
+            return self.select_all_rows();
+        };
+
+        let Some(offset_index) = self.offset_index.get(parquet_column_index) 
else {
+            // if we have a column index, we should always have an offset 
index.
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Missing offset index for field id {}", field_id),
+            ));
+        };
+
+        // TODO: cache row_counts to avoid recalcing if the same column
+        //       appears multiple times in the filter predicate
+        let row_counts = self.calc_row_counts(offset_index);
+
+        let Some(page_filter) = Self::apply_predicate_to_column_index(
+            predicate,
+            field_type,
+            column_index,
+            &row_counts,
+        )?
+        else {
+            return self.select_all_rows();
+        };
+
+        let row_selectors: Vec<_> = row_counts
+            .iter()
+            .zip(page_filter.iter())
+            .map(|(&row_count, &is_selected)| {
+                if is_selected {
+                    RowSelector::select(row_count)
+                } else {
+                    RowSelector::skip(row_count)
+                }
+            })
+            .collect();
+
+        Ok(row_selectors.into())
+    }
+
+    /// returns a list of row counts per page
+    fn calc_row_counts(&self, offset_index: &[PageLocation]) -> Vec<usize> {
+        let mut remaining_rows = self.row_group_metadata.num_rows() as usize;
+        let mut row_counts = Vec::with_capacity(self.offset_index.len());
+
+        for (idx, page_location) in offset_index.iter().enumerate() {
+            let row_count = if idx < offset_index.len() - 1 {
+                let row_count = (offset_index[idx + 1].first_row_index
+                    - page_location.first_row_index) as usize;
+                remaining_rows -= row_count;
+                row_count
+            } else {
+                remaining_rows
+            };
+
+            row_counts.push(row_count);
+        }
+
+        row_counts
+    }
+
+    fn apply_predicate_to_column_index<F>(
+        predicate: F,
+        field_type: &PrimitiveType,
+        column_index: &Index,
+        row_counts: &[usize],
+    ) -> Result<Option<Vec<bool>>>
+    where
+        F: Fn(Option<Datum>, Option<Datum>, PageNullCount) -> Result<bool>,
+    {
+        let result: Result<Vec<bool>> = match column_index {
+            Index::NONE => {
+                return Ok(None);
+            }
+            Index::BOOLEAN(idx) => idx
+                .indexes
+                .iter()
+                .zip(row_counts.iter())
+                .map(|(item, &row_count)| {
+                    predicate(
+                        item.max.map(|val| {
+                            Datum::new(field_type.clone(), 
PrimitiveLiteral::Boolean(val))
+                        }),
+                        item.min.map(|val| {
+                            Datum::new(field_type.clone(), 
PrimitiveLiteral::Boolean(val))
+                        }),
+                        PageNullCount::from_row_and_null_counts(row_count, 
item.null_count),
+                    )
+                })
+                .collect(),
+            Index::INT32(idx) => idx
+                .indexes
+                .iter()
+                .zip(row_counts.iter())
+                .map(|(item, &row_count)| {
+                    predicate(
+                        item.max
+                            .map(|val| Datum::new(field_type.clone(), 
PrimitiveLiteral::Int(val))),
+                        item.min
+                            .map(|val| Datum::new(field_type.clone(), 
PrimitiveLiteral::Int(val))),
+                        PageNullCount::from_row_and_null_counts(row_count, 
item.null_count),
+                    )
+                })
+                .collect(),
+            Index::INT64(idx) => idx
+                .indexes
+                .iter()
+                .zip(row_counts.iter())
+                .map(|(item, &row_count)| {
+                    predicate(
+                        item.max
+                            .map(|val| Datum::new(field_type.clone(), 
PrimitiveLiteral::Long(val))),
+                        item.min
+                            .map(|val| Datum::new(field_type.clone(), 
PrimitiveLiteral::Long(val))),
+                        PageNullCount::from_row_and_null_counts(row_count, 
item.null_count),
+                    )
+                })
+                .collect(),
+            Index::FLOAT(idx) => idx
+                .indexes
+                .iter()
+                .zip(row_counts.iter())
+                .map(|(item, &row_count)| {
+                    predicate(
+                        item.min.map(|val| {
+                            Datum::new(
+                                field_type.clone(),
+                                
PrimitiveLiteral::Float(OrderedFloat::from(val)),
+                            )
+                        }),
+                        item.max.map(|val| {
+                            Datum::new(
+                                field_type.clone(),
+                                
PrimitiveLiteral::Float(OrderedFloat::from(val)),
+                            )
+                        }),
+                        PageNullCount::from_row_and_null_counts(row_count, 
item.null_count),
+                    )
+                })
+                .collect(),
+            Index::DOUBLE(idx) => idx
+                .indexes
+                .iter()
+                .zip(row_counts.iter())
+                .map(|(item, &row_count)| {
+                    predicate(
+                        item.max.map(|val| {
+                            Datum::new(
+                                field_type.clone(),
+                                
PrimitiveLiteral::Double(OrderedFloat::from(val)),
+                            )
+                        }),
+                        item.min.map(|val| {
+                            Datum::new(
+                                field_type.clone(),
+                                
PrimitiveLiteral::Double(OrderedFloat::from(val)),
+                            )
+                        }),
+                        PageNullCount::from_row_and_null_counts(row_count, 
item.null_count),
+                    )
+                })
+                .collect(),
+            Index::BYTE_ARRAY(idx) => idx
+                .indexes
+                .iter()
+                .zip(row_counts.iter())
+                .map(|(item, &row_count)| {
+                    predicate(
+                        item.min.clone().map(|val| {
+                            Datum::new(
+                                field_type.clone(),
+                                PrimitiveLiteral::String(
+                                    
String::from_utf8(val.data().to_vec()).unwrap(),
+                                ),
+                            )
+                        }),
+                        item.max.clone().map(|val| {
+                            Datum::new(
+                                field_type.clone(),
+                                PrimitiveLiteral::String(
+                                    
String::from_utf8(val.data().to_vec()).unwrap(),
+                                ),
+                            )
+                        }),
+                        PageNullCount::from_row_and_null_counts(row_count, 
item.null_count),
+                    )
+                })
+                .collect(),
+            Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    "unsupported 'FIXED_LEN_BYTE_ARRAY' index type in 
column_index",
+                ))
+            }
+            Index::INT96(_) => {
+                return Err(Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    "unsupported 'INT96' index type in column_index",
+                ))
+            }
+        };
+
+        Ok(Some(result?))
+    }
+
+    fn visit_inequality(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        cmp_fn: fn(&Datum, &Datum) -> bool,
+        use_lower_bound: bool,
+    ) -> Result<RowSelection> {
+        let field_id = reference.field().id;
+
+        self.calc_row_selection(
+            field_id,
+            |min, max, null_count| {
+                if matches!(null_count, PageNullCount::AllNull) {
+                    return Ok(false);
+                }
+
+                if datum.is_nan() {
+                    // NaN indicates unreliable bounds.
+                    return Ok(true);
+                }
+
+                let bound = if use_lower_bound { min } else { max };
+
+                if let Some(bound) = bound {
+                    if cmp_fn(&bound, datum) {
+                        return Ok(true);
+                    }
+
+                    return Ok(false);
+                }
+
+                Ok(true)
+            },
+            MissingColBehavior::MightMatch,
+        )
+    }
+}
+
+impl BoundPredicateVisitor for PageIndexEvaluator<'_> {
+    type T = RowSelection;
+
+    fn always_true(&mut self) -> Result<RowSelection> {
+        self.select_all_rows()
+    }
+
+    fn always_false(&mut self) -> Result<RowSelection> {
+        self.skip_all_rows()
+    }
+
+    fn and(&mut self, lhs: RowSelection, rhs: RowSelection) -> 
Result<RowSelection> {
+        Ok(lhs.intersection(&rhs))
+    }
+
+    fn or(&mut self, lhs: RowSelection, rhs: RowSelection) -> 
Result<RowSelection> {
+        Ok(union_row_selections(&lhs, &rhs))
+    }
+
+    fn not(&mut self, _: RowSelection) -> Result<RowSelection> {
+        Err(Error::new(
+            ErrorKind::Unexpected,
+            "NOT unsupported at this point. NOT-rewrite should be performed 
first",
+        ))
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        let field_id = reference.field().id;
+
+        self.calc_row_selection(
+            field_id,
+            |_max, _min, null_count| Ok(!matches!(null_count, 
PageNullCount::NoneNull)),
+            MissingColBehavior::MightMatch,
+        )
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        let field_id = reference.field().id;
+
+        self.calc_row_selection(
+            field_id,
+            |_max, _min, null_count| Ok(!matches!(null_count, 
PageNullCount::AllNull)),
+            MissingColBehavior::CantMatch,
+        )
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        // NaN counts not present in ColumnChunkMetadata Statistics.
+        // Only float columns can be NaN.
+        if reference.field().field_type.is_floating_type() {
+            self.select_all_rows()
+        } else {
+            self.skip_all_rows()
+        }
+    }
+
+    fn not_nan(
+        &mut self,
+        _reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        // NaN counts not present in ColumnChunkMetadata Statistics
+        self.select_all_rows()
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        self.visit_inequality(reference, datum, PartialOrd::lt, true)
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        self.visit_inequality(reference, datum, PartialOrd::le, true)
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        self.visit_inequality(reference, datum, PartialOrd::gt, false)
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        self.visit_inequality(reference, datum, PartialOrd::ge, false)
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        let field_id = reference.field().id;
+
+        self.calc_row_selection(
+            field_id,
+            |min, max, nulls| {
+                if matches!(nulls, PageNullCount::AllNull) {
+                    return Ok(false);
+                }
+
+                if let Some(min) = min {
+                    if min.gt(datum) {
+                        return Ok(false);
+                    }
+                }
+
+                if let Some(max) = max {
+                    if max.lt(datum) {
+                        return Ok(false);
+                    }
+                }
+
+                Ok(true)
+            },
+            MissingColBehavior::CantMatch,
+        )
+    }
+
+    fn not_eq(
+        &mut self,
+        _reference: &BoundReference,
+        _datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        // Because the bounds are not necessarily a min or max value,
+        // this cannot be answered using them. notEq(col, X) with (X, Y)
+        // doesn't guarantee that X is a value in col.
+        self.select_all_rows()
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        let field_id = reference.field().id;
+
+        let PrimitiveLiteral::String(datum) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot use StartsWith operator on non-string values",
+            ));
+        };
+
+        self.calc_row_selection(
+            field_id,
+            |min, max, nulls| {
+                if matches!(nulls, PageNullCount::AllNull) {
+                    return Ok(false);
+                }
+
+                if let Some(lower_bound) = min {
+                    let PrimitiveLiteral::String(lower_bound) = 
lower_bound.literal() else {
+                        return Err(Error::new(
+                            ErrorKind::Unexpected,
+                            "Cannot use StartsWith operator on non-string 
lower_bound value",
+                        ));
+                    };
+
+                    let prefix_length = 
lower_bound.chars().count().min(datum.chars().count());
+
+                    // truncate lower bound so that its length
+                    // is not greater than the length of prefix
+                    let truncated_lower_bound =
+                        
lower_bound.chars().take(prefix_length).collect::<String>();
+                    if datum < &truncated_lower_bound {
+                        return Ok(false);
+                    }
+                }
+
+                if let Some(upper_bound) = max {
+                    let PrimitiveLiteral::String(upper_bound) = 
upper_bound.literal() else {
+                        return Err(Error::new(
+                            ErrorKind::Unexpected,
+                            "Cannot use StartsWith operator on non-string 
upper_bound value",
+                        ));
+                    };
+
+                    let prefix_length = 
upper_bound.chars().count().min(datum.chars().count());
+
+                    // truncate upper bound so that its length
+                    // is not greater than the length of prefix
+                    let truncated_upper_bound =
+                        
upper_bound.chars().take(prefix_length).collect::<String>();
+                    if datum > &truncated_upper_bound {
+                        return Ok(false);
+                    }
+                }
+
+                Ok(true)
+            },
+            MissingColBehavior::CantMatch,
+        )
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        datum: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        let field_id = reference.field().id;
+
+        // notStartsWith will match unless all values must start with the 
prefix.
+        // This happens when the lower and upper bounds both start with the 
prefix.
+
+        let PrimitiveLiteral::String(prefix) = datum.literal() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Cannot use StartsWith operator on non-string values",
+            ));
+        };
+
+        self.calc_row_selection(
+            field_id,
+            |min, max, nulls| {
+                if !matches!(nulls, PageNullCount::NoneNull) {
+                    return Ok(true);
+                }
+
+                let Some(lower_bound) = min else {
+                    return Ok(true);
+                };
+
+                let PrimitiveLiteral::String(lower_bound_str) = 
lower_bound.literal() else {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "Cannot use NotStartsWith operator on non-string 
lower_bound value",
+                    ));
+                };
+
+                if lower_bound_str < prefix {
+                    // if lower is shorter than the prefix then lower doesn't 
start with the prefix
+                    return Ok(true);
+                }
+
+                let prefix_len = prefix.chars().count();
+
+                if 
lower_bound_str.chars().take(prefix_len).collect::<String>() == *prefix {
+                    // lower bound matches the prefix
+
+                    let Some(upper_bound) = max else {
+                        return Ok(true);
+                    };
+
+                    let PrimitiveLiteral::String(upper_bound) = 
upper_bound.literal() else {
+                        return Err(Error::new(
+                            ErrorKind::Unexpected,
+                            "Cannot use NotStartsWith operator on non-string 
upper_bound value",
+                        ));
+                    };
+
+                    // if upper is shorter than the prefix then upper can't 
start with the prefix
+                    if upper_bound.chars().count() < prefix_len {
+                        return Ok(true);
+                    }
+
+                    if 
upper_bound.chars().take(prefix_len).collect::<String>() == *prefix {
+                        // both bounds match the prefix, so all rows must 
match the
+                        // prefix and therefore do not satisfy the predicate
+                        return Ok(false);
+                    }
+                }
+
+                Ok(true)
+            },
+            MissingColBehavior::MightMatch,
+        )
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        let field_id = reference.field().id;
+
+        if literals.len() > IN_PREDICATE_LIMIT {
+            // skip evaluating the predicate if the number of values is too big
+            return self.select_all_rows();
+        }
+        self.calc_row_selection(
+            field_id,
+            |min, max, nulls| {
+                if matches!(nulls, PageNullCount::AllNull) {
+                    return Ok(false);
+                }
+
+                match (min, max) {
+                    (Some(min), Some(max)) => {
+                        if literals
+                            .iter()
+                            .all(|datum| datum.lt(&min) || datum.gt(&max))
+                        {
+                            // if all values are outside the bounds, rows 
cannot match.
+                            return Ok(false);
+                        }
+                    }
+                    (Some(min), _) => {
+                        if !literals.iter().any(|datum| datum.ge(&min)) {
+                            // if none of the values are greater than the min 
bound, rows cant match
+                            return Ok(false);
+                        }
+                    }
+                    (_, Some(max)) => {
+                        if !literals.iter().any(|datum| datum.le(&max)) {
+                            // if all values are greater than upper bound, 
rows cannot match.
+                            return Ok(false);
+                        }
+                    }
+
+                    _ => {}
+                }
+
+                Ok(true)
+            },
+            MissingColBehavior::CantMatch,
+        )
+    }
+
+    fn not_in(
+        &mut self,
+        _reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<RowSelection> {
+        // Because the bounds are not necessarily a min or max value,
+        // this cannot be answered using them. notIn(col, {X, ...})
+        // with (X, Y) doesn't guarantee that X is a value in col.
+        self.select_all_rows()
+    }
+}
+
+/// Combine two lists of `RowSelection` return the union of them
+/// For example:
+/// self:      NNYYYYNNYYNYN
+/// other:     NYNNNNNNY
+///
+/// returned:  NYYYYYNNYYNYN
+///
+/// This can be removed from here once RowSelection::union is in parquet::arrow
+/// (Hopefully once https://github.com/apache/arrow-rs/pull/6308 gets merged)
+fn union_row_selections(left: &RowSelection, right: &RowSelection) -> 
RowSelection {
+    let mut l_iter = left.iter().copied().peekable();
+    let mut r_iter = right.iter().copied().peekable();
+
+    let iter = std::iter::from_fn(move || {
+        loop {
+            let l = l_iter.peek_mut();
+            let r = r_iter.peek_mut();
+
+            match (l, r) {
+                (Some(a), _) if a.row_count == 0 => {
+                    l_iter.next().unwrap();
+                }
+                (_, Some(b)) if b.row_count == 0 => {
+                    r_iter.next().unwrap();
+                }
+                (Some(l), Some(r)) => {
+                    return match (l.skip, r.skip) {
+                        // Skip both ranges
+                        (true, true) => {
+                            if l.row_count < r.row_count {
+                                let skip = l.row_count;
+                                r.row_count -= l.row_count;
+                                l_iter.next();
+                                Some(RowSelector::skip(skip))
+                            } else {
+                                let skip = r.row_count;
+                                l.row_count -= skip;
+                                r_iter.next();
+                                Some(RowSelector::skip(skip))
+                            }
+                        }
+                        // Keep rows from left
+                        (false, true) => {
+                            if l.row_count < r.row_count {
+                                r.row_count -= l.row_count;
+                                l_iter.next()
+                            } else {
+                                let r_row_count = r.row_count;
+                                l.row_count -= r_row_count;
+                                r_iter.next();
+                                Some(RowSelector::select(r_row_count))
+                            }
+                        }
+                        // Keep rows from right
+                        (true, false) => {
+                            if l.row_count < r.row_count {
+                                let l_row_count = l.row_count;
+                                r.row_count -= l_row_count;
+                                l_iter.next();
+                                Some(RowSelector::select(l_row_count))
+                            } else {
+                                l.row_count -= r.row_count;
+                                r_iter.next()
+                            }
+                        }
+                        // Keep at least one
+                        _ => {
+                            if l.row_count < r.row_count {
+                                r.row_count -= l.row_count;
+                                l_iter.next()
+                            } else {
+                                l.row_count -= r.row_count;
+                                r_iter.next()
+                            }
+                        }
+                    };
+                }
+                (Some(_), None) => return l_iter.next(),
+                (None, Some(_)) => return r_iter.next(),
+                (None, None) => return None,
+            }
+        }
+    });
+
+    iter.collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+    use parquet::basic::{LogicalType as ParquetLogicalType, Type as 
ParquetPhysicalType};
+    use parquet::data_type::ByteArray;
+    use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
+    use parquet::file::page_index::index::{Index, NativeIndex, PageIndex};
+    use parquet::file::statistics::Statistics;
+    use parquet::format::{BoundaryOrder, PageLocation};
+    use parquet::schema::types::{
+        ColumnDescriptor, ColumnPath, SchemaDescriptor, Type as 
parquetSchemaType,
+    };
+    use rand::{thread_rng, Rng};
+
+    use super::{union_row_selections, PageIndexEvaluator};
+    use crate::expr::{Bind, Reference};
+    use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+    use crate::{ErrorKind, Result};
+
+    #[test]
+    fn test_union_row_selections() {
+        let selection = RowSelection::from(vec![RowSelector::select(1048576)]);
+        let result = union_row_selections(&selection, &selection);
+        assert_eq!(result, selection);
+
+        // NYNYY
+        let a = RowSelection::from(vec![
+            RowSelector::skip(10),
+            RowSelector::select(10),
+            RowSelector::skip(10),
+            RowSelector::select(20),
+        ]);
+
+        // NNYYN
+        let b = RowSelection::from(vec![
+            RowSelector::skip(20),
+            RowSelector::select(20),
+            RowSelector::skip(10),
+        ]);
+
+        let result = union_row_selections(&a, &b);
+
+        // NYYYY
+        assert_eq!(result.iter().collect::<Vec<_>>(), vec![
+            &RowSelector::skip(10),
+            &RowSelector::select(40)
+        ]);
+    }
+
+    #[test]
+    fn eval_matches_no_rows_for_empty_row_group() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(0, 0, None, 0, 
None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .greater_than(Datum::float(1.0))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_is_null_select_only_pages_with_nulls() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .is_null()
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![
+            RowSelector::select(1024),
+            RowSelector::skip(1024),
+            RowSelector::select(2048),
+        ];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_is_not_null_dont_select_pages_with_all_nulls() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .is_not_null()
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![RowSelector::skip(1024), 
RowSelector::select(3072)];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_is_nan_select_all() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .is_nan()
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![RowSelector::select(4096)];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_not_nan_select_all() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .is_not_nan()
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![RowSelector::select(4096)];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_inequality_nan_datum_all_rows_except_all_null_pages() -> 
Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .less_than(Datum::float(f32::NAN))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![RowSelector::skip(1024), 
RowSelector::select(3072)];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_inequality_pages_containing_value_except_all_null_pages() -> 
Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .less_than(Datum::float(5.0))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![
+            RowSelector::skip(1024),
+            RowSelector::select(1024),
+            RowSelector::skip(1024),
+            RowSelector::select(1024),
+        ];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_eq_pages_containing_value_except_all_null_pages() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .equal_to(Datum::float(5.0))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![
+            RowSelector::skip(1024),
+            RowSelector::select(1024),
+            RowSelector::skip(1024),
+            RowSelector::select(1024),
+        ];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_not_eq_all_rows() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .not_equal_to(Datum::float(5.0))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![RowSelector::select(4096)];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_starts_with_error_float_col() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .starts_with(Datum::float(5.0))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        );
+
+        assert_eq!(result.unwrap_err().kind(), ErrorKind::Unexpected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_starts_with_pages_containing_value_except_all_null_pages() -> 
Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_string")
+            .starts_with(Datum::string("B"))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![
+            RowSelector::select(512),
+            RowSelector::skip(3536),
+            RowSelector::select(48),
+        ];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn 
eval_not_starts_with_pages_containing_value_except_pages_with_min_and_max_equal_to_prefix_and_all_null_pages(
+    ) -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_string")
+            .not_starts_with(Datum::string("DE"))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![
+            RowSelector::select(512),
+            RowSelector::skip(512),
+            RowSelector::select(3072),
+        ];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_in_length_of_set_above_limit_all_rows() -> Result<()> {
+        let mut rng = thread_rng();
+
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_float")
+            .is_in(std::iter::repeat_with(|| 
Datum::float(rng.gen_range(0.0..10.0))).take(1000))
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![RowSelector::select(4096)];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn eval_in_valid_set_size_some_rows() -> Result<()> {
+        let row_group_metadata = create_row_group_metadata(4096, 1000, None, 
1000, None)?;
+        let (column_index, offset_index) = create_page_index()?;
+
+        let (iceberg_schema_ref, field_id_map) = 
build_iceberg_schema_and_field_map()?;
+
+        let filter = Reference::new("col_string")
+            .is_in([Datum::string("AARDVARK"), Datum::string("ICEBERG")])
+            .bind(iceberg_schema_ref.clone(), false)?;
+
+        let result = PageIndexEvaluator::eval(
+            &filter,
+            &column_index,
+            &offset_index,
+            &row_group_metadata,
+            &field_id_map,
+            iceberg_schema_ref.as_ref(),
+        )?;
+
+        let expected = vec![
+            RowSelector::select(512),
+            RowSelector::skip(512),
+            RowSelector::select(2976),
+            RowSelector::skip(48),
+            RowSelector::select(48),
+        ];
+
+        assert_eq!(result, expected);
+
+        Ok(())
+    }
+
+    fn build_iceberg_schema_and_field_map() -> Result<(Arc<Schema>, 
HashMap<i32, usize>)> {
+        let iceberg_schema = Schema::builder()
+            .with_fields([
+                Arc::new(NestedField::new(
+                    1,
+                    "col_float",
+                    Type::Primitive(PrimitiveType::Float),
+                    false,
+                )),
+                Arc::new(NestedField::new(
+                    2,
+                    "col_string",
+                    Type::Primitive(PrimitiveType::String),
+                    false,
+                )),
+            ])
+            .build()?;
+        let iceberg_schema_ref = Arc::new(iceberg_schema);
+
+        let field_id_map = HashMap::from_iter([(1, 0), (2, 1)]);
+
+        Ok((iceberg_schema_ref, field_id_map))
+    }
+
+    fn build_parquet_schema_descriptor() -> Result<Arc<SchemaDescriptor>> {
+        let field_1 = Arc::new(
+            parquetSchemaType::primitive_type_builder("col_float", 
ParquetPhysicalType::FLOAT)
+                .with_id(Some(1))
+                .build()?,
+        );
+
+        let field_2 = Arc::new(
+            parquetSchemaType::primitive_type_builder(
+                "col_string",
+                ParquetPhysicalType::BYTE_ARRAY,
+            )
+            .with_id(Some(2))
+            .with_logical_type(Some(ParquetLogicalType::String))
+            .build()?,
+        );
+
+        let group_type = Arc::new(
+            parquetSchemaType::group_type_builder("all")
+                .with_id(Some(1000))
+                .with_fields(vec![field_1, field_2])
+                .build()?,
+        );
+
+        let schema_descriptor = SchemaDescriptor::new(group_type);
+        let schema_descriptor_arc = Arc::new(schema_descriptor);
+        Ok(schema_descriptor_arc)
+    }
+
+    fn create_row_group_metadata(
+        num_rows: i64,
+        col_1_num_vals: i64,
+        col_1_stats: Option<Statistics>,
+        col_2_num_vals: i64,
+        col_2_stats: Option<Statistics>,
+    ) -> Result<RowGroupMetaData> {
+        let schema_descriptor_arc = build_parquet_schema_descriptor()?;
+
+        let column_1_desc_ptr = Arc::new(ColumnDescriptor::new(
+            schema_descriptor_arc.column(0).self_type_ptr(),
+            1,
+            1,
+            ColumnPath::new(vec!["col_float".to_string()]),
+        ));
+
+        let column_2_desc_ptr = Arc::new(ColumnDescriptor::new(
+            schema_descriptor_arc.column(1).self_type_ptr(),
+            1,
+            1,
+            ColumnPath::new(vec!["col_string".to_string()]),
+        ));
+
+        let mut col_1_meta =
+            
ColumnChunkMetaData::builder(column_1_desc_ptr).set_num_values(col_1_num_vals);
+        if let Some(stats1) = col_1_stats {
+            col_1_meta = col_1_meta.set_statistics(stats1)
+        }
+
+        let mut col_2_meta =
+            
ColumnChunkMetaData::builder(column_2_desc_ptr).set_num_values(col_2_num_vals);
+        if let Some(stats2) = col_2_stats {
+            col_2_meta = col_2_meta.set_statistics(stats2)
+        }
+
+        let row_group_metadata = 
RowGroupMetaData::builder(schema_descriptor_arc)
+            .set_num_rows(num_rows)
+            .set_column_metadata(vec![
+                col_1_meta.build()?,
+                // .set_statistics(Statistics::float(None, None, None, 1, 
false))
+                col_2_meta.build()?,
+            ])
+            .build();
+
+        Ok(row_group_metadata?)
+    }
+
+    fn create_page_index() -> Result<(Vec<Index>, Vec<Vec<PageLocation>>)> {
+        let idx_float = Index::FLOAT(NativeIndex::<f32> {
+            indexes: vec![
+                PageIndex {
+                    min: None,
+                    max: None,
+                    null_count: Some(1024),
+                },
+                PageIndex {
+                    min: Some(0.0),
+                    max: Some(10.0),
+                    null_count: Some(0),
+                },
+                PageIndex {
+                    min: Some(10.0),
+                    max: Some(20.0),
+                    null_count: Some(1),
+                },
+                PageIndex {
+                    min: None,
+                    max: None,
+                    null_count: None,
+                },
+            ],
+            boundary_order: BoundaryOrder(0), // UNORDERED
+        });
+
+        let idx_string = Index::BYTE_ARRAY(NativeIndex::<ByteArray> {
+            indexes: vec![
+                PageIndex {
+                    min: Some("AA".into()),
+                    max: Some("DD".into()),
+                    null_count: Some(0),
+                },
+                PageIndex {
+                    min: Some("DE".into()),
+                    max: Some("DE".into()),
+                    null_count: Some(0),
+                },
+                PageIndex {
+                    min: Some("DF".into()),
+                    max: Some("UJ".into()),
+                    null_count: Some(1),
+                },
+                PageIndex {
+                    min: None,
+                    max: None,
+                    null_count: Some(48),
+                },
+                PageIndex {
+                    min: None,
+                    max: None,
+                    null_count: None,
+                },
+            ],
+            boundary_order: BoundaryOrder(0), // UNORDERED
+        });
+
+        let page_locs_float = vec![
+            PageLocation::new(0, 1024, 0),
+            PageLocation::new(1024, 1024, 1024),
+            PageLocation::new(2048, 1024, 2048),
+            PageLocation::new(3072, 1024, 3072),
+        ];
+
+        let page_locs_string = vec![
+            PageLocation::new(0, 512, 0),
+            PageLocation::new(512, 512, 512),
+            PageLocation::new(1024, 2976, 1024),
+            PageLocation::new(4000, 48, 4000),
+            PageLocation::new(4048, 48, 4048),
+        ];
+
+        Ok((vec![idx_float, idx_string], vec![
+            page_locs_float,
+            page_locs_string,
+        ]))
+    }
+}
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index bc7f10a0..0d8a4bf0 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -61,6 +61,7 @@ pub struct TableScanBuilder<'a> {
     concurrency_limit_manifest_entries: usize,
     concurrency_limit_manifest_files: usize,
     row_group_filtering_enabled: bool,
+    row_selection_enabled: bool,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -78,6 +79,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_entries: num_cpus,
             concurrency_limit_manifest_files: num_cpus,
             row_group_filtering_enabled: true,
+            row_selection_enabled: false,
         }
     }
 
@@ -157,6 +159,25 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Determines whether to enable row selection.
+    /// When enabled, if a read is performed with a filter predicate,
+    /// then (for row groups that have not been skipped) the page index
+    /// for each row group in a parquet file is parsed and evaluated
+    /// against the filter predicate to determine if ranges of rows
+    /// within a row group can be skipped, based upon the page-level
+    /// statistics for each column.
+    ///
+    /// Defaults to being disabled. Enabling requires parsing the parquet page
+    /// index, which can be slow enough that parsing the page index outweighs 
any
+    /// gains from the reduced number of rows that need scanning.
+    /// It is recommended to experiment with partitioning, sorting, row group 
size,
+    /// page size, and page row limit Iceberg settings on the table being 
scanned in
+    /// order to get the best performance from using row selection.
+    pub fn with_row_selection_enabled(mut self, row_selection_enabled: bool) 
-> Self {
+        self.row_selection_enabled = row_selection_enabled;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -268,6 +289,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_entries: 
self.concurrency_limit_manifest_entries,
             concurrency_limit_manifest_files: 
self.concurrency_limit_manifest_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
+            row_selection_enabled: self.row_selection_enabled,
         })
     }
 }
@@ -292,6 +314,7 @@ pub struct TableScan {
     concurrency_limit_data_files: usize,
 
     row_group_filtering_enabled: bool,
+    row_selection_enabled: bool,
 }
 
 /// PlanContext wraps a [`SnapshotRef`] alongside all the other
@@ -378,7 +401,8 @@ impl TableScan {
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
         let mut arrow_reader_builder = 
ArrowReaderBuilder::new(self.file_io.clone())
             
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
-            
.with_row_group_filtering_enabled(self.row_group_filtering_enabled);
+            .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
+            .with_row_selection_enabled(self.row_selection_enabled);
 
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = 
arrow_reader_builder.with_batch_size(batch_size);

Reply via email to