This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new 4119a36eda Parquet, Data, Spark: Fix variant type filtering in
ParquetMetricsRowGroupFilter (#14081) (#14467)
4119a36eda is described below
commit 4119a36eda6cbe31fa4e856c2c71b4d256e58ec7
Author: Huaxin Gao <[email protected]>
AuthorDate: Sat Nov 1 21:27:57 2025 -0700
Parquet, Data, Spark: Fix variant type filtering in
ParquetMetricsRowGroupFilter (#14081) (#14467)
(cherry picked from commit fb63af014c39f687f3016a1b3c8bde4a494e9a4a)
Co-authored-by: Drew Gallardo <[email protected]>
---
.../iceberg/data/TestMetricsRowGroupFilter.java | 147 ++++++++++++++++-----
.../parquet/ParquetMetricsRowGroupFilter.java | 9 +-
.../apache/iceberg/spark/SparkTestHelperBase.java | 8 ++
.../iceberg/spark/sql/TestFilterPushDown.java | 89 ++++++++++++-
4 files changed, 212 insertions(+), 41 deletions(-)
diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index 384dcacd10..e12015d5eb 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.data;
-import static org.apache.iceberg.avro.AvroSchemaUtil.convert;
import static org.apache.iceberg.expressions.Expressions.and;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.greaterThan;
@@ -49,8 +48,6 @@ import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FileFormat;
@@ -59,9 +56,9 @@ import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
@@ -78,6 +75,10 @@ import org.apache.iceberg.types.Types.DoubleType;
import org.apache.iceberg.types.Types.FloatType;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.variants.ShreddedObject;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.Variants;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -138,6 +139,11 @@ public class TestMetricsRowGroupFilter {
optional(16, "_no_nans", Types.DoubleType.get()),
optional(17, "_some_double_nans", Types.DoubleType.get()));
+ private static final Schema VARIANT_SCHEMA =
+ new Schema(
+ required(1, "id", IntegerType.get()),
+ optional(2, "variant_field", Types.VariantType.get()));
+
private static final String TOO_LONG_FOR_STATS_PARQUET;
static {
@@ -220,40 +226,32 @@ public class TestMetricsRowGroupFilter {
}
private void createParquetInputFile() throws IOException {
- File parquetFile = new File(tempDir, "junit" + System.nanoTime());
-
- // build struct field schema
- org.apache.avro.Schema structSchema =
AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
-
- OutputFile outFile = Files.localOutput(parquetFile);
- try (FileAppender<Record> appender =
Parquet.write(outFile).schema(FILE_SCHEMA).build()) {
- GenericRecordBuilder builder = new
GenericRecordBuilder(convert(FILE_SCHEMA, "table"));
- // create 50 records
- for (int i = 0; i < INT_MAX_VALUE - INT_MIN_VALUE + 1; i += 1) {
- builder.set("_id", INT_MIN_VALUE + i); // min=30, max=79, num-nulls=0
- builder.set(
- "_no_stats_parquet",
- TOO_LONG_FOR_STATS_PARQUET); // value longer than 4k will produce
no stats
- // in Parquet
- builder.set("_required", "req"); // required, always non-null
- builder.set("_all_nulls", null); // never non-null
- builder.set("_some_nulls", (i % 10 == 0) ? null : "some"); // includes
some null values
- builder.set("_no_nulls", ""); // optional, but always non-null
- builder.set("_all_nans", Double.NaN); // never non-nan
- builder.set("_some_nans", (i % 10 == 0) ? Float.NaN : 2F); // includes
some nan values
- builder.set(
- "_some_double_nans", (i % 10 == 0) ? Double.NaN : 2D); // includes
some nan values
- builder.set("_no_nans", 3D); // optional, but always non-nan
- builder.set("_str", i + "str" + i);
-
- Record structNotNull = new Record(structSchema);
- structNotNull.put("_int_field", INT_MIN_VALUE + i);
- builder.set("_struct_not_null", structNotNull); // struct with int
-
- appender.add(builder.build());
- }
+ List<GenericRecord> records = Lists.newArrayList();
+
+ for (int i = 0; i < INT_MAX_VALUE - INT_MIN_VALUE + 1; i += 1) {
+ GenericRecord builder = GenericRecord.create(FILE_SCHEMA);
+ builder.setField("_id", INT_MIN_VALUE + i); // min=30, max=79,
num-nulls=0
+ builder.setField(
+ "_no_stats_parquet",
+ TOO_LONG_FOR_STATS_PARQUET); // value longer than 4k will produce no
stats
+ // in Parquet
+ builder.setField("_required", "req"); // required, always non-null
+ builder.setField("_all_nulls", null); // never non-null
+ builder.setField("_some_nulls", (i % 10 == 0) ? null : "some"); //
includes some null values
+ builder.setField("_no_nulls", ""); // optional, but always non-null
+ builder.setField("_all_nans", Double.NaN); // never non-nan
+ builder.setField("_some_nans", (i % 10 == 0) ? Float.NaN : 2F); //
includes some nan values
+ builder.setField(
+ "_some_double_nans", (i % 10 == 0) ? Double.NaN : 2D); // includes
some nan values
+ builder.setField("_no_nans", 3D); // optional, but always non-nan
+ builder.setField("_str", i + "str" + i);
+ GenericRecord structNotNull =
GenericRecord.create(UNDERSCORE_STRUCT_FIELD_TYPE);
+ structNotNull.setField("_int_field", INT_MIN_VALUE + i);
+ builder.setField("_struct_not_null", structNotNull); // struct with int
+ records.add(builder);
}
+ File parquetFile = writeParquetFile("junit", FILE_SCHEMA, records);
InputFile inFile = Files.localInput(parquetFile);
try (ParquetFileReader reader =
ParquetFileReader.open(parquetInputFile(inFile))) {
assertThat(reader.getRowGroups()).as("Should create only one row
group").hasSize(1);
@@ -264,6 +262,24 @@ public class TestMetricsRowGroupFilter {
parquetFile.deleteOnExit();
}
+ private File writeParquetFile(String fileName, Schema schema,
List<GenericRecord> records)
+ throws IOException {
+ File parquetFile = new File(tempDir, fileName + System.nanoTime());
+
+ OutputFile outFile = Files.localOutput(parquetFile);
+ try (FileAppender<GenericRecord> appender =
+ Parquet.write(outFile)
+ .schema(schema)
+ .createWriterFunc(GenericParquetWriter::create)
+ .build()) {
+ for (GenericRecord record : records) {
+ appender.add(record);
+ }
+ }
+ parquetFile.deleteOnExit();
+ return parquetFile;
+ }
+
@TestTemplate
public void testAllNulls() {
boolean shouldRead;
@@ -988,6 +1004,65 @@ public class TestMetricsRowGroupFilter {
.isTrue();
}
+ @TestTemplate
+ public void testVariantFieldMixedValuesNotNull() throws IOException {
+ assumeThat(format).isEqualTo(FileFormat.PARQUET);
+
+ List<GenericRecord> records = Lists.newArrayList();
+ for (int i = 0; i < 10; i++) {
+ GenericRecord record = GenericRecord.create(VARIANT_SCHEMA);
+ record.setField("id", i);
+ if (i % 2 == 0) {
+ VariantMetadata metadata = Variants.metadata("field");
+ ShreddedObject obj = Variants.object(metadata);
+ obj.put("field", Variants.of("value" + i));
+ Variant variant = Variant.of(metadata, obj);
+ record.setField("variant_field", variant);
+ }
+ records.add(record);
+ }
+
+ File parquetFile = writeParquetFile("test-variant", VARIANT_SCHEMA,
records);
+ InputFile inFile = Files.localInput(parquetFile);
+ try (ParquetFileReader reader =
ParquetFileReader.open(parquetInputFile(inFile))) {
+ BlockMetaData blockMetaData = reader.getRowGroups().get(0);
+ MessageType fileSchema = reader.getFileMetaData().getSchema();
+ ParquetMetricsRowGroupFilter rowGroupFilter =
+ new ParquetMetricsRowGroupFilter(VARIANT_SCHEMA,
notNull("variant_field"), true);
+
+ assertThat(rowGroupFilter.shouldRead(fileSchema, blockMetaData))
+ .as("Should read: variant notNull filters must be evaluated post
scan")
+ .isTrue();
+ }
+ }
+
+ @TestTemplate
+ public void testVariantFieldAllNullsNotNull() throws IOException {
+ assumeThat(format).isEqualTo(FileFormat.PARQUET);
+
+ List<GenericRecord> records = Lists.newArrayListWithExpectedSize(10);
+ for (int i = 0; i < 10; i++) {
+ GenericRecord record = GenericRecord.create(VARIANT_SCHEMA);
+ record.setField("id", i);
+ record.setField("variant_field", null);
+ records.add(record);
+ }
+
+ File parquetFile = writeParquetFile("test-variant-nulls", VARIANT_SCHEMA,
records);
+ InputFile inFile = Files.localInput(parquetFile);
+
+ try (ParquetFileReader reader =
ParquetFileReader.open(parquetInputFile(inFile))) {
+ BlockMetaData blockMetaData = reader.getRowGroups().get(0);
+ MessageType fileSchema = reader.getFileMetaData().getSchema();
+ ParquetMetricsRowGroupFilter rowGroupFilter =
+ new ParquetMetricsRowGroupFilter(VARIANT_SCHEMA,
notNull("variant_field"), true);
+
+ assertThat(rowGroupFilter.shouldRead(fileSchema, blockMetaData))
+ .as("Should read: variant notNull filters must be evaluated post
scan even for all nulls")
+ .isTrue();
+ }
+ }
+
private boolean shouldRead(Expression expression) {
return shouldRead(expression, true);
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 1ad346d39a..598e5dd235 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -155,10 +155,11 @@ public class ParquetMetricsRowGroupFilter {
// if the column has no non-null values, the expression cannot match
int id = ref.fieldId();
- // When filtering nested types notNull() is implicit filter passed even
though complex
- // filters aren't pushed down in Parquet. Leave all nested column type
filters to be
- // evaluated post scan.
- if (schema.findType(id) instanceof Type.NestedType) {
+ // When filtering nested types or variant types, notNull() is an
implicit filter passed
+ // even though complex filters aren't pushed down in Parquet. Leave
these type filters
+ // to be evaluated post scan.
+ Type type = schema.findType(id);
+ if (type instanceof Type.NestedType || type.isVariantType()) {
return ROWS_MIGHT_MATCH;
}
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
index 9fc71125a9..2754e891a4 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.spark.sql.Row;
+import org.apache.spark.unsafe.types.VariantVal;
public class SparkTestHelperBase {
protected static final Object ANY = new Object();
@@ -79,6 +80,13 @@ public class SparkTestHelperBase {
} else {
assertEquals(newContext, (Object[]) expectedValue, (Object[])
actualValue);
}
+ } else if (expectedValue instanceof VariantVal && actualValue instanceof
VariantVal) {
+ // Spark VariantVal comparison is based on raw byte[] comparison,
which can fail
+ // if Spark uses trailing null bytes. so, we compare their JSON
representation instead.
+ assertThat(actualValue)
+ .asString()
+ .as("%s contents should match (VariantVal JSON)", context)
+ .isEqualTo((expectedValue).toString());
} else if (expectedValue != ANY) {
assertThat(actualValue).as("%s contents should match",
context).isEqualTo(expectedValue);
}
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index 9d2ce2b388..a984c4c826 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -23,6 +23,8 @@ import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.assertj.core.api.Assertions.assertThat;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
@@ -35,7 +37,12 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.TestBaseWithCatalog;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.variants.ShreddedObject;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.Variants;
import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.unsafe.types.VariantVal;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -578,6 +585,68 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
ImmutableList.of(row(4, Double.NEGATIVE_INFINITY)));
}
+ @TestTemplate
+ public void testVariantExtractFiltering() {
+ sql(
+ "CREATE TABLE %s (id BIGINT, data VARIANT) USING iceberg TBLPROPERTIES"
+ + "('format-version'='3')",
+ tableName);
+ configurePlanningMode(planningMode);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, parse_json('{\"field\": \"foo\", \"num\": 25}')), "
+ + "(2, parse_json('{\"field\": \"bar\", \"num\": 30}')), "
+ + "(3, parse_json('{\"field\": \"baz\", \"num\": 35}')), "
+ + "(4, null)",
+ tableName);
+
+ withDefaultTimeZone(
+ "UTC",
+ () -> {
+ checkFilters(
+ "try_variant_get(data, '$.num', 'int') IS NOT NULL",
+ "isnotnull(data) AND isnotnull(try_variant_get(data, $.num,
IntegerType, false, Some(UTC)))",
+ "data IS NOT NULL",
+ ImmutableList.of(
+ row(1L, toSparkVariantRow("foo", 25)),
+ row(2L, toSparkVariantRow("bar", 30)),
+ row(3L, toSparkVariantRow("baz", 35))));
+
+ checkFilters(
+ "try_variant_get(data, '$.num', 'int') IS NULL",
+ "isnull(try_variant_get(data, $.num, IntegerType, false,
Some(UTC)))",
+ "",
+ ImmutableList.of(row(4L, null)));
+
+ checkFilters(
+ "try_variant_get(data, '$.num', 'int') > 30",
+ "isnotnull(data) AND (try_variant_get(data, $.num, IntegerType,
false, Some(UTC)) > 30)",
+ "data IS NOT NULL",
+ ImmutableList.of(row(3L, toSparkVariantRow("baz", 35))));
+
+ checkFilters(
+ "try_variant_get(data, '$.num', 'int') = 30",
+ "isnotnull(data) AND (try_variant_get(data, $.num, IntegerType,
false, Some(UTC)) = 30)",
+ "data IS NOT NULL",
+ ImmutableList.of(row(2L, toSparkVariantRow("bar", 30))));
+
+ checkFilters(
+ "try_variant_get(data, '$.num', 'int') IN (25, 35)",
+ "try_variant_get(data, $.num, IntegerType, false, Some(UTC)) IN
(25,35)",
+ "",
+ ImmutableList.of(
+ row(1L, toSparkVariantRow("foo", 25)), row(3L,
toSparkVariantRow("baz", 35))));
+
+ checkFilters(
+ "try_variant_get(data, '$.num', 'int') != 25",
+ "isnotnull(data) AND NOT (try_variant_get(data, $.num,
IntegerType, false, Some(UTC)) = 25)",
+ "data IS NOT NULL",
+ ImmutableList.of(
+ row(2L, toSparkVariantRow("bar", 30)), row(3L,
toSparkVariantRow("baz", 35))));
+ });
+ }
+
private void checkOnlyIcebergFilters(
String predicate, String icebergFilters, List<Object[]> expectedRows) {
@@ -600,7 +669,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
if (sparkFilter != null) {
assertThat(planAsString)
.as("Post scan filter should match")
- .contains("Filter (" + sparkFilter + ")");
+ .containsAnyOf("Filter (" + sparkFilter + ")", "Filter " +
sparkFilter);
} else {
assertThat(planAsString).as("Should be no post scan
filter").doesNotContain("Filter (");
}
@@ -613,4 +682,22 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
private Timestamp timestamp(String timestampAsString) {
return Timestamp.from(Instant.parse(timestampAsString));
}
+
+ private VariantVal toSparkVariantRow(String field, int num) {
+ VariantMetadata metadata = Variants.metadata("field", "num");
+
+ ShreddedObject obj = Variants.object(metadata);
+ obj.put("field", Variants.of(field));
+ obj.put("num", Variants.of(num));
+
+ ByteBuffer metadataBuffer =
+
ByteBuffer.allocate(metadata.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
+ metadata.writeTo(metadataBuffer, 0);
+
+ ByteBuffer valueBuffer =
ByteBuffer.allocate(obj.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
+ obj.writeTo(valueBuffer, 0);
+
+ return new VariantVal(
+ ByteBuffers.toByteArray(valueBuffer),
ByteBuffers.toByteArray(metadataBuffer));
+ }
}