This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 10ba4eebf1 Flink: Support writing shredded variant (#15596)
10ba4eebf1 is described below
commit 10ba4eebf1b870627fb20b9a75413e8088cd975f
Author: GuoYu <[email protected]>
AuthorDate: Fri May 22 12:38:18 2026 +0800
Flink: Support writing shredded variant (#15596)
---
docs/docs/flink-configuration.md | 2 +
.../org/apache/iceberg/flink/FlinkWriteConf.java | 18 +
.../apache/iceberg/flink/FlinkWriteOptions.java | 6 +
.../iceberg/flink/data/FlinkFormatModels.java | 8 +-
.../flink/data/FlinkVariantShreddingAnalyzer.java | 72 ++
.../org/apache/iceberg/flink/sink/SinkUtil.java | 6 +
.../flink/TestFlinkVariantShreddingType.java | 1008 ++++++++++++++++++++
.../apache/iceberg/parquet/ParquetFormatModel.java | 42 +-
.../iceberg/parquet/TestParquetDataWriter.java | 8 +-
.../iceberg/spark/source/SparkFormatModels.java | 4 +-
.../iceberg/spark/source/SparkFormatModels.java | 4 +-
11 files changed, 1164 insertions(+), 14 deletions(-)
diff --git a/docs/docs/flink-configuration.md b/docs/docs/flink-configuration.md
index f30b422888..2f70cbf576 100644
--- a/docs/docs/flink-configuration.md
+++ b/docs/docs/flink-configuration.md
@@ -160,6 +160,8 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true')
*/
| compression-strategy | Table
write.orc.compression-strategy | Overrides this table's compression
strategy for ORC tables for this write
|
| write-parallelism | Upstream operator parallelism
| Overrides the writer parallelism
|
| uid-suffix | As per table property
| Overrides the uid suffix used in the underlying IcebergSink for this
table |
+| shred-variants | Table write.parquet.shred-variants | Overrides whether this table's variant columns are shredded for this write |
+| variant-inference-buffer-size | Table write.parquet.variant-inference-buffer-size | Overrides this table's variant inference buffer size (the number of rows buffered to infer the shredded schema) for this write |
#### Range distribution statistics type
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 990d23f2aa..fd3fccb224 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -262,4 +262,22 @@ public class FlinkWriteConf {
.flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL)
.parseOptional();
}
+
+ public boolean parquetShredVariants() {
+ return confParser
+ .booleanConf()
+ .option(FlinkWriteOptions.SHRED_VARIANTS.key())
+ .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
+ .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
+ .parse();
+ }
+
+ public int parquetVariantInferenceBufferSize() {
+ return confParser
+ .intConf()
+ .option(FlinkWriteOptions.VARIANT_INFERENCE_BUFFER_SIZE.key())
+ .tableProperty(TableProperties.PARQUET_VARIANT_BUFFER_SIZE)
+ .defaultValue(TableProperties.PARQUET_VARIANT_BUFFER_SIZE_DEFAULT)
+ .parse();
+ }
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index ee2aeaa450..1fdd6df8d7 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -105,4 +105,10 @@ public class FlinkWriteOptions {
// specify the uidSuffix to be used for the underlying IcebergSink
public static final ConfigOption<String> UID_SUFFIX =
ConfigOptions.key("uid-suffix").stringType().defaultValue("");
+
+ public static final ConfigOption<Boolean> SHRED_VARIANTS =
+ ConfigOptions.key("shred-variants").booleanType().defaultValue(false);
+
+ public static final ConfigOption<Integer> VARIANT_INFERENCE_BUFFER_SIZE =
+
ConfigOptions.key("variant-inference-buffer-size").intType().noDefaultValue();
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
index dd713b0dce..747a461868 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -18,7 +18,10 @@
*/
package org.apache.iceberg.flink.data;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.avro.AvroFormatModel;
import org.apache.iceberg.formats.FormatModelRegistry;
@@ -33,7 +36,10 @@ public class FlinkFormatModels {
RowType.class,
FlinkParquetWriters::buildWriter,
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
- FlinkParquetReaders.buildReader(icebergSchema, fileSchema,
idToConstant)));
+ FlinkParquetReaders.buildReader(icebergSchema, fileSchema,
idToConstant),
+ new FlinkVariantShreddingAnalyzer(),
+ (Function<RowType, UnaryOperator<RowData>>)
+ rowType -> new RowDataSerializer(rowType)::copy));
FormatModelRegistry.register(
AvroFormatModel.create(
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java
new file mode 100644
index 0000000000..cfb4d9a556
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkVariantShreddingAnalyzer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.variant.BinaryVariant;
+import org.apache.flink.types.variant.Variant;
+import org.apache.iceberg.parquet.VariantShreddingAnalyzer;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantValue;
+
+/**
+ * Analyzes Variant fields in Flink {@link RowData} and converts Flink's
binary Variant
+ * representation to Iceberg {@link VariantValue} instances for Variant
shredding.
+ */
+public class FlinkVariantShreddingAnalyzer extends
VariantShreddingAnalyzer<RowData, RowType> {
+
+ @Override
+ protected List<VariantValue> extractVariantValues(
+ List<RowData> bufferedRows, int variantFieldIndex) {
+ List<VariantValue> values = Lists.newArrayList();
+
+ for (RowData row : bufferedRows) {
+ if (!row.isNullAt(variantFieldIndex)) {
+ Variant flinkVariant = row.getVariant(variantFieldIndex);
+ if (flinkVariant != null) {
+ if (flinkVariant instanceof BinaryVariant binaryVariant) {
+ VariantValue variantValue =
+ VariantValue.from(
+ VariantMetadata.from(
+ ByteBuffer.wrap(binaryVariant.getMetadata())
+ .order(ByteOrder.LITTLE_ENDIAN)),
+
ByteBuffer.wrap(binaryVariant.getValue()).order(ByteOrder.LITTLE_ENDIAN));
+
+ values.add(variantValue);
+ } else {
+ throw new UnsupportedOperationException(
+ "Not a supported type: " + flinkVariant.getClass());
+ }
+ }
+ }
+ }
+
+ return values;
+ }
+
+ @Override
+ protected int resolveColumnIndex(RowType flinkSchema, String columnName) {
+ return flinkSchema.getFieldIndex(columnName);
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
index b3a9ac6ba2..d4c3d3beb8 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/SinkUtil.java
@@ -24,6 +24,8 @@ import static
org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.PARQUET_SHRED_VARIANTS;
+import static org.apache.iceberg.TableProperties.PARQUET_VARIANT_BUFFER_SIZE;
import java.util.List;
import java.util.Map;
@@ -128,6 +130,10 @@ public class SinkUtil {
writeProperties.put(PARQUET_COMPRESSION_LEVEL,
parquetCompressionLevel);
}
+ writeProperties.put(PARQUET_SHRED_VARIANTS,
String.valueOf(conf.parquetShredVariants()));
+ writeProperties.put(
+ PARQUET_VARIANT_BUFFER_SIZE,
String.valueOf(conf.parquetVariantInferenceBufferSize()));
+
break;
case AVRO:
writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java
new file mode 100644
index 0000000000..e809a47cd2
--- /dev/null
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkVariantShreddingType.java
@@ -0,0 +1,1008 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.parquet.schema.Types.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.variants.Variant;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+class TestFlinkVariantShreddingType extends CatalogTestBase {
+
+ private static final String TABLE_NAME = "test_table";
+ private Table icebergTable;
+
+ @Parameters(name = "catalogName={0}, baseNamespace={1}")
+ protected static List<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ parameters.add(new Object[] {"testhadoop", Namespace.empty()});
+ parameters.add(new Object[] {"testhadoop_basenamespace",
Namespace.of("l0", "l1")});
+ return parameters;
+ }
+
+ @Override
+ @BeforeEach
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ sql(
+ """
+ CREATE TABLE %s (
+ id int NOT NULL,
+ address variant NOT NULL
+ ) WITH (
+ 'write.format.default'='%s',
+ 'format-version'='3',
+ 'shred-variants'='true',
+ 'variant-inference-buffer-size'='10'
+ )
+ """,
+ TABLE_NAME, FileFormat.PARQUET.name());
+ icebergTable =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+ }
+
+ @Override
+ @AfterEach
+ public void clean() {
+ super.clean();
+ getTableEnv()
+ .getConfig()
+ .getConfiguration()
+ .setString("table.exec.resource.default-parallelism", "4");
+ }
+
+ @TestTemplate
+ public void testExcludingNullValue() throws IOException {
+ String values =
+ """
+ (1, parse_json('{"name": "Alice", "age": 30, "dummy": null}')),
+ (2, parse_json('{"name": "Bob", "age": 25}')),
+ (3, parse_json('{"name": "Charlie", "age": 35}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType name =
+ field(
+ "name",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType age =
+ field(
+ "age",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(8, true)));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(age, name));
+ MessageType expectedSchema = parquetSchema(address);
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testConsistentType() throws IOException {
+ String values =
+ """
+ (1, parse_json('{"age": "25"}')),
+ (2, parse_json('{"age": 30}')),
+ (3, parse_json('{"age": "35"}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType age =
+ field(
+ "age",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(age));
+ MessageType expectedSchema = parquetSchema(address);
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testPrimitiveType() throws IOException {
+ String values =
+ """
+ (1, parse_json('123')),
+ (2, parse_json('"abc"')),
+ (3, parse_json('12'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType address =
+ variant(
+ "address",
+ 2,
+ Type.Repetition.REQUIRED,
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(8, true)));
+ MessageType expectedSchema = parquetSchema(address);
+
+ assertThat(SimpleDataUtil.tableRecords(icebergTable)).hasSize(3);
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testPrimitiveDecimalType() throws IOException {
+ String values =
+ """
+ (1, parse_json('123.56')),
+ (2, parse_json('"abc"')),
+ (3, parse_json('12.56'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType address =
+ variant(
+ "address",
+ 2,
+ Type.Repetition.REQUIRED,
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.decimalType(2, 5)));
+ MessageType expectedSchema = parquetSchema(address);
+ assertThat(SimpleDataUtil.tableRecords(icebergTable)).hasSize(3);
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testBooleanType() throws IOException {
+ String values =
+ """
+ (1, parse_json('{"active": true}')),
+ (2, parse_json('{"active": false}')),
+ (3, parse_json('{"active": true}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType active = field("active",
shreddedPrimitive(PrimitiveType.PrimitiveTypeName.BOOLEAN));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(active));
+ MessageType expectedSchema = parquetSchema(address);
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testDecimalTypeWithInconsistentScales() throws IOException {
+ String values =
+ """
+ (1, parse_json('{"price": 123.456789}')),
+ (2, parse_json('{"price": 678.90}')),
+ (3, parse_json('{"price": 999.99}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType price =
+ field(
+ "price",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.decimalType(6, 9)));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(price));
+ MessageType expectedSchema = parquetSchema(address);
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testDecimalTypeWithConsistentScales() throws IOException {
+ String values =
+ """
+ (1, parse_json('{"price": 123.45}')),
+ (2, parse_json('{"price": 678.90}')),
+ (3, parse_json('{"price": 999.99}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType price =
+ field(
+ "price",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.decimalType(2, 5)));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(price));
+ MessageType expectedSchema = parquetSchema(address);
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testArrayType() throws IOException {
+ String values =
+ """
+ (1, parse_json('["java", "scala", "python"]')),
+ (2, parse_json('["rust", "go"]')),
+ (3, parse_json('["javascript"]'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType arr =
+ list(
+ element(
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType())));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED, arr);
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testNestedArrayType() throws IOException {
+
+ String values =
+ """
+ (1, parse_json('{"tags": ["java", "scala", "python"]}')),
+ (2, parse_json('{"tags": ["rust", "go"]}')),
+ (3, parse_json('{"tags": ["javascript"]}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType tags =
+ field(
+ "tags",
+ list(
+ element(
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
+ LogicalTypeAnnotation.stringType()))));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(tags));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testNestedObjectType() throws IOException {
+
+ String values =
+ """
+ (1, parse_json('{"location": {"city": "Seattle", "zip": 98101},
"tags": ["java", "scala", "python"]}')),
+ (2, parse_json('{"location": {"city": "Portland", "zip":
97201}}')),
+ (3, parse_json('{"location": {"city": "NYC", "zip": 10001}}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType city =
+ field(
+ "city",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType zip =
+ field(
+ "zip",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(32, true)));
+ GroupType location = field("location", objectFields(city, zip));
+ GroupType tags =
+ field(
+ "tags",
+ list(
+ element(
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
+ LogicalTypeAnnotation.stringType()))));
+
+ GroupType address =
+ variant("address", 2, Type.Repetition.REQUIRED, objectFields(location,
tags));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testLazyInitializationWithBufferedRows() throws IOException {
+
+ String values =
+ """
+ (1, parse_json('{"name": "Alice", "age": 30}')),
+ (2, parse_json('{"name": "Bob", "age": 25}')),
+ (3, parse_json('{"name": "Charlie", "age": 35}')),
+ (4, parse_json('{"name": "David", "age": 28}')),
+ (5, parse_json('{"name": "Eve", "age": 32}')),
+ (6, parse_json('{"name": "Frank", "age": 40}')),
+ (7, parse_json('{"name": "Grace", "age": 27}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType name =
+ field(
+ "name",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType age =
+ field(
+ "age",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(8, true)));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(age, name));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ assertThat(genericRowData()).hasSize(7);
+ }
+
+ @TestTemplate
+ public void testColumnIndexTruncateLength() throws IOException {
+ sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME);
+
+ int customTruncateLength = 10;
+ sql(
+ "ALTER TABLE %s SET ('%s'='%d')",
+ TABLE_NAME, "parquet.columnindex.truncate.length",
customTruncateLength);
+
+ StringBuilder valuesBuilder = new StringBuilder();
+ for (int i = 1; i <= 10; i++) {
+ if (i > 1) {
+ valuesBuilder.append(", ");
+ }
+
+ String longValue = "A".repeat(20);
+ valuesBuilder.append(
+ String.format(
+ """
+ (%d, parse_json('{"description": "%s", "id": %d}'))
+ """
+ .trim(),
+ i,
+ longValue,
+ i));
+ }
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, valuesBuilder.toString());
+
+ GroupType description =
+ field(
+ "description",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType id =
+ field(
+ "id",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(8, true)));
+ GroupType address =
+ variant("address", 2, Type.Repetition.REQUIRED,
objectFields(description, id));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ assertThat(genericRowData()).hasSize(10);
+ }
+
+ @TestTemplate
+ public void testIntegerFamilyPromotion() throws IOException {
+
+ // Mix of INT8, INT16, INT32, INT64 - should promote to INT64
+ String values =
+ """
+ (1, parse_json('{"value": 10}')),
+ (2, parse_json('{"value": 1000}')),
+ (3, parse_json('{"value": 100000}')),
+ (4, parse_json('{"value": 10000000000}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType value =
+ field(
+ "value",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT64,
LogicalTypeAnnotation.intType(64, true)));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(value));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testDecimalFamilyPromotion() throws IOException {
+
+ // Test that they get promoted to the most capable decimal type observed
+ String values =
+ """
+ (1, parse_json('{"value": 1.5}')),
+ (2, parse_json('{"value": 123.456789}')),
+ (3, parse_json('{"value": 123456789123456.789}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType value =
+ field(
+ "value",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+ 16,
+ LogicalTypeAnnotation.decimalType(6, 21)));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(value));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testDataRoundTripWithShredding() throws IOException {
+ String values =
+ """
+ (1, parse_json('{"name": "Alice", "age": 30}')),
+ (2, parse_json('{"name": "Bob", "age": 25}')),
+ (3, parse_json('{"name": "Charlie", "age": 35}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType name =
+ field(
+ "name",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType age =
+ field(
+ "age",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(8, true)));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(age, name));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+
+ // Verify that we can read the data back correctly
+ List<Row> rows =
+ sql(
+ """
+ SELECT id,
+ JSON_VALUE(address, '$.name'),
+ JSON_VALUE(address, '$.age' RETURNING int)
+ FROM %s
+ ORDER BY id
+ """,
+ TABLE_NAME);
+ assertThat(rows).hasSize(3);
+ assertThat(rows.get(0).getField(0)).isEqualTo(1);
+ assertThat(rows.get(0).getField(1)).isEqualTo("Alice");
+ assertThat(rows.get(0).getField(2)).isEqualTo(30);
+ assertThat(rows.get(1).getField(0)).isEqualTo(2);
+ assertThat(rows.get(1).getField(1)).isEqualTo("Bob");
+ assertThat(rows.get(1).getField(2)).isEqualTo(25);
+ assertThat(rows.get(2).getField(0)).isEqualTo(3);
+ assertThat(rows.get(2).getField(1)).isEqualTo("Charlie");
+ assertThat(rows.get(2).getField(2)).isEqualTo(35);
+ }
+
+ @TestTemplate
+ public void testVariantWithNullValues() throws IOException {
+
+ String values =
+ """
+ (1, parse_json('null')),
+ (2, parse_json('null')),
+ (3, parse_json('null'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, values);
+
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED);
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testArrayOfNullElementsWithShredding() throws IOException {
+
+ sql(
+ """
+ INSERT INTO %s VALUES
+ (1, parse_json('[null, null, null]')),
+ (2, parse_json('[null]'))
+ """,
+ TABLE_NAME);
+
+ // Array elements are all null, element type is null, falls back to
unshredded
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED);
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testInfrequentFieldPruning() throws IOException {
+ // This test relies on the current VariantShreddingAnalyzer
MIN_FIELD_FREQUENCY threshold of
+ // 0.10: rare_field appears in 1/11 rows (~0.09), so it should be pruned.
+ sql("ALTER TABLE %s SET('variant-inference-buffer-size'='11')",
TABLE_NAME);
+ StringBuilder valuesBuilder = new StringBuilder();
+ for (int i = 1; i <= 11; i++) {
+ if (i > 1) {
+ valuesBuilder.append(", ");
+ }
+
+ if (i == 1) {
+ // Only the first row has rare_field
+ valuesBuilder.append(
+ String.format(
+ """
+ (%d, parse_json('{"name": "User%d", "rare_field":
"rare"}'))
+ """
+ .trim(),
+ i,
+ i));
+ } else {
+ valuesBuilder.append(
+ String.format(
+ """
+ (%d, parse_json('{"name": "User%d"}'))
+ """
+ .trim(),
+ i,
+ i));
+ }
+ }
+
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, valuesBuilder.toString());
+
+ // rare_field appears in 1/11 rows, should be pruned
+ // name appears in 11/11 rows and should be kept
+ GroupType name =
+ field(
+ "name",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(name));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testMixedTypeTieBreaking() throws IOException {
+ StringBuilder valuesBuilder = new StringBuilder();
+ for (int i = 1; i <= 10; i++) {
+ if (i > 1) {
+ valuesBuilder.append(", ");
+ }
+
+ if (i <= 5) {
+ valuesBuilder.append(
+ String.format(
+ """
+ (%d, parse_json('{"val": %d}'))
+ """
+ .trim(),
+ i,
+ i));
+ } else {
+ valuesBuilder.append(
+ String.format(
+ """
+ (%d, parse_json('{"val": "text%d"}'))
+ """
+ .trim(),
+ i,
+ i));
+ }
+ }
+
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, valuesBuilder.toString());
+
+ // 5 ints + 5 strings is a tie so STRING wins (higher TIE_BREAK_PRIORITY)
+ GroupType val =
+ field(
+ "val",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(val));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+ }
+
+ @TestTemplate
+ public void testFieldOnlyAfterBuffer() throws IOException {
+ getTableEnv()
+ .getConfig()
+ .getConfiguration()
+ .setString("table.exec.resource.default-parallelism", "1");
+
+ sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME);
+
+ sql(
+ """
+ CREATE TEMPORARY VIEW tmp_source AS
+ SELECT * FROM (VALUES
+ (1, parse_json('{"name": "Alice"}')),
+ (2, parse_json('{"name": "Bob"}')),
+ (3, parse_json('{"name": "Charlie"}')),
+ (4, parse_json('{"name": "David", "score": 95}')),
+ (5, parse_json('{"name": "Eve", "score": 88}')),
+ (6, parse_json('{"name": "Frank", "score": 72}')),
+ (7, parse_json('{"name": "Grace", "score": 91}'))
+ ) AS t(id, address)
+ """);
+
+ sql("INSERT INTO %s SELECT id, address FROM tmp_source ORDER BY id",
TABLE_NAME);
+
+ // Schema is determined from buffer (rows 1-3) which only has "name".
+ // "score" is not shredded
+ GroupType name =
+ field(
+ "name",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(name));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+
+ // Verify all data round-trips despite "score" not being shredded
+ List<Row> rows =
+ sql(
+ """
+ SELECT id,
+ JSON_VALUE(address, '$.name'),
+ JSON_VALUE(address, '$.score' RETURNING int)
+ FROM %s
+ ORDER BY id
+ """,
+ TABLE_NAME);
+ assertThat(rows).hasSize(7);
+ assertThat(rows.get(0).getField(1)).isEqualTo("Alice");
+ assertThat(rows.get(0).getField(2)).isNull();
+ assertThat(rows.get(3).getField(1)).isEqualTo("David");
+ assertThat(rows.get(3).getField(2)).isEqualTo(95);
+ assertThat(rows.get(6).getField(1)).isEqualTo("Grace");
+ assertThat(rows.get(6).getField(2)).isEqualTo(91);
+
+ sql("DROP TEMPORARY VIEW IF EXISTS tmp_source");
+ }
+
+ @TestTemplate
+ public void testCrossFileDifferentShreddedType() throws IOException {
+ sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME);
+
+ // File 1: "score" is always integer → shredded as INT8
+ String batch1 =
+ """
+ (1, parse_json('{"score": 95}')),
+ (2, parse_json('{"score": 88}')),
+ (3, parse_json('{"score": 72}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, batch1);
+
+ // Verify file 1 schema: score shredded as INT8
+ GroupType scoreInt =
+ field(
+ "score",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(8, true)));
+ MessageType expectedSchema1 =
+ parquetSchema(variant("address", 2, Type.Repetition.REQUIRED,
objectFields(scoreInt)));
+ verifyParquetSchema(icebergTable, expectedSchema1);
+
+ // File 2: "score" is always string → shredded as STRING
+ String batch2 =
+ """
+ (4, parse_json('{"score": "high"}')),
+ (5, parse_json('{"score": "medium"}')),
+ (6, parse_json('{"score": "low"}'))
+ """;
+ sql("INSERT INTO %s VALUES %s", TABLE_NAME, batch2);
+
+ // Query across both files, reader must handle different shredded types
+ List<Row> rows =
+ sql(
+ """
+ SELECT id,
+ json_value(address, '$.score')
+ FROM %s
+ ORDER BY id
+ """,
+ TABLE_NAME);
+ assertThat(rows).hasSize(6);
+ assertThat(rows.get(0).getField(1)).isEqualTo("95");
+ assertThat(rows.get(1).getField(1)).isEqualTo("88");
+ assertThat(rows.get(3).getField(1)).isEqualTo("high");
+ assertThat(rows.get(5).getField(1)).isEqualTo("low");
+ }
+
+ @TestTemplate
+ public void testAllNullVariantColumn() throws IOException {
+
+ String variantNullAbleTableName = "test_all_null_variant_column";
+ sql(
+ """
+ CREATE TABLE %s (
+ id int NOT NULL,
+ address variant
+ ) WITH (
+ 'write.format.default'='%s',
+ 'format-version'='3',
+ 'shred-variants'='true',
+ 'variant-inference-buffer-size'='10'
+ )
+ """,
+ variantNullAbleTableName, FileFormat.PARQUET.name());
+
+ sql(
+ """
+ INSERT INTO %s VALUES
+ (1, CAST(null AS VARIANT)),
+ (2, CAST(null AS VARIANT)),
+ (3, CAST(null AS VARIANT))
+ """,
+ variantNullAbleTableName);
+
+ // All variant values are SQL NULL, so no shredding should occur
+ MessageType expectedSchema = parquetSchema(variant("address", 2,
Type.Repetition.OPTIONAL));
+ Table variantNullAbleTable =
+ validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
variantNullAbleTableName));
+ verifyParquetSchema(variantNullAbleTable, expectedSchema);
+
+ List<Row> rows = sql("SELECT id, address FROM %s ORDER BY id",
variantNullAbleTableName);
+ assertThat(rows).hasSize(3);
+ assertThat(rows.get(0).getField(1)).isNull();
+ assertThat(rows.get(1).getField(1)).isNull();
+ assertThat(rows.get(2).getField(1)).isNull();
+ }
+
+ @TestTemplate
+ public void testBufferSizeOne() throws IOException {
+ sql("ALTER TABLE %s SET('variant-inference-buffer-size'='1')", TABLE_NAME);
+
+ sql(
+ """
+ INSERT INTO %s VALUES
+ (1, parse_json('{"name": "Alice", "age": 30}')),
+ (2, parse_json('{"name": "Bob", "age": 25}')),
+ (3, parse_json('{"name": "Charlie", "age": 35}'))
+ """,
+ TABLE_NAME);
+
+ // Schema inferred from first row only, should still shred name and age
+ GroupType age =
+ field(
+ "age",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.intType(8, true)));
+ GroupType name =
+ field(
+ "name",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.BINARY,
LogicalTypeAnnotation.stringType()));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(age, name));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+
+ List<Row> rows =
+ sql(
+ """
+ SELECT id,
+ json_value(address, '$.name')
+ FROM %s
+ ORDER BY id
+ """,
+ TABLE_NAME);
+ assertThat(rows).hasSize(3);
+ assertThat(rows.get(0).getField(1)).isEqualTo("Alice");
+ assertThat(rows.get(2).getField(1)).isEqualTo("Charlie");
+ }
+
+ @TestTemplate
+ public void testDecimalFallbackAfterBuffer() throws IOException {
+ getTableEnv()
+ .getConfig()
+ .getConfiguration()
+ .setString("table.exec.resource.default-parallelism", "1");
+
+ sql("ALTER TABLE %s SET('variant-inference-buffer-size'='3')", TABLE_NAME);
+
+ // Buffer: scale=2, 3 integer digits -> DECIMAL(5,2)
+ // Row 4: precision overflow -> fallback to value field
+ // Row 5: scale overflow -> fallback to value field
+ // Row 6: fits typed column, scale widened from 1 to 2 via setScale
+ sql(
+ """
+ CREATE TEMPORARY VIEW tmp_source AS
+ SELECT * FROM (VALUES
+ (1, parse_json('{"val": 123.45}')),
+ (2, parse_json('{"val": 678.90}')),
+ (3, parse_json('{"val": 999.99}')),
+ (4, parse_json('{"val": 123456.78}')),
+ (5, parse_json('{"val": 1.2345}')),
+ (6, parse_json('{"val": 12.3}'))
+ ) AS t(id, address)
+ """);
+
+ sql("INSERT INTO %s SELECT id, address FROM tmp_source ORDER BY id",
TABLE_NAME);
+
+ GroupType val =
+ field(
+ "val",
+ shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName.INT32,
LogicalTypeAnnotation.decimalType(2, 5)));
+ GroupType address = variant("address", 2, Type.Repetition.REQUIRED,
objectFields(val));
+ MessageType expectedSchema = parquetSchema(address);
+
+ verifyParquetSchema(icebergTable, expectedSchema);
+
+ List<Row> rows =
+ sql(
+ """
+ SELECT id,
+ CAST(json_value(address, '$.val') AS DECIMAL(10, 4))
+ FROM %s
+ ORDER BY id
+ """,
+ TABLE_NAME);
+ assertThat(rows).hasSize(6);
+ assertThat(rows.get(0).getField(1)).isEqualTo(new BigDecimal("123.4500"));
+ assertThat(rows.get(3).getField(1)).isEqualTo(new
BigDecimal("123456.7800"));
+ assertThat(rows.get(4).getField(1)).isEqualTo(new BigDecimal("1.2345"));
+ assertThat(rows.get(5).getField(1)).isEqualTo(new BigDecimal("12.3000"));
+
+ sql("DROP TEMPORARY VIEW IF EXISTS tmp_source");
+ }
+
+ private void verifyParquetSchema(Table table, MessageType expectedSchema)
throws IOException {
+ table.refresh();
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ assertThat(tasks).isNotEmpty();
+
+ FileScanTask task = tasks.iterator().next();
+ String path = task.file().location();
+
+ HadoopInputFile inputFile =
+ HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path), new
Configuration());
+
+ try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
+ MessageType actualSchema = reader.getFileMetaData().getSchema();
+ assertThat(actualSchema).isEqualTo(expectedSchema);
+ }
+ }
+ }
+
+ private static MessageType parquetSchema(Type variantTypes) {
+ return org.apache.parquet.schema.Types.buildMessage()
+ .required(PrimitiveType.PrimitiveTypeName.INT32)
+ .id(1)
+ .named("id")
+ .addFields(variantTypes)
+ .named("table");
+ }
+
+ private static GroupType variant(String name, int fieldId, Type.Repetition
repetition) {
+ return org.apache.parquet.schema.Types.buildGroup(repetition)
+ .id(fieldId)
+ .as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .named("metadata")
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .named("value")
+ .named(name);
+ }
+
+ private static GroupType variant(
+ String name, int fieldId, Type.Repetition repetition, Type shreddedType)
{
+ checkShreddedType(shreddedType);
+ return org.apache.parquet.schema.Types.buildGroup(repetition)
+ .id(fieldId)
+ .as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
+ .required(PrimitiveType.PrimitiveTypeName.BINARY)
+ .named("metadata")
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .named("value")
+ .addField(shreddedType)
+ .named(name);
+ }
+
+ private static Type shreddedPrimitive(PrimitiveType.PrimitiveTypeName
primitive) {
+ return optional(primitive).named("typed_value");
+ }
+
+ private static Type shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName primitive, LogicalTypeAnnotation
annotation) {
+ return optional(primitive).as(annotation).named("typed_value");
+ }
+
+ private static Type shreddedPrimitive(
+ PrimitiveType.PrimitiveTypeName primitive, int length,
LogicalTypeAnnotation annotation) {
+ return
optional(primitive).length(length).as(annotation).named("typed_value");
+ }
+
+ private static GroupType objectFields(GroupType... fields) {
+ for (GroupType fieldType : fields) {
+ checkField(fieldType);
+ }
+
+ return org.apache.parquet.schema.Types.buildGroup(Type.Repetition.OPTIONAL)
+ .addFields(fields)
+ .named("typed_value");
+ }
+
+ private static GroupType field(String name, Type shreddedType) {
+ checkShreddedType(shreddedType);
+ return org.apache.parquet.schema.Types.buildGroup(Type.Repetition.REQUIRED)
+ .optional(PrimitiveType.PrimitiveTypeName.BINARY)
+ .named("value")
+ .addField(shreddedType)
+ .named(name);
+ }
+
+ private static GroupType element(Type shreddedType) {
+ return field("element", shreddedType);
+ }
+
+ private static GroupType list(GroupType elementType) {
+ return
org.apache.parquet.schema.Types.optionalList().element(elementType).named("typed_value");
+ }
+
+ private static void checkShreddedType(Type shreddedType) {
+ Preconditions.checkArgument(
+ shreddedType.getName().equals("typed_value"),
+ "Invalid shredded type name: %s should be typed_value",
+ shreddedType.getName());
+ Preconditions.checkArgument(
+ shreddedType.isRepetition(Type.Repetition.OPTIONAL),
+ "Invalid shredded type repetition: %s should be OPTIONAL",
+ shreddedType.getRepetition());
+ }
+
+ private static void checkField(GroupType fieldType) {
+ Preconditions.checkArgument(
+ fieldType.isRepetition(Type.Repetition.REQUIRED),
+ "Invalid field type repetition: %s should be REQUIRED",
+ fieldType.getRepetition());
+ }
+
+ private List<GenericRowData> genericRowData() throws IOException {
+ List<GenericRowData> genericRowData = Lists.newArrayList();
+ try (CloseableIterable<CombinedScanTask> combinedScanTasks =
+ icebergTable.newScan().planTasks()) {
+ for (CombinedScanTask combinedScanTask : combinedScanTasks) {
+ try (DataIterator<RowData> dataIterator =
+ ReaderUtil.createDataIterator(
+ combinedScanTask, icebergTable.schema(),
icebergTable.schema())) {
+ while (dataIterator.hasNext()) {
+ GenericRowData rowData = (GenericRowData) dataIterator.next();
+ genericRowData.add(rowData);
+ }
+ }
+ }
+ }
+
+ return genericRowData;
+ }
+}
diff --git
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
index 90dd6e117b..9a4a62cae6 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
@@ -51,7 +51,7 @@ public class ParquetFormatModel<D, S, R>
extends BaseFormatModel<D, S, ParquetValueWriter<?>, R, MessageType> {
private final boolean isBatchReader;
private final VariantShreddingAnalyzer<D, S> variantAnalyzer;
- private final UnaryOperator<D> copyFunc;
+ private final Function<S, UnaryOperator<D>> copyFuncFactory;
public static <D> ParquetFormatModel<PositionDelete<D>, Void, Object>
forPositionDeletes() {
return new ParquetFormatModel<>(
@@ -67,6 +67,11 @@ public class ParquetFormatModel<D, S, R>
type, schemaType, writerFunction, readerFunction, false, null, null);
}
+ /**
+ * @deprecated Will be removed in 1.13.0; use {@link #create(Class, Class,
WriterFunction,
+ * ReaderFunction, VariantShreddingAnalyzer, Function)} instead.
+ */
+ @Deprecated
public static <D, S> ParquetFormatModel<D, S, ParquetValueReader<?>> create(
Class<D> type,
Class<S> schemaType,
@@ -74,8 +79,24 @@ public class ParquetFormatModel<D, S, R>
ReaderFunction<ParquetValueReader<?>, S, MessageType> readerFunction,
VariantShreddingAnalyzer<D, S> variantAnalyzer,
UnaryOperator<D> copyFunc) {
+ return create(
+ type,
+ schemaType,
+ writerFunction,
+ readerFunction,
+ variantAnalyzer,
+ (Function<S, UnaryOperator<D>>) unused -> copyFunc);
+ }
+
+ public static <D, S> ParquetFormatModel<D, S, ParquetValueReader<?>> create(
+ Class<D> type,
+ Class<S> schemaType,
+ WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
+ ReaderFunction<ParquetValueReader<?>, S, MessageType> readerFunction,
+ VariantShreddingAnalyzer<D, S> variantAnalyzer,
+ Function<S, UnaryOperator<D>> copyFuncFactory) {
return new ParquetFormatModel<>(
- type, schemaType, writerFunction, readerFunction, false,
variantAnalyzer, copyFunc);
+ type, schemaType, writerFunction, readerFunction, false,
variantAnalyzer, copyFuncFactory);
}
public static <D, S> ParquetFormatModel<D, S, VectorizedReader<?>> create(
@@ -92,11 +113,11 @@ public class ParquetFormatModel<D, S, R>
ReaderFunction<R, S, MessageType> readerFunction,
boolean isBatchReader,
VariantShreddingAnalyzer<D, S> variantAnalyzer,
- UnaryOperator<D> copyFunc) {
+ Function<S, UnaryOperator<D>> copyFuncFactory) {
super(type, schemaType, writerFunction, readerFunction);
this.isBatchReader = isBatchReader;
this.variantAnalyzer = variantAnalyzer;
- this.copyFunc = copyFunc;
+ this.copyFuncFactory = copyFuncFactory;
}
@Override
@@ -106,7 +127,8 @@ public class ParquetFormatModel<D, S, R>
@Override
public ModelWriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile) {
- return new WriteBuilderWrapper<>(outputFile, writerFunction(),
variantAnalyzer, copyFunc);
+ return new WriteBuilderWrapper<>(
+ outputFile, writerFunction(), variantAnalyzer, copyFuncFactory);
}
@Override
@@ -118,7 +140,7 @@ public class ParquetFormatModel<D, S, R>
private final Parquet.WriteBuilder internal;
private final WriterFunction<ParquetValueWriter<?>, S, MessageType>
writerFunction;
private final VariantShreddingAnalyzer<D, S> variantAnalyzer;
- private final UnaryOperator<D> copyFunc;
+ private final Function<S, UnaryOperator<D>> copyFuncFactory;
private Schema schema;
private S engineSchema;
private FileContent content;
@@ -129,11 +151,11 @@ public class ParquetFormatModel<D, S, R>
EncryptedOutputFile outputFile,
WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
VariantShreddingAnalyzer<D, S> variantAnalyzer,
- UnaryOperator<D> copyFunc) {
+ Function<S, UnaryOperator<D>> copyFuncFactory) {
this.internal = Parquet.write(outputFile);
this.writerFunction = writerFunction;
this.variantAnalyzer = variantAnalyzer;
- this.copyFunc = copyFunc;
+ this.copyFuncFactory = copyFuncFactory;
}
@Override
@@ -267,6 +289,10 @@ public class ParquetFormatModel<D, S, R>
* top-level fields.
*/
private FileAppender<D> buildShreddedAppender() {
+ Preconditions.checkState(copyFuncFactory != null, "copyFuncFactory must
not be null");
+ UnaryOperator<D> copyFunc = copyFuncFactory.apply(engineSchema);
+ Preconditions.checkState(copyFunc != null, "copyFunc must not return
null");
+
return new BufferedFileAppender<>(
bufferSize,
bufferedRows -> {
diff --git
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
index 36e254628a..547ada3e53 100644
---
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
+++
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
@@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
@@ -370,7 +372,7 @@ public class TestParquetDataWriter {
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
GenericParquetReaders.buildReader(icebergSchema, fileSchema),
testAnalyzer,
- record -> record);
+ (Function<Void, UnaryOperator<Record>>) unused -> record ->
record);
try (FileAppender<Record> appender =
model
@@ -401,7 +403,7 @@ public class TestParquetDataWriter {
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
GenericParquetReaders.buildReader(icebergSchema, fileSchema),
null,
- null);
+ (Function<Void, UnaryOperator<Record>>) null);
try (FileAppender<Record> appender =
model
@@ -471,7 +473,7 @@ public class TestParquetDataWriter {
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
GenericParquetReaders.buildReader(icebergSchema, fileSchema),
analyzer,
- record1 -> record1);
+ (Function<Void, UnaryOperator<Record>>) unused -> record1 ->
record1);
try (FileAppender<Record> appender =
model
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index 5b7862116a..15c96ff4cd 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.source;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
import org.apache.iceberg.avro.AvroFormatModel;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.orc.ORCFormatModel;
@@ -53,7 +55,7 @@ public class SparkFormatModels {
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
SparkParquetReaders.buildReader(icebergSchema, fileSchema,
idToConstant),
new SparkVariantShreddingAnalyzer(),
- InternalRow::copy));
+ (Function<StructType, UnaryOperator<InternalRow>>) unused ->
InternalRow::copy));
FormatModelRegistry.register(
ParquetFormatModel.create(
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
index 5b7862116a..15c96ff4cd 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.source;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
import org.apache.iceberg.avro.AvroFormatModel;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.orc.ORCFormatModel;
@@ -53,7 +55,7 @@ public class SparkFormatModels {
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
SparkParquetReaders.buildReader(icebergSchema, fileSchema,
idToConstant),
new SparkVariantShreddingAnalyzer(),
- InternalRow::copy));
+ (Function<StructType, UnaryOperator<InternalRow>>) unused ->
InternalRow::copy));
FormatModelRegistry.register(
ParquetFormatModel.create(