viirya commented on code in PR #295:
URL: https://github.com/apache/iceberg-rust/pull/295#discussion_r1582266774


##########
crates/iceberg/src/arrow/reader.rs:
##########
@@ -186,4 +221,637 @@ impl ArrowReader {
             Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
+
+    fn get_row_filter(
+        &self,
+        parquet_schema: &SchemaDescriptor,
+        collector: &CollectFieldIdVisitor,
+    ) -> Result<Option<RowFilter>> {
+        if let Some(predicates) = &self.predicates {
+            let field_id_map = build_field_id_map(parquet_schema)?;
+
+            let column_indices = collector
+                .field_ids
+                .iter()
+                .map(|field_id| {
+                    field_id_map.get(field_id).cloned().ok_or_else(|| {
+                        Error::new(ErrorKind::DataInvalid, "Field id not found in schema")
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            // Convert BoundPredicates to ArrowPredicates
+            let mut converter = PredicateConverter {
+                columns: &column_indices,
+                projection_mask: ProjectionMask::leaves(parquet_schema, column_indices.clone()),
+                parquet_schema,
+                column_map: &field_id_map,
+            };
+            let arrow_predicate = visit(&mut converter, predicates)?;
+            Ok(Some(RowFilter::new(vec![arrow_predicate])))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Build the map of field id to Parquet column index in the schema.
+fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
+    let mut column_map = HashMap::new();
+    for (idx, field) in parquet_schema.columns().iter().enumerate() {
+        let field_type = field.self_type();
+        match field_type {
+            ParquetType::PrimitiveType { basic_info, .. } => {
+                if !basic_info.has_id() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Leave column {:?} in schema doesn't have field id",
+                            field_type
+                        ),
+                    ));
+                }
+                column_map.insert(basic_info.id(), idx);
+            }
+            ParquetType::GroupType { .. } => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Leave column in schema should be primitive type but got {:?}",
+                        field_type
+                    ),
+                ));
+            }
+        };
+    }
+
+    Ok(column_map)
+}
+
+/// A visitor to collect field ids from bound predicates.
+struct CollectFieldIdVisitor {
+    field_ids: Vec<i32>,
+}
+
+impl BoundPredicateVisitor for CollectFieldIdVisitor {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<Self::T> {

Review Comment:
   I probably missed the previous comment. Let me change the type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to