liurenjie1024 commented on code in PR #295:
URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1582163907


##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",

Review Comment:
   How about adding `basic_info.name` and index into error message to make it 
better to debug?



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,

Review Comment:
   Should this be a set?



##########
crates/iceberg/src/scan.rs:
##########
@@ -689,4 +720,90 @@ mod tests {
         let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
         assert_eq!(int64_arr.value(0), 3);
     }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_lt() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: y < 3
+        let mut builder = fixture.table.scan();
+        let predicate = Reference::new("y").less_than(Datum::long(3));
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+
+        let col = batches[0].column_by_name("y").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 2);
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_gt_eq() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: y >= 5
+        let mut builder = fixture.table.scan();
+        let predicate = 
Reference::new("y").greater_than_or_equal_to(Datum::long(5));
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 12);
+
+        let col = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+
+        let col = batches[0].column_by_name("y").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 5);
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_is_null() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: y is null
+        let mut builder = fixture.table.scan();
+        let predicate = Reference::new("y").is_null();
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+        assert_eq!(batches.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_is_not_null() {

Review Comment:
   Thanks for the tests, is it possible to add serveral test cases for more 
complex types such as `AND`, `OR`?



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<Self::T> {

Review Comment:
   As we have discussed before, it's better to use concrete types rather than 
`Self::T` to make it easier to read.



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The leaf column indices used in the predicates.
+    pub columns: &'a Vec<usize>,
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Result<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = 
self.column_map.get(&reference.field().id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Field id {} not found in schema", 
reference.field().id),
+            )
+        })?;
+
+        // Find the column index in projection mask.
+        let column_idx = self
+            .columns
+            .iter()

Review Comment:
   Why not just `Vec::get` ? This iteration needs to go through all columns.



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The leaf column indices used in the predicates.
+    pub columns: &'a Vec<usize>,
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Result<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = 
self.column_map.get(&reference.field().id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Field id {} not found in schema", 
reference.field().id),
+            )
+        })?;
+
+        // Find the column index in projection mask.
+        let column_idx = self
+            .columns
+            .iter()
+            .position(|&x| x == *column_idx)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Column index {} not found in schema", 
*column_idx),
+                )
+            })?;
+
+        Ok(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![self.columns[column_idx]],
+        ))
+    }
+}
+
+fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> {
+    match datum.literal() {
+        PrimitiveLiteral::Boolean(value) => 
Ok(Box::new(BooleanArray::new_scalar(*value))),
+        PrimitiveLiteral::Int(value) => 
Ok(Box::new(Int32Array::new_scalar(*value))),
+        PrimitiveLiteral::Long(value) => 
Ok(Box::new(Int64Array::new_scalar(*value))),
+        PrimitiveLiteral::Float(value) => 
Ok(Box::new(Float32Array::new_scalar(value.as_f32()))),
+        PrimitiveLiteral::Double(value) => 
Ok(Box::new(Float64Array::new_scalar(value.as_f64()))),
+        l => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Unsupported literal type: {:?}", l),

Review Comment:
   ```suggestion
               ErrorKind::FeatureUnsupported,
               format!("Converting datum from type {:?} to arrow not supported 
yet.", l),
   ```



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The leaf column indices used in the predicates.
+    pub columns: &'a Vec<usize>,
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Result<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = 
self.column_map.get(&reference.field().id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Field id {} not found in schema", 
reference.field().id),
+            )
+        })?;
+
+        // Find the column index in projection mask.
+        let column_idx = self
+            .columns
+            .iter()
+            .position(|&x| x == *column_idx)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Column index {} not found in schema", 
*column_idx),
+                )
+            })?;
+
+        Ok(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![self.columns[column_idx]],
+        ))
+    }
+}
+
+fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> {

Review Comment:
   Do you mind to move this to arrow module? I guess this would be quite useful 
and we could add more type support late.



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The leaf column indices used in the predicates.
+    pub columns: &'a Vec<usize>,
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Result<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = 
self.column_map.get(&reference.field().id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Field id {} not found in schema", 
reference.field().id),
+            )
+        })?;
+
+        // Find the column index in projection mask.
+        let column_idx = self
+            .columns
+            .iter()
+            .position(|&x| x == *column_idx)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Column index {} not found in schema", 
*column_idx),
+                )
+            })?;
+
+        Ok(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![self.columns[column_idx]],
+        ))
+    }
+}
+
+fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> {
+    match datum.literal() {
+        PrimitiveLiteral::Boolean(value) => 
Ok(Box::new(BooleanArray::new_scalar(*value))),
+        PrimitiveLiteral::Int(value) => 
Ok(Box::new(Int32Array::new_scalar(*value))),
+        PrimitiveLiteral::Long(value) => 
Ok(Box::new(Int64Array::new_scalar(*value))),
+        PrimitiveLiteral::Float(value) => 
Ok(Box::new(Float32Array::new_scalar(value.as_f32()))),
+        PrimitiveLiteral::Double(value) => 
Ok(Box::new(Float64Array::new_scalar(value.as_f64()))),
+        l => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Unsupported literal type: {:?}", l),
+        )),
+    }
+}
+
+/// Recursively get the leaf column from the record batch. Assume that the 
nested columns in
+/// struct is projected to a single column.
+fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, 
ArrowError> {
+    match column.data_type() {
+        DataType::Struct(fields) => {
+            if fields.len() != 1 {
+                return Err(ArrowError::SchemaError(
+                    "Struct column should have only one field after projection"
+                        .parse()
+                        .unwrap(),
+                ));
+            }
+            let struct_array = 
column.as_any().downcast_ref::<StructArray>().unwrap();
+            get_leaf_column(struct_array.column(0))
+        }
+        _ => Ok(column.clone()),
+    }
+}
+
+impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
+    type T = Box<dyn ArrowPredicate>;
+
+    fn always_true(&mut self) -> Result<Self::T> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),

Review Comment:
   This maybe not a blocker, but is it possible to build a const array in arrow?



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The leaf column indices used in the predicates.
+    pub columns: &'a Vec<usize>,
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Result<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = 
self.column_map.get(&reference.field().id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Field id {} not found in schema", 
reference.field().id),
+            )
+        })?;
+
+        // Find the column index in projection mask.
+        let column_idx = self
+            .columns
+            .iter()
+            .position(|&x| x == *column_idx)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Column index {} not found in schema", 
*column_idx),
+                )
+            })?;
+
+        Ok(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![self.columns[column_idx]],
+        ))
+    }
+}
+
+fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> {
+    match datum.literal() {
+        PrimitiveLiteral::Boolean(value) => 
Ok(Box::new(BooleanArray::new_scalar(*value))),
+        PrimitiveLiteral::Int(value) => 
Ok(Box::new(Int32Array::new_scalar(*value))),
+        PrimitiveLiteral::Long(value) => 
Ok(Box::new(Int64Array::new_scalar(*value))),
+        PrimitiveLiteral::Float(value) => 
Ok(Box::new(Float32Array::new_scalar(value.as_f32()))),
+        PrimitiveLiteral::Double(value) => 
Ok(Box::new(Float64Array::new_scalar(value.as_f64()))),
+        l => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Unsupported literal type: {:?}", l),
+        )),
+    }
+}
+
+/// Recursively get the leaf column from the record batch. Assume that the 
nested columns in
+/// struct is projected to a single column.
+fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, 
ArrowError> {
+    match column.data_type() {
+        DataType::Struct(fields) => {
+            if fields.len() != 1 {
+                return Err(ArrowError::SchemaError(
+                    "Struct column should have only one field after projection"
+                        .parse()
+                        .unwrap(),
+                ));
+            }
+            let struct_array = 
column.as_any().downcast_ref::<StructArray>().unwrap();
+            get_leaf_column(struct_array.column(0))
+        }
+        _ => Ok(column.clone()),
+    }
+}
+
+impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
+    type T = Box<dyn ArrowPredicate>;
+
+    fn always_true(&mut self) -> Result<Self::T> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![true; batch.num_rows()])),
+        )))
+    }
+
+    fn always_false(&mut self) -> Result<Self::T> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            |batch| Ok(BooleanArray::from(vec![false; batch.num_rows()])),
+        )))
+    }
+
+    fn and(&mut self, mut lhs: Self::T, mut rhs: Self::T) -> Result<Self::T> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                and(&left, &right)
+            },
+        )))
+    }
+
+    fn or(&mut self, mut lhs: Self::T, mut rhs: Self::T) -> Result<Self::T> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let left = lhs.evaluate(batch.clone())?;
+                let right = rhs.evaluate(batch)?;
+                or(&left, &right)
+            },
+        )))
+    }
+
+    fn not(&mut self, mut inner: Self::T) -> Result<Self::T> {
+        Ok(Box::new(ArrowPredicateFn::new(
+            self.projection_mask.clone(),
+            move |batch| {
+                let pred_ret = inner.evaluate(batch)?;
+                not(&pred_ret)
+            },
+        )))
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        let projected_mask = self.bound_reference(reference)?;

Review Comment:
   This seems incorrect to me. Let's say the predicate is `a is null AND b >1`, 
then the batch passed to this `ArrowPredicateFn` is constructed by projection 
mask of [a, b]. I think one possible solution is to use same project mask for 
all predicates, and pass the column_idx to get_leaf_column.



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found 
in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, 
column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> 
Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field 
id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but 
got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: Self::T, _rhs: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: Self::T) -> Result<Self::T> {
+        Ok(())
+    }
+
+    fn is_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_null(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn is_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_nan(
+        &mut self,
+        reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_eq(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<Self::T> {
+        self.field_ids.push(reference.field().id);
+        Ok(())
+    }
+}
+
+/// A visitor to convert Iceberg bound predicates to Arrow predicates.
+struct PredicateConverter<'a> {
+    /// The leaf column indices used in the predicates.
+    pub columns: &'a Vec<usize>,
+    /// The projection mask for the Arrow predicates.
+    pub projection_mask: ProjectionMask,
+    /// The Parquet schema descriptor.
+    pub parquet_schema: &'a SchemaDescriptor,
+    /// The map between field id and leaf column index in Parquet schema.
+    pub column_map: &'a HashMap<i32, usize>,
+}
+
+impl PredicateConverter<'_> {
+    /// When visiting a bound reference, we return the projection mask for the 
leaf column
+    /// which is used to project the column in the record batch.
+    fn bound_reference(&mut self, reference: &BoundReference) -> 
Result<ProjectionMask> {
+        // The leaf column's index in Parquet schema.
+        let column_idx = 
self.column_map.get(&reference.field().id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Field id {} not found in schema", 
reference.field().id),
+            )
+        })?;
+
+        // Find the column index in projection mask.
+        let column_idx = self
+            .columns
+            .iter()
+            .position(|&x| x == *column_idx)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Column index {} not found in schema", 
*column_idx),
+                )
+            })?;
+
+        Ok(ProjectionMask::leaves(
+            self.parquet_schema,
+            vec![self.columns[column_idx]],
+        ))
+    }
+}
+
+fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send>> {
+    match datum.literal() {
+        PrimitiveLiteral::Boolean(value) => 
Ok(Box::new(BooleanArray::new_scalar(*value))),
+        PrimitiveLiteral::Int(value) => 
Ok(Box::new(Int32Array::new_scalar(*value))),
+        PrimitiveLiteral::Long(value) => 
Ok(Box::new(Int64Array::new_scalar(*value))),
+        PrimitiveLiteral::Float(value) => 
Ok(Box::new(Float32Array::new_scalar(value.as_f32()))),
+        PrimitiveLiteral::Double(value) => 
Ok(Box::new(Float64Array::new_scalar(value.as_f64()))),
+        l => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Unsupported literal type: {:?}", l),
+        )),
+    }
+}
+
+/// Recursively get the leaf column from the record batch. Assume that the 
nested columns in
+/// struct is projected to a single column.
+fn get_leaf_column(column: &ArrayRef) -> std::result::Result<ArrayRef, 
ArrowError> {
+    match column.data_type() {
+        DataType::Struct(fields) => {
+            if fields.len() != 1 {
+                return Err(ArrowError::SchemaError(
+                    "Struct column should have only one field after projection"
+                        .parse()
+                        .unwrap(),
+                ));
+            }
+            let struct_array = 
column.as_any().downcast_ref::<StructArray>().unwrap();
+            get_leaf_column(struct_array.column(0))
+        }
+        _ => Ok(column.clone()),
+    }
+}
+
+impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
+    type T = Box<dyn ArrowPredicate>;
+
+    fn always_true(&mut self) -> Result<Self::T> {

Review Comment:
   Please use concrete type here to improve readability😅



##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +216,399 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(&self, parquet_schema: &SchemaDescriptor) -> 
Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = self.build_field_id_map(parquet_schema)?;
+
+            // Collect Parquet column indices from field ids
+            let mut collector = CollectFieldIdVisitor { field_ids: vec![] };
+            visit_predicate(&mut collector, predicates).unwrap();
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {

Review Comment:
   I think this is possible. For example, in version 1 iceberg table has no 
column a, and we inserted some data. Then we upgraded the schema to add column 
a, and it becomes version 2. Since iceberg is lazy, parquets files in version 1 
has no column a.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to