This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0337ecefc61 Fix iceberg nested filter (#37340)
0337ecefc61 is described below

commit 0337ecefc61fb4b04e3a661fe03f1893079461e3
Author: liferoad <[email protected]>
AuthorDate: Tue Feb 17 17:12:39 2026 -0500

    Fix iceberg nested filter (#37340)
    
    * Fix IcebergIO predicate filter pushdown for nested columns
    
    * Add modern IDE directories to .gitignore
    
    * chore: update .gitignore to exclude additional IDE directories
    
    * feat: Add unit and integration tests for filtering Iceberg tables by nested fields.
    
    * chore: Remove `.agent/` from `.gitignore`.
---
 .../IO_Iceberg_Integration_Tests.json              |  2 +-
 .gitignore                                         |  7 +++++
 .../apache/beam/sdk/io/iceberg/FilterUtils.java    | 30 ++++++++++++----------
 .../beam/sdk/io/iceberg/FilterUtilsTest.java       | 16 ++++++++++++
 .../io/iceberg/catalog/IcebergCatalogBaseIT.java   | 19 ++++++++++++++
 5 files changed, 59 insertions(+), 15 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index b73af5e61a4..7ab7bcd9a9c 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to run.",
-    "modification": 1
+    "modification": 2
 }
diff --git a/.gitignore b/.gitignore
index 9c6e68f4ce5..e9fe331cb31 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,13 @@
 # is an input to 'maven-assembly-plugin' that generates source distribution.
 # This is typically in files named 'src.xml' throughout this repository.
 
+# Ignore IDE files
+.codex/
+.trae/
+.cursor/
+.windsurf/
+.claude/
+
 # Ignore any offline repositories the user may have created.
 **/offline-repository/**/*
 
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
index fd008701c54..855fcf7d1e1 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
@@ -66,8 +66,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 /**
  * Utilities that convert between a SQL filter expression and an Iceberg {@link Expression}. Uses
  * Apache Calcite semantics.
- *
- * <p>Note: Only supports top-level fields (i.e. cannot reference nested fields).
  */
 @Internal
 public class FilterUtils {
@@ -112,7 +110,7 @@ public class FilterUtils {
 
   private static void extractFieldNames(SqlNode node, Set<String> fieldNames) {
     if (node instanceof SqlIdentifier) {
-      fieldNames.add(((SqlIdentifier) node).getSimple());
+      fieldNames.add(getFieldName((SqlIdentifier) node));
     } else if (node instanceof SqlBasicCall) {
       // recursively check operands
       SqlBasicCall call = (SqlBasicCall) node;
@@ -133,9 +131,6 @@ public class FilterUtils {
   /**
    * parses a SQL filter expression string into an Iceberg {@link Expression} that can be used for
    * data pruning.
-   *
-   * <p>Note: This utility currently supports only top-level fields within the filter expression.
-   * Nested field references are not supported.
    */
   static Expression convert(@Nullable String filter, Schema schema) {
     if (filter == null) {
@@ -154,7 +149,7 @@ public class FilterUtils {
 
   private static Expression convert(SqlNode expression, Schema schema) throws SqlParseException {
     if (expression instanceof SqlIdentifier) {
-      String fieldName = ((SqlIdentifier) expression).getSimple();
+      String fieldName = getFieldName((SqlIdentifier) expression);
       Types.NestedField field = schema.caseInsensitiveFindField(fieldName);
       if (field.type().equals(Types.BooleanType.get())) {
         return Expressions.equal(field.name(), true);
@@ -242,7 +237,14 @@ public class FilterUtils {
     SqlNode ref = call.operand(0);
     Preconditions.checkState(
         ref instanceof SqlIdentifier, "Expected operand '%s' to be a reference.", ref);
-    return ((SqlIdentifier) ref).getSimple();
+    return getFieldName((SqlIdentifier) ref);
+  }
+
+  private static String getFieldName(SqlIdentifier identifier) {
+    if (identifier.isSimple()) {
+      return identifier.getSimple();
+    }
+    return String.join(".", identifier.names);
   }
 
   private static SqlNode getLeftChild(SqlBasicCall call) {
@@ -285,9 +287,9 @@ public class FilterUtils {
     checkArgument(
         value instanceof SqlNodeList,
         "Expected right hand side to be a list but got " + value.getClass());
-    String caseInsensitiveName = ((SqlIdentifier) term).getSimple();
+    String caseInsensitiveName = getFieldName((SqlIdentifier) term);
     Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
-    String name = field.name();
+    String name = schema.findColumnName(field.fieldId());
     TypeID type = field.type().typeId();
     List<SqlNode> list =
         ((SqlNodeList) value)
@@ -313,16 +315,16 @@ public class FilterUtils {
     SqlNode left = getLeftChild(call);
     SqlNode right = getRightChild(call);
     if (left instanceof SqlIdentifier && right instanceof SqlLiteral) {
-      String caseInsensitiveName = ((SqlIdentifier) left).getSimple();
+      String caseInsensitiveName = getFieldName((SqlIdentifier) left);
       Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
-      String name = field.name();
+      String name = schema.findColumnName(field.fieldId());
       TypeID type = field.type().typeId();
       Object value = convertLiteral((SqlLiteral) right, name, type);
       return convertLR.apply(name, value);
     } else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) {
-      String caseInsensitiveName = ((SqlIdentifier) right).getSimple();
+      String caseInsensitiveName = getFieldName((SqlIdentifier) right);
       Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
-      String name = field.name();
+      String name = schema.findColumnName(field.fieldId());
       TypeID type = field.type().typeId();
       Object value = convertLiteral((SqlLiteral) left, name, type);
       return convertRL.apply(name, value);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
index 591467ce0d0..12acfff7bf0 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
@@ -169,6 +169,21 @@ public class FilterUtilsTest {
         .validate();
   }
 
+  @Test
+  public void testNestedField() {
+    // nested integer
+    TestCase.expecting(lessThan("nested.field", 30))
+        .fromFilter("\"nested\".\"field\" < 30")
+        .withSchema(
+            new Schema(
+                Types.NestedField.required(
+                    1,
+                    "nested",
+                    Types.StructType.of(
+                        Types.NestedField.required(2, "field", Types.IntegerType.get())))))
+        .validate();
+  }
+
   @Test
   public void testLessThanOrEqual() {
     // integer
@@ -726,6 +741,7 @@ public class FilterUtilsTest {
             Pair.of("field_1 < 35", Sets.newHashSet("FIELD_1")),
             Pair.of("\"field_1\" in (1, 2, 3)", Sets.newHashSet("field_1")),
             Pair.of("field_1 < 35 and \"fiELd_2\" = TRUE", Sets.newHashSet("FIELD_1", "fiELd_2")),
+            Pair.of("\"nested\".\"inner\" = 'abc'", Sets.newHashSet("nested.inner")),
             Pair.of(
                 "(\"field_1\" < 35 and \"field_2\" = TRUE) or \"field_3\" in ('a', 'b')",
                 Sets.newHashSet("field_1", "field_2", "field_3")));
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index 9e6aa5913cc..c2c5dc0b8f4 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -488,6 +488,25 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
     pipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testReadWithNestedFieldFilter() throws Exception {
+    Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
+
+    List<Row> expectedRows =
+        populateTable(table).stream()
+            .filter(row -> row.getRow("row").getInt32("nested_int") < 350)
+            .collect(Collectors.toList());
+
+    Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
+    config.put("filter", "\"row\".\"nested_int\" < 350");
+
+    PCollection<Row> rows =
+        pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection();
+
+    PAssert.that(rows).containsInAnyOrder(expectedRows);
+    pipeline.run().waitUntilFinish();
+  }
+
   @Test
   public void testStreamingReadWithFilter() throws Exception {
     Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);

Reply via email to