This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0337ecefc61 Fix iceberg nested filter (#37340)
0337ecefc61 is described below
commit 0337ecefc61fb4b04e3a661fe03f1893079461e3
Author: liferoad <[email protected]>
AuthorDate: Tue Feb 17 17:12:39 2026 -0500
Fix iceberg nested filter (#37340)
* Fix IcebergIO predicate filter pushdown for nested columns
* Add modern IDE directories to .gitignore
* chore: update .gitignore to exclude additional IDE directories
* feat: Add unit and integration tests for filtering Iceberg tables by
nested fields.
* chore: Remove `.agent/` from `.gitignore`.
---
.../IO_Iceberg_Integration_Tests.json | 2 +-
.gitignore | 7 +++++
.../apache/beam/sdk/io/iceberg/FilterUtils.java | 30 ++++++++++++----------
.../beam/sdk/io/iceberg/FilterUtilsTest.java | 16 ++++++++++++
.../io/iceberg/catalog/IcebergCatalogBaseIT.java | 19 ++++++++++++++
5 files changed, 59 insertions(+), 15 deletions(-)
diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index b73af5e61a4..7ab7bcd9a9c 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
- "modification": 1
+ "modification": 2
}
diff --git a/.gitignore b/.gitignore
index 9c6e68f4ce5..e9fe331cb31 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,13 @@
# is an input to 'maven-assembly-plugin' that generates source distribution.
# This is typically in files named 'src.xml' throughout this repository.
+# Ignore IDE files
+.codex/
+.trae/
+.cursor/
+.windsurf/
+.claude/
+
# Ignore any offline repositories the user may have created.
**/offline-repository/**/*
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
index fd008701c54..855fcf7d1e1 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FilterUtils.java
@@ -66,8 +66,6 @@ import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Utilities that convert between a SQL filter expression and an Iceberg {@link Expression}. Uses
* Apache Calcite semantics.
- *
- * <p>Note: Only supports top-level fields (i.e. cannot reference nested fields).
*/
@Internal
public class FilterUtils {
@@ -112,7 +110,7 @@ public class FilterUtils {
private static void extractFieldNames(SqlNode node, Set<String> fieldNames) {
if (node instanceof SqlIdentifier) {
- fieldNames.add(((SqlIdentifier) node).getSimple());
+ fieldNames.add(getFieldName((SqlIdentifier) node));
} else if (node instanceof SqlBasicCall) {
// recursively check operands
SqlBasicCall call = (SqlBasicCall) node;
@@ -133,9 +131,6 @@ public class FilterUtils {
/**
* parses a SQL filter expression string into an Iceberg {@link Expression} that can be used for
* data pruning.
- *
- * <p>Note: This utility currently supports only top-level fields within the filter expression.
- * Nested field references are not supported.
*/
static Expression convert(@Nullable String filter, Schema schema) {
if (filter == null) {
@@ -154,7 +149,7 @@ public class FilterUtils {
private static Expression convert(SqlNode expression, Schema schema) throws SqlParseException {
if (expression instanceof SqlIdentifier) {
- String fieldName = ((SqlIdentifier) expression).getSimple();
+ String fieldName = getFieldName((SqlIdentifier) expression);
Types.NestedField field = schema.caseInsensitiveFindField(fieldName);
if (field.type().equals(Types.BooleanType.get())) {
return Expressions.equal(field.name(), true);
@@ -242,7 +237,14 @@ public class FilterUtils {
SqlNode ref = call.operand(0);
Preconditions.checkState(
ref instanceof SqlIdentifier, "Expected operand '%s' to be a reference.", ref);
- return ((SqlIdentifier) ref).getSimple();
+ return getFieldName((SqlIdentifier) ref);
+ }
+
+ private static String getFieldName(SqlIdentifier identifier) {
+ if (identifier.isSimple()) {
+ return identifier.getSimple();
+ }
+ return String.join(".", identifier.names);
}
private static SqlNode getLeftChild(SqlBasicCall call) {
@@ -285,9 +287,9 @@ public class FilterUtils {
checkArgument(
value instanceof SqlNodeList,
"Expected right hand side to be a list but got " + value.getClass());
- String caseInsensitiveName = ((SqlIdentifier) term).getSimple();
+ String caseInsensitiveName = getFieldName((SqlIdentifier) term);
Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
- String name = field.name();
+ String name = schema.findColumnName(field.fieldId());
TypeID type = field.type().typeId();
List<SqlNode> list =
((SqlNodeList) value)
@@ -313,16 +315,16 @@ public class FilterUtils {
SqlNode left = getLeftChild(call);
SqlNode right = getRightChild(call);
if (left instanceof SqlIdentifier && right instanceof SqlLiteral) {
- String caseInsensitiveName = ((SqlIdentifier) left).getSimple();
+ String caseInsensitiveName = getFieldName((SqlIdentifier) left);
Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
- String name = field.name();
+ String name = schema.findColumnName(field.fieldId());
TypeID type = field.type().typeId();
Object value = convertLiteral((SqlLiteral) right, name, type);
return convertLR.apply(name, value);
} else if (left instanceof SqlLiteral && right instanceof SqlIdentifier) {
- String caseInsensitiveName = ((SqlIdentifier) right).getSimple();
+ String caseInsensitiveName = getFieldName((SqlIdentifier) right);
Types.NestedField field = schema.caseInsensitiveFindField(caseInsensitiveName);
- String name = field.name();
+ String name = schema.findColumnName(field.fieldId());
TypeID type = field.type().typeId();
Object value = convertLiteral((SqlLiteral) left, name, type);
return convertRL.apply(name, value);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
index 591467ce0d0..12acfff7bf0 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
@@ -169,6 +169,21 @@ public class FilterUtilsTest {
.validate();
}
+ @Test
+ public void testNestedField() {
+ // nested integer
+ TestCase.expecting(lessThan("nested.field", 30))
+ .fromFilter("\"nested\".\"field\" < 30")
+ .withSchema(
+ new Schema(
+ Types.NestedField.required(
+ 1,
+ "nested",
+ Types.StructType.of(
+ Types.NestedField.required(2, "field", Types.IntegerType.get())))))
+ .validate();
+ }
+
@Test
public void testLessThanOrEqual() {
// integer
@@ -726,6 +741,7 @@ public class FilterUtilsTest {
Pair.of("field_1 < 35", Sets.newHashSet("FIELD_1")),
Pair.of("\"field_1\" in (1, 2, 3)", Sets.newHashSet("field_1")),
Pair.of("field_1 < 35 and \"fiELd_2\" = TRUE", Sets.newHashSet("FIELD_1", "fiELd_2")),
+ Pair.of("\"nested\".\"inner\" = 'abc'", Sets.newHashSet("nested.inner")),
Pair.of(
"(\"field_1\" < 35 and \"field_2\" = TRUE) or \"field_3\" in ('a', 'b')",
Sets.newHashSet("field_1", "field_2", "field_3")));
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index 9e6aa5913cc..c2c5dc0b8f4 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -488,6 +488,25 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
pipeline.run().waitUntilFinish();
}
+ @Test
+ public void testReadWithNestedFieldFilter() throws Exception {
+ Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);
+
+ List<Row> expectedRows =
+ populateTable(table).stream()
+ .filter(row -> row.getRow("row").getInt32("nested_int") < 350)
+ .collect(Collectors.toList());
+
+ Map<String, Object> config = new HashMap<>(managedIcebergConfig(tableId()));
+ config.put("filter", "\"row\".\"nested_int\" < 350");
+
+ PCollection<Row> rows =
+ pipeline.apply(Managed.read(ICEBERG).withConfig(config)).getSinglePCollection();
+
+ PAssert.that(rows).containsInAnyOrder(expectedRows);
+ pipeline.run().waitUntilFinish();
+ }
+
@Test
public void testStreamingReadWithFilter() throws Exception {
Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);