This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 33f6ab76 Fix: Support nested struct field filtering (#2628)
33f6ab76 is described below

commit 33f6ab762a1ced71869dc400e932c799a0af2605
Author: Yftach Zur <[email protected]>
AuthorDate: Tue Nov 11 09:42:46 2025 +0100

    Fix: Support nested struct field filtering (#2628)
    
    Fixes #953
    
    # Rationale for this change
    Fixes filtering on nested struct fields when using PyArrow for scan
    operations.
    
    ## Are these changes tested?
    Yes, the full test suite + new tests
    
    ## Are there any user-facing changes?
    Now, filtering  a scan using a nested field will work
    
    ## Problem
    
    When filtering on nested struct fields (e.g., `parentField.childField ==
    'value'`), PyArrow would fail with:
    ```
    ArrowInvalid: No match for FieldRef.Name(childField) in ...
    ```
    
    The issue occurred because PyArrow requires nested field references as
    tuples (e.g., `("parent", "child")`) rather than dotted strings (e.g.,
    `"parent.child"`).
    
    ## Solution
    
    1. Modified `_ConvertToArrowExpression` to accept an optional `Schema`
    parameter
    2. Added `_get_field_name()` method that converts dotted field paths to
    tuples for nested struct fields
    3. Updated `expression_to_pyarrow()` to accept and pass the schema
    parameter
    4. Updated all call sites to pass the schema when available
    
    ## Changes
    
    - `pyiceberg/io/pyarrow.py`:
    - Modified `_ConvertToArrowExpression` class to handle nested field
    paths
      - Updated `expression_to_pyarrow()` signature to accept schema
      - Updated `_expression_to_complementary_pyarrow()` signature
    - `pyiceberg/table/__init__.py`:
    - Updated call to `_expression_to_complementary_pyarrow()` to pass
    schema
    - Tests:
    - Added `test_ref_binding_nested_struct_field()` for comprehensive
    nested field testing
      - Enhanced `test_nested_fields()` with issue #953 scenarios
    
    ## Example
    
    ```python
    # Now works correctly:
    table.scan(row_filter="parent.child == 'abc123'").to_polars()
    ```
    
    The fix converts the field reference from:
    - ❌ `FieldRef.Name(run_id)` (fails - field not found)
    - ✅ `FieldRef.Nested(FieldRef.Name(mazeMetadata) FieldRef.Name(run_id))`
    (works!)
    
    ---------
    
    Co-authored-by: Yftach Zur <[email protected]>
    Co-authored-by: Claude <[email protected]>
---
 pyiceberg/io/pyarrow.py               | 80 ++++++++++++++++++++++++++---------
 pyiceberg/table/__init__.py           |  2 +-
 tests/expressions/test_expressions.py | 52 +++++++++++++++++++++++
 tests/expressions/test_parser.py      |  3 ++
 4 files changed, 117 insertions(+), 20 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 46d7fe6b..5be4c5d2 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -808,51 +808,83 @@ def _convert_scalar(value: Any, iceberg_type: 
IcebergType) -> pa.scalar:
 
 
 class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
+    """Convert Iceberg bound expressions to PyArrow expressions.
+
+    Args:
+        schema: Optional Iceberg schema to resolve full field paths for nested 
fields.
+                If not provided, only the field name will be used (not dotted 
path).
+    """
+
+    _schema: Schema | None
+
+    def __init__(self, schema: Schema | None = None):
+        self._schema = schema
+
+    def _get_field_name(self, term: BoundTerm[Any]) -> str | Tuple[str, ...]:
+        """Get the field name or nested field path for a bound term.
+
+        For nested struct fields, returns a tuple of field names (e.g., 
("mazeMetadata", "run_id")).
+        For top-level fields, returns just the field name as a string.
+
+        PyArrow requires nested field references as tuples, not dotted strings.
+        """
+        if self._schema is not None:
+            # Use the schema to get the full dotted path for nested fields
+            full_name = 
self._schema.find_column_name(term.ref().field.field_id)
+            if full_name is not None:
+                # If the field name contains dots, it's a nested field
+                # Convert "parent.child" to ("parent", "child") for PyArrow
+                if "." in full_name:
+                    return tuple(full_name.split("."))
+                return full_name
+        # Fallback to just the field name if schema is not available
+        return term.ref().field.name
+
     def visit_in(self, term: BoundTerm[Any], literals: Set[Any]) -> 
pc.Expression:
         pyarrow_literals = pa.array(literals, 
type=schema_to_pyarrow(term.ref().field.field_type))
-        return pc.field(term.ref().field.name).isin(pyarrow_literals)
+        return pc.field(self._get_field_name(term)).isin(pyarrow_literals)
 
     def visit_not_in(self, term: BoundTerm[Any], literals: Set[Any]) -> 
pc.Expression:
         pyarrow_literals = pa.array(literals, 
type=schema_to_pyarrow(term.ref().field.field_type))
-        return ~pc.field(term.ref().field.name).isin(pyarrow_literals)
+        return ~pc.field(self._get_field_name(term)).isin(pyarrow_literals)
 
     def visit_is_nan(self, term: BoundTerm[Any]) -> pc.Expression:
-        ref = pc.field(term.ref().field.name)
+        ref = pc.field(self._get_field_name(term))
         return pc.is_nan(ref)
 
     def visit_not_nan(self, term: BoundTerm[Any]) -> pc.Expression:
-        ref = pc.field(term.ref().field.name)
+        ref = pc.field(self._get_field_name(term))
         return ~pc.is_nan(ref)
 
     def visit_is_null(self, term: BoundTerm[Any]) -> pc.Expression:
-        return pc.field(term.ref().field.name).is_null(nan_is_null=False)
+        return pc.field(self._get_field_name(term)).is_null(nan_is_null=False)
 
     def visit_not_null(self, term: BoundTerm[Any]) -> pc.Expression:
-        return pc.field(term.ref().field.name).is_valid()
+        return pc.field(self._get_field_name(term)).is_valid()
 
     def visit_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> 
pc.Expression:
-        return pc.field(term.ref().field.name) == 
_convert_scalar(literal.value, term.ref().field.field_type)
+        return pc.field(self._get_field_name(term)) == 
_convert_scalar(literal.value, term.ref().field.field_type)
 
     def visit_not_equal(self, term: BoundTerm[Any], literal: Literal[Any]) -> 
pc.Expression:
-        return pc.field(term.ref().field.name) != 
_convert_scalar(literal.value, term.ref().field.field_type)
+        return pc.field(self._get_field_name(term)) != 
_convert_scalar(literal.value, term.ref().field.field_type)
 
     def visit_greater_than_or_equal(self, term: BoundTerm[Any], literal: 
Literal[Any]) -> pc.Expression:
-        return pc.field(term.ref().field.name) >= 
_convert_scalar(literal.value, term.ref().field.field_type)
+        return pc.field(self._get_field_name(term)) >= 
_convert_scalar(literal.value, term.ref().field.field_type)
 
     def visit_greater_than(self, term: BoundTerm[Any], literal: Literal[Any]) 
-> pc.Expression:
-        return pc.field(term.ref().field.name) > 
_convert_scalar(literal.value, term.ref().field.field_type)
+        return pc.field(self._get_field_name(term)) > 
_convert_scalar(literal.value, term.ref().field.field_type)
 
     def visit_less_than(self, term: BoundTerm[Any], literal: Literal[Any]) -> 
pc.Expression:
-        return pc.field(term.ref().field.name) < 
_convert_scalar(literal.value, term.ref().field.field_type)
+        return pc.field(self._get_field_name(term)) < 
_convert_scalar(literal.value, term.ref().field.field_type)
 
     def visit_less_than_or_equal(self, term: BoundTerm[Any], literal: 
Literal[Any]) -> pc.Expression:
-        return pc.field(term.ref().field.name) <= 
_convert_scalar(literal.value, term.ref().field.field_type)
+        return pc.field(self._get_field_name(term)) <= 
_convert_scalar(literal.value, term.ref().field.field_type)
 
     def visit_starts_with(self, term: BoundTerm[Any], literal: Literal[Any]) 
-> pc.Expression:
-        return pc.starts_with(pc.field(term.ref().field.name), literal.value)
+        return pc.starts_with(pc.field(self._get_field_name(term)), 
literal.value)
 
     def visit_not_starts_with(self, term: BoundTerm[Any], literal: 
Literal[Any]) -> pc.Expression:
-        return ~pc.starts_with(pc.field(term.ref().field.name), literal.value)
+        return ~pc.starts_with(pc.field(self._get_field_name(term)), 
literal.value)
 
     def visit_true(self) -> pc.Expression:
         return pc.scalar(True)
@@ -988,11 +1020,21 @@ class 
_NullNaNUnmentionedTermsCollector(BoundBooleanExpressionVisitor[None]):
         boolean_expression_visit(expr, self)
 
 
-def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
-    return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def expression_to_pyarrow(expr: BooleanExpression, schema: Schema | None = 
None) -> pc.Expression:
+    """Convert an Iceberg boolean expression to a PyArrow expression.
+
+    Args:
+        expr: The Iceberg boolean expression to convert.
+        schema: Optional Iceberg schema to resolve full field paths for nested 
fields.
+                If provided, nested struct fields will use dotted paths (e.g., 
"parent.child").
+
+    Returns:
+        A PyArrow compute expression.
+    """
+    return boolean_expression_visit(expr, _ConvertToArrowExpression(schema))
 
 
-def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> 
pc.Expression:
+def _expression_to_complementary_pyarrow(expr: BooleanExpression, schema: 
Schema | None = None) -> pc.Expression:
     """Complementary filter conversion function of expression_to_pyarrow.
 
     Could not use expression_to_pyarrow(Not(expr)) to achieve this 
complementary effect because ~ in pyarrow.compute.Expression does not handle 
null.
@@ -1013,7 +1055,7 @@ def _expression_to_complementary_pyarrow(expr: 
BooleanExpression) -> pc.Expressi
         preserve_expr = Or(preserve_expr, BoundIsNull(term=term))
     for term in nan_unmentioned_bound_terms:
         preserve_expr = Or(preserve_expr, BoundIsNaN(term=term))
-    return expression_to_pyarrow(preserve_expr)
+    return expression_to_pyarrow(preserve_expr, schema)
 
 
 @lru_cache
@@ -1553,7 +1595,7 @@ def _task_to_record_batches(
                 bound_row_filter, file_schema, case_sensitive=case_sensitive, 
projected_field_values=projected_missing_fields
             )
             bound_file_filter = bind(file_schema, translated_row_filter, 
case_sensitive=case_sensitive)
-            pyarrow_filter = expression_to_pyarrow(bound_file_filter)
+            pyarrow_filter = expression_to_pyarrow(bound_file_filter, 
file_schema)
 
         file_project_schema = prune_columns(file_schema, projected_field_ids, 
select_full_types=False)
 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 5fd83a81..cc7c1c6a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -674,7 +674,7 @@ class Transaction:
         # Check if there are any files that require an actual rewrite of a 
data file
         if delete_snapshot.rewrites_needed is True:
             bound_delete_filter = bind(self.table_metadata.schema(), 
delete_filter, case_sensitive)
-            preserve_row_filter = 
_expression_to_complementary_pyarrow(bound_delete_filter)
+            preserve_row_filter = 
_expression_to_complementary_pyarrow(bound_delete_filter, 
self.table_metadata.schema())
 
             file_scan = self._scan(row_filter=delete_filter, 
case_sensitive=case_sensitive)
             if branch is not None:
diff --git a/tests/expressions/test_expressions.py 
b/tests/expressions/test_expressions.py
index 57a39e08..dbee2ca0 100644
--- a/tests/expressions/test_expressions.py
+++ b/tests/expressions/test_expressions.py
@@ -228,6 +228,58 @@ def 
test_ref_binding_case_insensitive_failure(table_schema_simple: Schema) -> No
         ref.bind(table_schema_simple, case_sensitive=False)
 
 
+def test_ref_binding_nested_struct_field() -> None:
+    """Test binding references to nested struct fields (issue #953)."""
+    schema = Schema(
+        NestedField(field_id=1, name="age", field_type=IntegerType(), 
required=True),
+        NestedField(
+            field_id=2,
+            name="employment",
+            field_type=StructType(
+                NestedField(field_id=3, name="status", 
field_type=StringType(), required=False),
+                NestedField(field_id=4, name="company", 
field_type=StringType(), required=False),
+            ),
+            required=False,
+        ),
+        NestedField(
+            field_id=5,
+            name="contact",
+            field_type=StructType(
+                NestedField(field_id=6, name="email", field_type=StringType(), 
required=False),
+            ),
+            required=False,
+        ),
+        schema_id=1,
+    )
+
+    # Test that nested field names are in the index
+    assert "employment.status" in schema._name_to_id
+    assert "employment.company" in schema._name_to_id
+    assert "contact.email" in schema._name_to_id
+
+    # Test binding a reference to nested fields
+    ref = Reference("employment.status")
+    bound = ref.bind(schema, case_sensitive=True)
+    assert bound.field.field_id == 3
+    assert bound.field.name == "status"
+
+    # Test with different nested field
+    ref2 = Reference("contact.email")
+    bound2 = ref2.bind(schema, case_sensitive=True)
+    assert bound2.field.field_id == 6
+    assert bound2.field.name == "email"
+
+    # Test case-insensitive binding
+    ref3 = Reference("EMPLOYMENT.STATUS")
+    bound3 = ref3.bind(schema, case_sensitive=False)
+    assert bound3.field.field_id == 3
+
+    # Test that binding fails for non-existent nested field
+    ref4 = Reference("employment.department")
+    with pytest.raises(ValueError):
+        ref4.bind(schema, case_sensitive=True)
+
+
 def test_in_to_eq() -> None:
     assert In("x", (34.56,)) == EqualTo("x", 34.56)
 
diff --git a/tests/expressions/test_parser.py b/tests/expressions/test_parser.py
index 28d7cf11..2f0c444d 100644
--- a/tests/expressions/test_parser.py
+++ b/tests/expressions/test_parser.py
@@ -225,6 +225,9 @@ def test_with_function() -> None:
 def test_nested_fields() -> None:
     assert EqualTo("foo.bar", "data") == parser.parse("foo.bar = 'data'")
     assert LessThan("location.x", DecimalLiteral(Decimal(52.00))) == 
parser.parse("location.x < 52.00")
+    # Test issue #953 scenario - nested struct field filtering
+    assert EqualTo("employment.status", "Employed") == 
parser.parse("employment.status = 'Employed'")
+    assert EqualTo("contact.email", "[email protected]") == 
parser.parse("contact.email = '[email protected]'")
 
 
 def test_quoted_column_with_dots() -> None:

Reply via email to