This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2f947eb302 Parquet: Treat VARIANT like nested for eq/in in
ParquetMetricsRowGroupFilter (#14279)
2f947eb302 is described below
commit 2f947eb30222cbd4a52074322a54531c95368ffa
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Oct 17 17:03:55 2025 -0700
Parquet: Treat VARIANT like nested for eq/in in
ParquetMetricsRowGroupFilter (#14279)
* Parquet: Treat VARIANT like nested for eq/in in
ParquetMetricsRowGroupFilter
* address comments
* address comments
---
.../iceberg/data/TestMetricsRowGroupFilter.java | 98 +++++++++++++++++-----
.../parquet/ParquetMetricsRowGroupFilter.java | 12 +--
2 files changed, 82 insertions(+), 28 deletions(-)
diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index c871c25c93..e1f9f07e7a 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -1030,18 +1030,11 @@ public class TestMetricsRowGroupFilter {
records.add(record);
}
- File parquetFile = writeParquetFile("test-variant", VARIANT_SCHEMA,
records);
- InputFile inFile = Files.localInput(parquetFile);
- try (ParquetFileReader reader =
ParquetFileReader.open(parquetInputFile(inFile))) {
- BlockMetaData blockMetaData = reader.getRowGroups().get(0);
- MessageType fileSchema = reader.getFileMetaData().getSchema();
- ParquetMetricsRowGroupFilter rowGroupFilter =
- new ParquetMetricsRowGroupFilter(VARIANT_SCHEMA,
notNull("variant_field"), true);
+ boolean shouldRead = shouldReadVariant(notNull("variant_field"), records);
- assertThat(rowGroupFilter.shouldRead(fileSchema, blockMetaData))
- .as("Should read: variant notNull filters must be evaluated post
scan")
- .isTrue();
- }
+ assertThat(shouldRead)
+ .as("Should read: variant notNull filters must be evaluated post scan")
+ .isTrue();
}
@TestTemplate
@@ -1056,19 +1049,40 @@ public class TestMetricsRowGroupFilter {
records.add(record);
}
- File parquetFile = writeParquetFile("test-variant-nulls", VARIANT_SCHEMA,
records);
- InputFile inFile = Files.localInput(parquetFile);
+ boolean shouldRead = shouldReadVariant(notNull("variant_field"), records);
- try (ParquetFileReader reader =
ParquetFileReader.open(parquetInputFile(inFile))) {
- BlockMetaData blockMetaData = reader.getRowGroups().get(0);
- MessageType fileSchema = reader.getFileMetaData().getSchema();
- ParquetMetricsRowGroupFilter rowGroupFilter =
- new ParquetMetricsRowGroupFilter(VARIANT_SCHEMA,
notNull("variant_field"), true);
+ assertThat(shouldRead)
+ .as("Should read: variant notNull filters must be evaluated post scan
even for all nulls")
+ .isTrue();
+ }
- assertThat(rowGroupFilter.shouldRead(fileSchema, blockMetaData))
- .as("Should read: variant notNull filters must be evaluated post
scan even for all nulls")
- .isTrue();
- }
+  /**
+   * Verifies that an equality predicate on a variant column never prunes a row group, whether or
+   * not the literal actually occurs in the written data.
+   */
+  @TestTemplate
+  public void testVariantFieldEq() throws IOException {
+    assumeThat(format).isEqualTo(FileFormat.PARQUET);
+
+    VariantMetadata md = Variants.metadata("k");
+    Variant v0 = createVariantWithKey(md, "v0");
+    Variant v1 = createVariantWithKey(md, "v1");
+    // Only v0 is written to the file; v1 never appears in any row.
+    List<GenericRecord> records = createVariantRecords(v0);
+
+    boolean shouldRead = shouldReadVariant(equal("variant_field", v0), records);
+    assertThat(shouldRead)
+        .as("Should read: variant eq filters must be evaluated post scan")
+        .isTrue();
+
+    // A literal absent from the row group must not prune it either: the row-group filter returns
+    // ROWS_MIGHT_MATCH for every eq on a variant column and leaves the check for post-scan.
+    boolean shouldReadAbsent = shouldReadVariant(equal("variant_field", v1), records);
+    assertThat(shouldReadAbsent)
+        .as("Should read: variant eq filters must be evaluated post scan even for absent values")
+        .isTrue();
+  }
+
+  /**
+   * Verifies that an IN predicate on a variant column keeps the row group, even when only some
+   * of the listed values are present in the data.
+   */
+  @TestTemplate
+  public void testVariantFieldIn() throws IOException {
+    assumeThat(format).isEqualTo(FileFormat.PARQUET);
+
+    VariantMetadata metadata = Variants.metadata("k");
+    Variant written = createVariantWithKey(metadata, "v0");
+    Variant notWritten = createVariantWithKey(metadata, "v1");
+
+    // The file holds only `written` (plus a row whose variant is null); `notWritten` appears
+    // solely in the IN list. The row group is expected to be kept for post-scan evaluation.
+    Expression inFilter = in("variant_field", written, notWritten);
+    assertThat(shouldReadVariant(inFilter, createVariantRecords(written)))
+        .as("Should read RowGroups: variant in filters must be evaluated post scan")
+        .isTrue();
   }
@TestTemplate
@@ -1163,6 +1177,46 @@ public class TestMetricsRowGroupFilter {
.shouldRead(messageType, blockMetaData);
}
+  /**
+   * Writes {@code records} to a new Parquet file with {@code VARIANT_SCHEMA}, then asks a {@link
+   * ParquetMetricsRowGroupFilter} built from {@code expression} whether the file's first row
+   * group should be read.
+   *
+   * <p>The current test is skipped via an assumption unless the format under test is Parquet.
+   *
+   * @param expression filter to evaluate against the row group
+   * @param records rows to write; presumably small enough to land in a single row group, since
+   *     only row group 0 is inspected — TODO confirm if larger inputs are ever passed
+   * @return the row-group filter's verdict for the first row group
+   * @throws IOException propagated from writing or opening the Parquet file
+   */
+  private boolean shouldReadVariant(Expression expression, List<GenericRecord> records)
+      throws IOException {
+    assumeThat(format).isEqualTo(FileFormat.PARQUET);
+
+    // Unique name per call so repeated invocations within one test do not collide.
+    String fileName = "variant-test-" + System.nanoTime();
+    InputFile localFile =
+        Files.localInput(writeParquetFile(fileName, VARIANT_SCHEMA, records));
+
+    try (ParquetFileReader parquetReader = ParquetFileReader.open(parquetInputFile(localFile))) {
+      MessageType parquetSchema = parquetReader.getFileMetaData().getSchema();
+      BlockMetaData firstRowGroup = parquetReader.getRowGroups().get(0);
+      return new ParquetMetricsRowGroupFilter(VARIANT_SCHEMA, expression, true)
+          .shouldRead(parquetSchema, firstRowGroup);
+    }
+  }
+
+  /**
+   * Builds a {@link Variant} whose value is an object with a single field {@code "k"}.
+   *
+   * @param md metadata used for both the object and the returned variant; presumably its
+   *     dictionary must contain {@code "k"} — TODO confirm
+   * @param value string stored under {@code "k"}
+   * @return the variant wrapping the single-field object
+   */
+  private Variant createVariantWithKey(VariantMetadata md, String value) {
+    ShreddedObject singleField = Variants.object(md);
+    singleField.put("k", Variants.of(value));
+    return Variant.of(md, singleField);
+  }
+
+  /**
+   * Builds two {@code VARIANT_SCHEMA} records: id 0 carrying {@code variantValue} and id 1 whose
+   * variant field is null.
+   *
+   * @param variantValue variant stored in the first record
+   * @return a mutable list containing the two records in id order
+   */
+  private List<GenericRecord> createVariantRecords(Variant variantValue) {
+    GenericRecord withVariant = GenericRecord.create(VARIANT_SCHEMA);
+    withVariant.setField("id", 0);
+    withVariant.setField("variant_field", variantValue);
+
+    GenericRecord withNull = GenericRecord.create(VARIANT_SCHEMA);
+    withNull.setField("id", 1);
+    withNull.setField("variant_field", null);
+
+    List<GenericRecord> records = Lists.newArrayListWithExpectedSize(2);
+    records.add(withVariant);
+    records.add(withNull);
+    return records;
+  }
+
private org.apache.parquet.io.InputFile parquetInputFile(InputFile inFile) {
return new org.apache.parquet.io.InputFile() {
@Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 598e5dd235..89ded2a8ac 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -326,10 +326,10 @@ public class ParquetMetricsRowGroupFilter {
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
int id = ref.fieldId();
- // When filtering nested types notNull() is implicit filter passed even
though complex
- // filters aren't pushed down in Parquet. Leave all nested column type
filters to be
+ // Leave all nested column type and variant type filters to be
// evaluated post scan.
- if (schema.findType(id) instanceof Type.NestedType) {
+ Type type = schema.findType(id);
+ if (type instanceof Type.NestedType || type.isVariantType()) {
return ROWS_MIGHT_MATCH;
}
@@ -376,10 +376,10 @@ public class ParquetMetricsRowGroupFilter {
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
int id = ref.fieldId();
- // When filtering nested types notNull() is implicit filter passed even
though complex
- // filters aren't pushed down in Parquet. Leave all nested column type
filters to be
+ // Leave all nested column type and variant type filters to be
// evaluated post scan.
- if (schema.findType(id) instanceof Type.NestedType) {
+ Type type = schema.findType(id);
+ if (type instanceof Type.NestedType || type.isVariantType()) {
return ROWS_MIGHT_MATCH;
}