This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c7877ecb18 [variant] Introduce InferVariantShreddingSchema (#7017)
c7877ecb18 is described below
commit c7877ecb18064b5facae4d396cd0d7394f5c68d7
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jan 13 11:42:38 2026 +0800
[variant] Introduce InferVariantShreddingSchema (#7017)
---
.../data/variant/InferVariantShreddingSchema.java | 499 ++++++++++++++++
.../paimon/data/variant/PaimonShreddingUtils.java | 4 +-
.../variant/InferVariantShreddingSchemaTest.java | 650 +++++++++++++++++++++
.../apache/paimon/format/parquet/VariantUtils.java | 4 +-
4 files changed, 1153 insertions(+), 4 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/InferVariantShreddingSchema.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/InferVariantShreddingSchema.java
new file mode 100644
index 0000000000..94e268aed5
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/InferVariantShreddingSchema.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Infer a schema when there are Variant values in the shredding schema. Only VariantType values at
+ * the top level or nested in row fields are shredded. VariantType values nested in arrays or maps
+ * are not shredded.
+ */
+public class InferVariantShreddingSchema {
+
+ private final RowType schema;
+ private final List<List<Integer>> pathsToVariant;
+ private final int maxSchemaWidth;
+ private final int maxSchemaDepth;
+ private final double minFieldCardinalityRatio;
+
+ public InferVariantShreddingSchema(
+ RowType schema,
+ int maxSchemaWidth,
+ int maxSchemaDepth,
+ double minFieldCardinalityRatio) {
+ this.schema = schema;
+ this.pathsToVariant = getPathsToVariant(schema);
+ this.maxSchemaWidth = maxSchemaWidth;
+ this.maxSchemaDepth = maxSchemaDepth;
+ this.minFieldCardinalityRatio = minFieldCardinalityRatio;
+ }
+
+ /** Infer schema from a list of rows. */
+ public RowType inferSchema(List<InternalRow> rows) {
+ MaxFields maxFields = new MaxFields(maxSchemaWidth);
+ Map<List<Integer>, RowType> inferredSchemas = new HashMap<>();
+
+ for (List<Integer> path : pathsToVariant) {
+ int numNonNullValues = 0;
+ DataType simpleSchema = null;
+
+ for (InternalRow row : rows) {
+ Variant variant = getValueAtPath(schema, row, path);
+ if (variant != null) {
+ numNonNullValues++;
+ GenericVariant v = (GenericVariant) variant;
+ DataType schemaOfRow = schemaOf(v, maxSchemaDepth);
+ simpleSchema = mergeSchema(simpleSchema, schemaOfRow);
+ }
+ }
+
+            // Don't infer a schema for fields that appear in fewer than minFieldCardinalityRatio
+            // of the non-null rows
+            int minCardinality = (int) Math.ceil(numNonNullValues * minFieldCardinalityRatio);
+
+            DataType finalizedSchema = finalizeSimpleSchema(simpleSchema, minCardinality, maxFields);
+            RowType shreddingSchema = PaimonShreddingUtils.variantShreddingSchema(finalizedSchema);
+ inferredSchemas.put(path, shreddingSchema);
+ }
+
+ // Insert each inferred schema into the full schema
+ return updateSchema(schema, inferredSchemas, new ArrayList<>());
+ }
+
+ /**
+     * Create a list of paths to Variant values in the schema. Variant fields nested in arrays or
+     * maps are not included. For example, if the schema is {@code row<v: variant, row<a: int, b:
+     * int, c: variant>>}, the function will return [[0], [1, 2]].
+ */
+ private List<List<Integer>> getPathsToVariant(RowType schema) {
+ List<List<Integer>> result = new ArrayList<>();
+ List<DataField> fields = schema.getFields();
+
+ for (int idx = 0; idx < fields.size(); idx++) {
+ DataField field = fields.get(idx);
+ DataType dataType = field.type();
+
+ if (dataType instanceof VariantType) {
+ List<Integer> path = new ArrayList<>();
+ path.add(idx);
+ result.add(path);
+ } else if (dataType instanceof RowType) {
+ // Prepend this index to each downstream path
+                List<List<Integer>> innerPaths = getPathsToVariant((RowType) dataType);
+ for (List<Integer> path : innerPaths) {
+ List<Integer> fullPath = new ArrayList<>();
+ fullPath.add(idx);
+ fullPath.addAll(path);
+ result.add(fullPath);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ /**
+     * Return the Variant at the given path in the schema, or null if the Variant value or any of
+     * its containing rows is null.
+ */
+    private Variant getValueAtPath(RowType schema, InternalRow row, List<Integer> path) {
+ return getValueAtPathHelper(schema, row, path, 0);
+ }
+
+ private Variant getValueAtPathHelper(
+            RowType schema, InternalRow row, List<Integer> path, int pathIndex) {
+ int idx = path.get(pathIndex);
+
+ if (row.isNullAt(idx)) {
+ return null;
+ } else if (pathIndex == path.size() - 1) {
+ // We've reached the Variant value
+ return row.getVariant(idx);
+ } else {
+ // The field must be a row
+            RowType childRowType = (RowType) schema.getFields().get(idx).type();
+            InternalRow childRow = row.getRow(idx, childRowType.getFieldCount());
+            return getValueAtPathHelper(childRowType, childRow, path, pathIndex + 1);
+ }
+ }
+
+ /**
+     * Return an appropriate schema for shredding a Variant value. It is similar to the
+     * SchemaOfVariant expression, but the rules are somewhat different, because we want the types
+     * to be consistent with what will be allowed during shredding. E.g. SchemaOfVariant will
+     * consider the common type across Integer and Double to be double, but we consider it to be
+     * VariantType, since shredding will not allow those types to be written to the same
+     * typed_value. We also maintain metadata on row fields to track how frequently they occur.
+     * Rare fields are dropped in the final schema.
+ */
+ private DataType schemaOf(GenericVariant v, int maxDepth) {
+ GenericVariantUtil.Type type = v.getType();
+
+ switch (type) {
+ case OBJECT:
+ if (maxDepth <= 0) {
+ return DataTypes.VARIANT();
+ }
+
+ int size = v.objectSize();
+ List<DataField> fields = new ArrayList<>(size);
+
+ for (int i = 0; i < size; i++) {
+ GenericVariant.ObjectField field = v.getFieldAtIndex(i);
+ DataType fieldType = schemaOf(field.value, maxDepth - 1);
+                    // Store count in description temporarily (will be used in mergeRowTypes)
+                    DataField dataField = new DataField(i, field.key, fieldType, "1");
+ fields.add(dataField);
+ }
+
+                // According to the variant spec, object fields must be sorted alphabetically
+ for (int i = 1; i < size; i++) {
+                    if (fields.get(i - 1).name().compareTo(fields.get(i).name()) >= 0) {
+                        throw new RuntimeException(
+                                "Variant object fields must be sorted alphabetically");
+ }
+ }
+
+ return new RowType(fields);
+
+ case ARRAY:
+ if (maxDepth <= 0) {
+ return DataTypes.VARIANT();
+ }
+
+ DataType elementType = null;
+ for (int i = 0; i < v.arraySize(); i++) {
+                    elementType =
+                            mergeSchema(
+                                    elementType, schemaOf(v.getElementAtIndex(i), maxDepth - 1));
+ }
+                return new ArrayType(elementType == null ? DataTypes.VARIANT() : elementType);
+
+ case NULL:
+ return null;
+
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+
+ case LONG:
+ // Compute the smallest decimal that can contain this value
+ BigDecimal d = BigDecimal.valueOf(v.getLong());
+ int precision = d.precision();
+ if (precision <= 18) {
+ return DataTypes.DECIMAL(precision, 0);
+ } else {
+ return DataTypes.BIGINT();
+ }
+
+ case STRING:
+ return DataTypes.STRING();
+
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+
+ case DECIMAL:
+ BigDecimal dec = v.getDecimal();
+ int decPrecision = dec.precision();
+ int decScale = dec.scale();
+ // Ensure precision is at least scale + 1 to be valid
+ if (decPrecision < decScale) {
+ decPrecision = decScale;
+ }
+ // Ensure precision is at least 1
+ if (decPrecision == 0) {
+ decPrecision = 1;
+ }
+ return DataTypes.DECIMAL(decPrecision, decScale);
+
+ case DATE:
+ return DataTypes.DATE();
+
+ case TIMESTAMP:
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+
+ case TIMESTAMP_NTZ:
+ return DataTypes.TIMESTAMP();
+
+ case FLOAT:
+ return DataTypes.FLOAT();
+
+ case BINARY:
+ return DataTypes.BYTES();
+
+ default:
+ return DataTypes.VARIANT();
+ }
+ }
+
+ private long getFieldCount(DataField field) {
+ // Read count from description field
+ String desc = field.description();
+ if (desc == null || desc.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Field '%s' is missing count in description. This
should not happen during schema inference.",
+ field.name()));
+ }
+ return Long.parseLong(desc);
+ }
+
+ /** Merge two decimals with possibly different scales. */
+ private DataType mergeDecimal(DecimalType d1, DecimalType d2) {
+ int scale = Math.max(d1.getScale(), d2.getScale());
+        int range = Math.max(d1.getPrecision() - d1.getScale(), d2.getPrecision() - d2.getScale());
+
+ if (range + scale > DecimalType.MAX_PRECISION) {
+ // DecimalType can't support precision > 38
+ return DataTypes.VARIANT();
+ } else {
+ return DataTypes.DECIMAL(range + scale, scale);
+ }
+ }
+
+ private DataType mergeDecimalWithLong(DecimalType d) {
+ if (d.getScale() == 0 && d.getPrecision() <= 18) {
+ // It's an integer-like Decimal
+ return DataTypes.BIGINT();
+ } else {
+ // Long can always fit in a Decimal(19, 0)
+ return mergeDecimal(d, DataTypes.DECIMAL(19, 0));
+ }
+ }
+
+ private DataType mergeSchema(DataType dt1, DataType dt2) {
+ // Allow null to appear in any typed schema
+ if (dt1 == null) {
+ return dt2;
+ } else if (dt2 == null) {
+ return dt1;
+ } else if (dt1 instanceof DecimalType && dt2 instanceof DecimalType) {
+ return mergeDecimal((DecimalType) dt1, (DecimalType) dt2);
+        } else if (dt1 instanceof DecimalType
+                && dt2.getTypeRoot() == org.apache.paimon.types.DataTypeRoot.BIGINT) {
+            return mergeDecimalWithLong((DecimalType) dt1);
+        } else if (dt1.getTypeRoot() == org.apache.paimon.types.DataTypeRoot.BIGINT
+                && dt2 instanceof DecimalType) {
+            return mergeDecimalWithLong((DecimalType) dt2);
+ } else if (dt1 instanceof RowType && dt2 instanceof RowType) {
+ return mergeRowTypes((RowType) dt1, (RowType) dt2);
+ } else if (dt1 instanceof ArrayType && dt2 instanceof ArrayType) {
+ ArrayType a1 = (ArrayType) dt1;
+ ArrayType a2 = (ArrayType) dt2;
+            return new ArrayType(mergeSchema(a1.getElementType(), a2.getElementType()));
+ } else if (dt1.equals(dt2)) {
+ return dt1;
+ } else {
+ return DataTypes.VARIANT();
+ }
+ }
+
+ private DataType mergeRowTypes(RowType s1, RowType s2) {
+ List<DataField> fields1 = s1.getFields();
+ List<DataField> fields2 = s2.getFields();
+ List<DataField> newFields = new ArrayList<>();
+
+ int f1Idx = 0;
+ int f2Idx = 0;
+ int maxRowFieldSize = 1000;
+ int nextFieldId = 0;
+
+ while (f1Idx < fields1.size()
+ && f2Idx < fields2.size()
+ && newFields.size() < maxRowFieldSize) {
+ DataField field1 = fields1.get(f1Idx);
+ DataField field2 = fields2.get(f2Idx);
+ String f1Name = field1.name();
+ String f2Name = field2.name();
+ int comp = f1Name.compareTo(f2Name);
+
+ if (comp == 0) {
+ DataType dataType = mergeSchema(field1.type(), field2.type());
+ long c1 = getFieldCount(field1);
+ long c2 = getFieldCount(field2);
+ // Store count in description
+                DataField newField =
+                        new DataField(nextFieldId++, f1Name, dataType, String.valueOf(c1 + c2));
+ newFields.add(newField);
+ f1Idx++;
+ f2Idx++;
+ } else if (comp < 0) {
+ long count = getFieldCount(field1);
+                DataField newField =
+                        new DataField(
+                                nextFieldId++, field1.name(), field1.type(), String.valueOf(count));
+ newFields.add(newField);
+ f1Idx++;
+ } else {
+ long count = getFieldCount(field2);
+                DataField newField =
+                        new DataField(
+                                nextFieldId++, field2.name(), field2.type(), String.valueOf(count));
+ newFields.add(newField);
+ f2Idx++;
+ }
+ }
+
+ while (f1Idx < fields1.size() && newFields.size() < maxRowFieldSize) {
+ DataField field1 = fields1.get(f1Idx);
+ long count = getFieldCount(field1);
+            DataField newField =
+                    new DataField(
+                            nextFieldId++, field1.name(), field1.type(), String.valueOf(count));
+ newFields.add(newField);
+ f1Idx++;
+ }
+
+ while (f2Idx < fields2.size() && newFields.size() < maxRowFieldSize) {
+ DataField field2 = fields2.get(f2Idx);
+ long count = getFieldCount(field2);
+            DataField newField =
+                    new DataField(
+                            nextFieldId++, field2.name(), field2.type(), String.valueOf(count));
+ newFields.add(newField);
+ f2Idx++;
+ }
+
+ return new RowType(newFields);
+ }
+
+    /** Return a new schema, with each VariantType replaced with its inferred shredding schema. */
+ private RowType updateSchema(
+            RowType schema, Map<List<Integer>, RowType> inferredSchemas, List<Integer> path) {
+
+ List<DataField> fields = schema.getFields();
+ List<DataField> newFields = new ArrayList<>(fields.size());
+
+ for (int idx = 0; idx < fields.size(); idx++) {
+ DataField field = fields.get(idx);
+ DataType dataType = field.type();
+
+ if (dataType instanceof VariantType) {
+ List<Integer> fullPath = new ArrayList<>(path);
+ fullPath.add(idx);
+ if (!inferredSchemas.containsKey(fullPath)) {
+ throw new IllegalStateException(
+ String.format(
+ "No inferred schema found for Variant
field '%s' at path %s",
+ field.name(), fullPath));
+ }
+ newFields.add(field.newType(inferredSchemas.get(fullPath)));
+ } else if (dataType instanceof RowType) {
+ List<Integer> fullPath = new ArrayList<>(path);
+ fullPath.add(idx);
+                RowType newType = updateSchema((RowType) dataType, inferredSchemas, fullPath);
+ newFields.add(field.newType(newType));
+ } else {
+ newFields.add(field);
+ }
+ }
+
+ return new RowType(newFields);
+ }
+
+    /** Container for a mutable integer to track the total number of shredded fields. */
+ private static class MaxFields {
+ int remaining;
+
+ MaxFields(int remaining) {
+ this.remaining = remaining;
+ }
+ }
+
+ /**
+     * Given the schema of a Variant type, finalize the schema. Specifically: 1) Widen integer
+     * types to LongType 2) Replace empty rows with VariantType 3) Limit the total number of
+     * shredded fields in the schema.
+     */
+    private DataType finalizeSimpleSchema(DataType dt, int minCardinality, MaxFields maxFields) {
+
+ // Every field uses a value column
+ maxFields.remaining--;
+ if (maxFields.remaining <= 0) {
+ return DataTypes.VARIANT();
+ }
+
+ // Handle null type first
+ if (dt == null || dt instanceof VariantType) {
+ return DataTypes.VARIANT();
+ }
+
+ if (dt instanceof RowType) {
+ RowType rowType = (RowType) dt;
+ List<DataField> newFields = new ArrayList<>();
+ int fieldId = 0;
+
+ for (DataField field : rowType.getFields()) {
+                if (getFieldCount(field) >= minCardinality && maxFields.remaining > 0) {
+                    DataType newType =
+                            finalizeSimpleSchema(field.type(), minCardinality, maxFields);
+ // Clear description after finalizing
+                    newFields.add(new DataField(fieldId++, field.name(), newType, null));
+ }
+ }
+
+ if (!newFields.isEmpty()) {
+ return new RowType(newFields);
+ } else {
+ return DataTypes.VARIANT();
+ }
+ } else if (dt instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) dt;
+            DataType newElementType =
+                    finalizeSimpleSchema(arrayType.getElementType(), minCardinality, maxFields);
+ return new ArrayType(newElementType);
+        } else if (dt.getTypeRoot() == org.apache.paimon.types.DataTypeRoot.TINYINT
+                || dt.getTypeRoot() == org.apache.paimon.types.DataTypeRoot.SMALLINT
+                || dt.getTypeRoot() == org.apache.paimon.types.DataTypeRoot.INTEGER
+                || dt.getTypeRoot() == org.apache.paimon.types.DataTypeRoot.BIGINT) {
+ maxFields.remaining--;
+ return DataTypes.BIGINT();
+ } else if (dt instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) dt;
+            if (decimalType.getPrecision() <= 18 && decimalType.getScale() == 0) {
+ maxFields.remaining--;
+ return DataTypes.BIGINT();
+ } else {
+ maxFields.remaining--;
+ if (decimalType.getPrecision() <= 18) {
+ return DataTypes.DECIMAL(18, decimalType.getScale());
+ } else {
+                    return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, decimalType.getScale());
+ }
+ }
+ } else {
+ // All other scalar types use typed_value
+ maxFields.remaining--;
+ return dt;
+ }
+ }
+}
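
For orientation, a minimal usage sketch of the new class (illustrative only; it mirrors the
tests added in this commit, and the sample schema and JSON values are not part of the change):

    // Infer a shredding schema for a single variant column "v".
    RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
    List<InternalRow> rows =
            Arrays.asList(
                    GenericRow.of(GenericVariant.fromJson("{\"id\": 1, \"name\": \"a\"}")),
                    GenericRow.of(GenericVariant.fromJson("{\"id\": 2, \"name\": \"b\"}")));
    // The tests use maxSchemaWidth = 300, maxSchemaDepth = 50, minFieldCardinalityRatio = 0.1.
    InferVariantShreddingSchema inferrer = new InferVariantShreddingSchema(schema, 300, 50, 0.1);
    // "v" is replaced by its shredding schema: row<metadata, value, typed_value row<id, name>>.
    RowType shredded = inferrer.inferSchema(rows);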
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index 189ac8e83b..cca7b5f747 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -209,8 +209,8 @@ public class PaimonShreddingUtils {
}
}
- public static RowType variantShreddingSchema(RowType rowType) {
- return variantShreddingSchema(rowType, true, false);
+ public static RowType variantShreddingSchema(DataType dataType) {
+ return variantShreddingSchema(dataType, true, false);
}
/**
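
A note on the widened signature above: variantShreddingSchema now accepts any DataType, since the
inferred type for a variant column is not always a RowType; for example, the tests below build the
no-typed-schema case directly (illustrative):

    // When nothing can be shredded, the inferred type is plain VARIANT:
    RowType shredded = PaimonShreddingUtils.variantShreddingSchema(DataTypes.VARIANT());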
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/variant/InferVariantShreddingSchemaTest.java b/paimon-common/src/test/java/org/apache/paimon/data/variant/InferVariantShreddingSchemaTest.java
new file mode 100644
index 0000000000..68787f8a45
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/variant/InferVariantShreddingSchemaTest.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.variantShreddingSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link InferVariantShreddingSchema}. */
+public class InferVariantShreddingSchemaTest {
+
+    public static InferVariantShreddingSchema defaultInferVariantShreddingSchema(RowType schema) {
+ return new InferVariantShreddingSchema(schema, 300, 50, 0.1);
+ }
+
+ @Test
+ void testInferSchemaWithSimpleObject() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ // Create test data: {"name": "Alice", "age": 30}
+        GenericVariant variant1 = GenericVariant.fromJson("{\"name\": \"Alice\", \"age\": 30}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"name\": \"Bob\", \"age\": 25}");
+        GenericVariant variant3 = GenericVariant.fromJson("{\"name\": \"Charlie\", \"age\": 35}");
+
+ List<InternalRow> rows = new ArrayList<>();
+ rows.add(GenericRow.of(variant1));
+ rows.add(GenericRow.of(variant2));
+ rows.add(GenericRow.of(variant3));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // Verify schema structure
+ RowType expectShreddedType =
+ RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+ new String[] {"age", "name"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectShreddedType));
+ }
+
+ @Test
+ void testInferSchemaWithNestedStruct() {
+ // Schema: row<data: row<v: variant>>
+        RowType innerSchema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+        RowType schema = RowType.of(new DataType[] {innerSchema}, new String[] {"data"});
+
+ // Create test data
+        GenericVariant variant1 = GenericVariant.fromJson("{\"x\": 1, \"y\": 2}");
+ GenericRow innerRow1 = GenericRow.of(variant1);
+
+        GenericVariant variant2 = GenericVariant.fromJson("{\"x\": 3, \"y\": 4}");
+ GenericRow innerRow2 = GenericRow.of(variant2);
+
+ List<InternalRow> rows = new ArrayList<>();
+ rows.add(GenericRow.of(innerRow1));
+ rows.add(GenericRow.of(innerRow2));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // Nested variant in row<data: row<v: variant>>
+ // Expected: inferred schema for inner variant field with x and y
+ RowType expectedInnerType =
+ RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.BIGINT()},
+ new String[] {"x", "y"});
+ RowType expectedDataType =
+ RowType.of(
+                        new DataType[] {variantShreddingSchema(expectedInnerType)},
+ new String[] {"v"});
+        assertThat(inferredSchema.getField("data").type()).isEqualTo(expectedDataType);
+ }
+
+ @Test
+ void testInferSchemaWithArray() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ // Create test data with arrays
+        GenericVariant variant1 = GenericVariant.fromJson("{\"numbers\": [1, 2, 3]}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"numbers\": [4, 5, 6]}");
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant1), GenericRow.of(variant2));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // Array field [1,2,3] should be inferred as array<bigint>
+ RowType expectedType =
+ RowType.of(
+ new DataType[] {DataTypes.ARRAY(DataTypes.BIGINT())},
+ new String[] {"numbers"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType));
+ }
+
+ @Test
+ void testInferSchemaWithMixedTypes() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ // Create test data with mixed types
+ GenericVariant variant1 =
+ GenericVariant.fromJson(
+ "{\"str\": \"hello\", \"num\": 42, \"bool\": true,
\"dec\": 3.14}");
+ GenericVariant variant2 =
+ GenericVariant.fromJson(
+ "{\"str\": \"world\", \"num\": 100, \"bool\": false,
\"dec\": 2.71}");
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant1), GenericRow.of(variant2));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Mixed types: string, bigint, boolean, decimal (3.14 and 2.71 become DECIMAL(18, 2))
+ RowType expectedType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.BOOLEAN(),
+ DataTypes.DECIMAL(18, 2),
+ DataTypes.BIGINT(),
+ DataTypes.STRING()
+ },
+ new String[] {"bool", "dec", "num", "str"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType));
+ }
+
+ @Test
+ void testInferSchemaWithNullValues() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ // Create test data with null
+        GenericVariant variant1 = GenericVariant.fromJson("{\"a\": 1, \"b\": null}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"a\": 2, \"b\": 3}");
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant1), GenericRow.of(variant2));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // Field "a" appears in all rows, "b" has null in one row
+        // With NULL case handling, b field can now be inferred as BIGINT (null is handled properly)
+ RowType expectedType =
+ RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.BIGINT()},
+ new String[] {"a", "b"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType));
+ }
+
+ @Test
+ void testInferSchemaWithEmptyRows() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // Empty rows should result in variant type without typed schema
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(DataTypes.VARIANT()));
+ }
+
+ @Test
+ void testInferSchemaWithDeepNesting() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ // Create deeply nested JSON
+        String deepJson = "{\"level1\": {\"level2\": {\"level3\": {\"value\": 42}}}}";
+ GenericVariant variant = GenericVariant.fromJson(deepJson);
+
+ List<InternalRow> rows = Arrays.asList(GenericRow.of(variant));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // Deep nested structure: level1 -> level2 -> level3 -> value
+        RowType level3Type =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {"value"});
+        RowType level2Type = RowType.of(new DataType[] {level3Type}, new String[] {"level3"});
+        RowType level1Type = RowType.of(new DataType[] {level2Type}, new String[] {"level2"});
+        RowType expectedType = RowType.of(new DataType[] {level1Type}, new String[] {"level1"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType));
+ }
+
+ @Test
+ void testInferSchemaWithMinFieldCardinalityRatio() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+
+ // Add 10 rows, where field "rare" appears in 2/10 rows (20%)
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"common\": 1, \"rare\": 99}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"common\": 2, \"rare\": 88}")));
+ for (int i = 0; i < 8; i++) {
+            rows.add(GenericRow.of(GenericVariant.fromJson("{\"common\": " + i + "}")));
+ }
+
+        // Test with threshold (0.1 = 10%): rare field should be included (2/10 = 20% >= 10%)
+ InferVariantShreddingSchema inferrer1 =
+ new InferVariantShreddingSchema(schema, 300, 50, 0.1);
+ RowType inferredSchema1 = inferrer1.inferSchema(rows);
+ RowType expectedType1 =
+ RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.BIGINT()},
+ new String[] {"common", "rare"});
+ assertThat(inferredSchema1.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType1));
+
+        // Test with higher threshold (0.25 = 25%): rare field should be excluded (2/10 = 20% < 25%)
+ InferVariantShreddingSchema inferrer2 =
+ new InferVariantShreddingSchema(schema, 300, 50, 0.25);
+ RowType inferredSchema2 = inferrer2.inferSchema(rows);
+ RowType expectedType2 =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {"common"});
+ assertThat(inferredSchema2.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType2));
+ }
+
+ @Test
+ void testInferSchemaWithMaxDepthLimit() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ // Create a 3-level nested JSON
+        String json = "{\"level1\": {\"level2\": {\"level3\": {\"value\": 42}}}}";
+ GenericVariant variant = GenericVariant.fromJson(json);
+ List<InternalRow> rows = Arrays.asList(GenericRow.of(variant));
+
+ // Test with max depth = 50: should infer full structure
+ InferVariantShreddingSchema inferrer1 =
+ new InferVariantShreddingSchema(schema, 300, 50, 0.1);
+ RowType inferredSchema1 = inferrer1.inferSchema(rows);
+
+        // Should have full nested structure: level1 -> level2 -> level3 -> value
+        RowType level3Type =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {"value"});
+        RowType level2Type = RowType.of(new DataType[] {level3Type}, new String[] {"level3"});
+        RowType level1Type = RowType.of(new DataType[] {level2Type}, new String[] {"level2"});
+        RowType expectedType1 = RowType.of(new DataType[] {level1Type}, new String[] {"level1"});
+ assertThat(inferredSchema1.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType1));
+
+        // Test with max depth = 1: level1 is inferred, but level2 and deeper become variant
+ InferVariantShreddingSchema inferrer2 =
+ new InferVariantShreddingSchema(schema, 300, 1, 0.1);
+ RowType inferredSchema2 = inferrer2.inferSchema(rows);
+
+ // Result: level1 field exists but is variant
+ RowType expectedType2 =
+                RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"level1"});
+ assertThat(inferredSchema2.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType2));
+ }
+
+ @Test
+ void testInferSchemaWithMaxWidthLimit() {
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ // Create JSON with 5 fields
+        String json = "{\"field1\": 1, \"field2\": 2, \"field3\": 3, \"field4\": 4, \"field5\": 5}";
+ GenericVariant variant = GenericVariant.fromJson(json);
+ List<InternalRow> rows = Arrays.asList(GenericRow.of(variant));
+
+ // Test with maxSchemaWidth = 11: all 5 fields should be inferred
+        // Each field consumes 2 counts (one for field itself, one for BIGINT type)
+        // So 5 fields = 10 counts, plus 1 for the root = 11 total
+ InferVariantShreddingSchema inferrer1 =
+ new InferVariantShreddingSchema(schema, 11, 50, 0.1);
+ RowType inferredSchema1 = inferrer1.inferSchema(rows);
+
+ // Should have all 5 fields
+ RowType expectedType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"field1", "field2", "field3", "field4",
"field5"});
+ assertThat(inferredSchema1.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType1));
+
+ // Test with maxSchemaWidth = 9: only first 4 fields are inferred
+ // 4 fields = 8 counts, plus 1 for root = 9 total
+        InferVariantShreddingSchema inferrer2 = new InferVariantShreddingSchema(schema, 9, 50, 0.1);
+ RowType inferredSchema2 = inferrer2.inferSchema(rows);
+
+ // Should have only 4 fields (field1-4), field5 is dropped
+ RowType expectedType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"field1", "field2", "field3", "field4"});
+ assertThat(inferredSchema2.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType2));
+ }
+
+ @Test
+ void testInferSchemaWithAllPrimitiveTypes() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ String json =
+ "{"
+ + "\"string\": \"test\", "
+ + "\"long\": 123456789, "
+ + "\"double\": 3.14159, "
+ + "\"boolean\": true, "
+ + "\"null\": null"
+ + "}";
+
+ GenericVariant variant = GenericVariant.fromJson(json);
+ List<InternalRow> rows = Arrays.asList(GenericRow.of(variant));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // All primitive types: boolean, decimal (3.14159 becomes DECIMAL), bigint,
+        // variant (null), string
+ RowType expectedType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.BOOLEAN(),
+ DataTypes.DECIMAL(18, 5),
+ DataTypes.BIGINT(),
+ DataTypes.VARIANT(),
+ DataTypes.STRING()
+ },
+ new String[] {"boolean", "double", "long", "null",
"string"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType));
+ }
+
+ @Test
+ void testInferSchemaWithConflictingTypes() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ // Field "value" has different types in different rows
+ GenericVariant variant1 = GenericVariant.fromJson("{\"value\": 123}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"value\": \"string\"}");
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant1), GenericRow.of(variant2));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // Conflicting types (int vs string) should fall back to variant type
+ RowType expectedType =
+                RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"value"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType));
+ }
+
+ @Test
+ void testMultipleVariantFields() {
+ // Schema: row<v1: variant, v2: variant, id: int>
+ RowType schema =
+ RowType.of(
+                        new DataType[] {DataTypes.VARIANT(), DataTypes.VARIANT(), DataTypes.INT()},
+ new String[] {"v1", "v2", "id"});
+
+        GenericVariant variant1 = GenericVariant.fromJson("{\"name\": \"Alice\"}");
+ GenericVariant variant2 = GenericVariant.fromJson("{\"age\": 30}");
+
+ List<InternalRow> rows =
+ Arrays.asList(
+                        GenericRow.of(variant1, variant2, 1), GenericRow.of(variant1, variant2, 2));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Multiple variant fields: v1 with {name: string}, v2 with {age: bigint}, id: int
+ RowType expectedV1Type =
+                RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {"name"});
+ RowType expectedV2Type =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {"age"});
+ assertThat(inferredSchema.getFieldCount()).isEqualTo(3);
+ assertThat(inferredSchema.getField("v1").type())
+ .isEqualTo(variantShreddingSchema(expectedV1Type));
+ assertThat(inferredSchema.getField("v2").type())
+ .isEqualTo(variantShreddingSchema(expectedV2Type));
+        assertThat(inferredSchema.getField("id").type()).isEqualTo(DataTypes.INT());
+ }
+
+ @Test
+ void testLargeDatasetWithManyFields() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+ // Generate 500 rows, each with 50 fields
+ for (int i = 0; i < 500; i++) {
+ StringBuilder jsonBuilder = new StringBuilder("{");
+ for (int j = 0; j < 50; j++) {
+ if (j > 0) {
+ jsonBuilder.append(", ");
+ }
+                jsonBuilder.append(String.format("\"field%d\": %d", j, (i * 50 + j) % 1000));
+ }
+ jsonBuilder.append("}");
+            rows.add(GenericRow.of(GenericVariant.fromJson(jsonBuilder.toString())));
+ }
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // All 50 fields should be inferred as BIGINT
+ DataType variantFieldType = inferredSchema.getField("v").type();
+ assertThat(variantFieldType).isInstanceOf(RowType.class);
+ RowType shreddedSchema = (RowType) variantFieldType;
+
+ // Should have metadata, value, and typed_value fields
+ DataField typedValueField = shreddedSchema.getField("typed_value");
+ assertThat(typedValueField).isNotNull();
+ assertThat(typedValueField.type()).isInstanceOf(RowType.class);
+
+ RowType typedValue = (RowType) typedValueField.type();
+ // Should infer all 50 fields (field0 to field49) as BIGINT
+ assertThat(typedValue.getFieldCount()).isEqualTo(50);
+
+        // Verify the schema structure (not exact equality due to field ordering)
+        assertThat(inferredSchema.getField("v").type()).isInstanceOf(RowType.class);
+        RowType actualShreddedSchema = (RowType) inferredSchema.getField("v").type();
+ assertThat(actualShreddedSchema.getFieldCount())
+ .isEqualTo(3); // metadata, value, typed_value
+
+ // Verify all 50 fields are present with correct types
+        RowType actualTypedValue = (RowType) actualShreddedSchema.getField("typed_value").type();
+ for (int i = 0; i < 50; i++) {
+ String fieldName = "field" + i;
+ DataField field = actualTypedValue.getField(fieldName);
+            assertThat(field).as("Field %s should exist", fieldName).isNotNull();
+            // Each field is shredded: ROW<value BYTES, typed_value BIGINT> NOT NULL
+ assertThat(field.type()).isInstanceOf(RowType.class);
+ RowType shreddedFieldType = (RowType) field.type();
+ assertThat(shreddedFieldType.getField("typed_value").type())
+ .isEqualTo(DataTypes.BIGINT());
+ }
+ }
+
+ @Test
+ void testNullRecords() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+ // Add rows where the entire variant value is null
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"a\": 1, \"b\": 2}")));
+        rows.add(GenericRow.of((GenericVariant) null)); // Entire record is null
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"a\": 3, \"b\": 4}")));
+        rows.add(GenericRow.of((GenericVariant) null)); // Entire record is null
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"a\": 5, \"b\": 6}")));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ // Verify the inferred schema structure
+ assertThat(inferredSchema).isNotNull();
+ assertThat(inferredSchema.getFieldCount()).isEqualTo(1);
+
+ // The variant field should contain shredded schema with fields a and b
+ DataType variantFieldType = inferredSchema.getFields().get(0).type();
+ assertThat(variantFieldType).isInstanceOf(RowType.class);
+
+ RowType shreddedSchema = (RowType) variantFieldType;
+ // Should have fields for metadata, typed_value, and value
+ assertThat(shreddedSchema.getFieldCount()).isGreaterThanOrEqualTo(3);
+ }
+
+ @Test
+ void testAllNullRecords() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+ // All records are null
+ for (int i = 0; i < 100; i++) {
+ rows.add(GenericRow.of((GenericVariant) null));
+ }
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // When all records are null, should result in variant type without typed schema
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(DataTypes.VARIANT()));
+ }
+
+ @Test
+ void testMixedNullAndValidRecords() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+ // Mix of null records and valid records
+ for (int i = 0; i < 200; i++) {
+ if (i % 3 == 0) {
+ rows.add(GenericRow.of((GenericVariant) null));
+ } else {
+ rows.add(
+ GenericRow.of(
+ GenericVariant.fromJson(
+ String.format(
+ "{\"id\": %d, \"value\":
\"data%d\"}", i, i))));
+ }
+ }
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ RowType expectedTypedValue =
+ RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+ new String[] {"id", "value"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedTypedValue));
+ }
+
+ @Test
+ void testMixedArrayTypes() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+ // Arrays with different element types
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [1, 2, 3]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [\"a\", \"b\", \"c\"]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [true, false, true]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [1, \"mixed\", true]}")));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ RowType expectedType =
+ RowType.of(
+ new DataType[] {DataTypes.ARRAY(DataTypes.VARIANT())},
+ new String[] {"arr"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType));
+ }
+
+ @Test
+ void testIntegerTypeMixing() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+ // Mix of small and large integers - should merge to BIGINT
+ for (int i = 0; i < 100; i++) {
+ long value = (i % 2 == 0) ? i : (i * 1000000000L);
+            rows.add(GenericRow.of(GenericVariant.fromJson(String.format("{\"num\": %d}", value))));
+ }
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ RowType expectedTypedValue =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {"num"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedTypedValue));
+ }
+
+ @Test
+ void testDoubleAndIntegerMixing() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+        // Mix integers and decimal literals - 100 and 3.14 merge to a common DECIMAL type
+ rows.add(GenericRow.of(GenericVariant.fromJson("{\"value\": 100}")));
+ rows.add(GenericRow.of(GenericVariant.fromJson("{\"value\": 100}")));
+ rows.add(GenericRow.of(GenericVariant.fromJson("{\"value\": 100}")));
+ rows.add(GenericRow.of(GenericVariant.fromJson("{\"value\": 3.14}")));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+        RowType expectedTypedValue =
+                RowType.of(new DataType[] {DataTypes.DECIMAL(18, 2)}, new String[] {"value"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedTypedValue));
+ }
+
+ @Test
+ void testNullInNestedArrays() {
+ // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] {"v"});
+
+ List<InternalRow> rows = new ArrayList<>();
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [1, 2, 3, null, 5]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [null, null, null]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [10, null, 20, null, 30]}")));
+ rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": null}")));
+
+        InferVariantShreddingSchema inferrer = defaultInferVariantShreddingSchema(schema);
+ RowType inferredSchema = inferrer.inferSchema(rows);
+
+ RowType expectedType =
+ RowType.of(
+ new DataType[] {DataTypes.ARRAY(DataTypes.VARIANT())},
+ new String[] {"arr"});
+ assertThat(inferredSchema.getField("v").type())
+ .isEqualTo(variantShreddingSchema(expectedType));
+ }
+}
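
A back-of-envelope for the width accounting exercised in testInferSchemaWithMaxWidthLimit above
(illustrative, following the comments in that test):

    // maxSchemaWidth budget: the root value column costs 1; each scalar field
    // costs 2 (one for the field itself, one for its BIGINT typed leaf).
    // 5 fields: 1 + 5 * 2 = 11 -> maxSchemaWidth = 11 keeps all five fields.
    // 4 fields: 1 + 4 * 2 = 9  -> maxSchemaWidth = 9 drops field5.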
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index 2b2382304e..86267052e1 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -88,7 +88,7 @@ public class VariantUtils {
RowType shreddingFields = shreddingFields(conf);
if (shreddingFields != null && shreddingFields.containsField(fieldName)) {
return PaimonShreddingUtils.variantShreddingSchema(
- (RowType) shreddingFields.getField(fieldName).type());
+ shreddingFields.getField(fieldName).type());
} else {
return null;
}
@@ -106,7 +106,7 @@ public class VariantUtils {
&& shreddingFields.containsField(field.name())) {
RowType shreddingSchema =
PaimonShreddingUtils.variantShreddingSchema(
-                    (RowType) shreddingFields.getField(field.name()).type());
+ shreddingFields.getField(field.name()).type());
newFields.add(field.newType(shreddingSchema));
} else {
newFields.add(field);
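
A worked example of the decimal-merge rule in InferVariantShreddingSchema#mergeDecimal above
(scale is the max of the two scales, integer range is the max of precision minus scale; illustrative):

    // mergeDecimal(DECIMAL(4, 2), DECIMAL(5, 1)):
    //   scale = max(2, 1) = 2
    //   range = max(4 - 2, 5 - 1) = 4
    //   result = DECIMAL(range + scale, scale) = DECIMAL(6, 2)
    // If range + scale exceeded DecimalType.MAX_PRECISION (38), the merge
    // would fall back to VARIANT instead.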