This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f947eb302 Parquet: Treat VARIANT like nested for eq/in in ParquetMetricsRowGroupFilter (#14279)
2f947eb302 is described below

commit 2f947eb30222cbd4a52074322a54531c95368ffa
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Oct 17 17:03:55 2025 -0700

    Parquet: Treat VARIANT like nested for eq/in in ParquetMetricsRowGroupFilter (#14279)
    
    * Parquet: Treat VARIANT like nested for eq/in in ParquetMetricsRowGroupFilter
    
    * address comments
    
    * address comments
---
 .../iceberg/data/TestMetricsRowGroupFilter.java    | 98 +++++++++++++++++-----
 .../parquet/ParquetMetricsRowGroupFilter.java      | 12 +--
 2 files changed, 82 insertions(+), 28 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index c871c25c93..e1f9f07e7a 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -1030,18 +1030,11 @@ public class TestMetricsRowGroupFilter {
       records.add(record);
     }
 
-    File parquetFile = writeParquetFile("test-variant", VARIANT_SCHEMA, 
records);
-    InputFile inFile = Files.localInput(parquetFile);
-    try (ParquetFileReader reader = 
ParquetFileReader.open(parquetInputFile(inFile))) {
-      BlockMetaData blockMetaData = reader.getRowGroups().get(0);
-      MessageType fileSchema = reader.getFileMetaData().getSchema();
-      ParquetMetricsRowGroupFilter rowGroupFilter =
-          new ParquetMetricsRowGroupFilter(VARIANT_SCHEMA, 
notNull("variant_field"), true);
+    boolean shouldRead = shouldReadVariant(notNull("variant_field"), records);
 
-      assertThat(rowGroupFilter.shouldRead(fileSchema, blockMetaData))
-          .as("Should read: variant notNull filters must be evaluated post 
scan")
-          .isTrue();
-    }
+    assertThat(shouldRead)
+        .as("Should read: variant notNull filters must be evaluated post scan")
+        .isTrue();
   }
 
   @TestTemplate
@@ -1056,19 +1049,40 @@ public class TestMetricsRowGroupFilter {
       records.add(record);
     }
 
-    File parquetFile = writeParquetFile("test-variant-nulls", VARIANT_SCHEMA, 
records);
-    InputFile inFile = Files.localInput(parquetFile);
+    boolean shouldRead = shouldReadVariant(notNull("variant_field"), records);
 
-    try (ParquetFileReader reader = 
ParquetFileReader.open(parquetInputFile(inFile))) {
-      BlockMetaData blockMetaData = reader.getRowGroups().get(0);
-      MessageType fileSchema = reader.getFileMetaData().getSchema();
-      ParquetMetricsRowGroupFilter rowGroupFilter =
-          new ParquetMetricsRowGroupFilter(VARIANT_SCHEMA, 
notNull("variant_field"), true);
+    assertThat(shouldRead)
+        .as("Should read: variant notNull filters must be evaluated post scan 
even for all nulls")
+        .isTrue();
+  }
 
-      assertThat(rowGroupFilter.shouldRead(fileSchema, blockMetaData))
-          .as("Should read: variant notNull filters must be evaluated post 
scan even for all nulls")
-          .isTrue();
-    }
+  @TestTemplate
+  public void testVariantFieldEq() throws IOException {
+    assumeThat(format).isEqualTo(FileFormat.PARQUET);
+
+    VariantMetadata md = Variants.metadata("k");
+    Variant v0 = createVariantWithKey(md, "v0");
+    List<GenericRecord> records = createVariantRecords(v0);
+
+    boolean shouldRead = shouldReadVariant(equal("variant_field", v0), 
records);
+    assertThat(shouldRead)
+        .as("Should read: variant eq filters must be evaluated post scan")
+        .isTrue();
+  }
+
+  @TestTemplate
+  public void testVariantFieldIn() throws IOException {
+    assumeThat(format).isEqualTo(FileFormat.PARQUET);
+
+    VariantMetadata md = Variants.metadata("k");
+    Variant v0 = createVariantWithKey(md, "v0");
+    Variant v1 = createVariantWithKey(md, "v1");
+    List<GenericRecord> records = createVariantRecords(v0);
+
+    boolean shouldRead = shouldReadVariant(in("variant_field", v0, v1), 
records);
+    assertThat(shouldRead)
+        .as("Should read RowGroups: variant in filters must be evaluated post 
scan")
+        .isTrue();
   }
 
   @TestTemplate
@@ -1163,6 +1177,46 @@ public class TestMetricsRowGroupFilter {
         .shouldRead(messageType, blockMetaData);
   }
 
+  private boolean shouldReadVariant(Expression expression, List<GenericRecord> 
records)
+      throws IOException {
+    assumeThat(format).isEqualTo(FileFormat.PARQUET);
+
+    File parquetFile =
+        writeParquetFile("variant-test-" + System.nanoTime(), VARIANT_SCHEMA, 
records);
+    InputFile inFile = Files.localInput(parquetFile);
+    try (ParquetFileReader reader = 
ParquetFileReader.open(parquetInputFile(inFile))) {
+      BlockMetaData blockMetaData = reader.getRowGroups().get(0);
+      MessageType fileSchema = reader.getFileMetaData().getSchema();
+      ParquetMetricsRowGroupFilter rowGroupFilter =
+          new ParquetMetricsRowGroupFilter(VARIANT_SCHEMA, expression, true);
+      return rowGroupFilter.shouldRead(fileSchema, blockMetaData);
+    }
+  }
+
+  // Helper method to create a Variant with a single key-value pair
+  private Variant createVariantWithKey(VariantMetadata md, String value) {
+    ShreddedObject obj = Variants.object(md);
+    obj.put("k", Variants.of(value));
+    return Variant.of(md, obj);
+  }
+
+  // Helper method to create test records with variant field
+  private List<GenericRecord> createVariantRecords(Variant variantValue) {
+    List<GenericRecord> records = Lists.newArrayListWithExpectedSize(2);
+
+    GenericRecord r0 = GenericRecord.create(VARIANT_SCHEMA);
+    r0.setField("id", 0);
+    r0.setField("variant_field", variantValue);
+    records.add(r0);
+
+    GenericRecord r1 = GenericRecord.create(VARIANT_SCHEMA);
+    r1.setField("id", 1);
+    r1.setField("variant_field", null);
+    records.add(r1);
+
+    return records;
+  }
+
   private org.apache.parquet.io.InputFile parquetInputFile(InputFile inFile) {
     return new org.apache.parquet.io.InputFile() {
       @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 598e5dd235..89ded2a8ac 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -326,10 +326,10 @@ public class ParquetMetricsRowGroupFilter {
     public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
       int id = ref.fieldId();
 
-      // When filtering nested types notNull() is implicit filter passed even 
though complex
-      // filters aren't pushed down in Parquet. Leave all nested column type 
filters to be
+      // Leave all nested column type and variant type filters to be
       // evaluated post scan.
-      if (schema.findType(id) instanceof Type.NestedType) {
+      Type type = schema.findType(id);
+      if (type instanceof Type.NestedType || type.isVariantType()) {
         return ROWS_MIGHT_MATCH;
       }
 
@@ -376,10 +376,10 @@ public class ParquetMetricsRowGroupFilter {
     public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
       int id = ref.fieldId();
 
-      // When filtering nested types notNull() is implicit filter passed even 
though complex
-      // filters aren't pushed down in Parquet. Leave all nested column type 
filters to be
+      // Leave all nested column type and variant type filters to be
       // evaluated post scan.
-      if (schema.findType(id) instanceof Type.NestedType) {
+      Type type = schema.findType(id);
+      if (type instanceof Type.NestedType || type.isVariantType()) {
         return ROWS_MIGHT_MATCH;
       }
 

Reply via email to