This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c7877ecb18 [variant] Introduce InferVariantShreddingSchema (#7017)
c7877ecb18 is described below

commit c7877ecb18064b5facae4d396cd0d7394f5c68d7
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jan 13 11:42:38 2026 +0800

    [variant] Introduce InferVariantShreddingSchema (#7017)
---
 .../data/variant/InferVariantShreddingSchema.java  | 499 ++++++++++++++++
 .../paimon/data/variant/PaimonShreddingUtils.java  |   4 +-
 .../variant/InferVariantShreddingSchemaTest.java   | 650 +++++++++++++++++++++
 .../apache/paimon/format/parquet/VariantUtils.java |   4 +-
 4 files changed, 1153 insertions(+), 4 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/InferVariantShreddingSchema.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/InferVariantShreddingSchema.java
new file mode 100644
index 0000000000..94e268aed5
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/InferVariantShreddingSchema.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Infer a schema when there are Variant values in the shredding schema. Only 
VariantType values at
+ * the top level or nested in row fields are shredded. VariantType nested in 
arrays or maps are not
+ * shredded.
+ */
+public class InferVariantShreddingSchema {
+
+    private final RowType schema;
+    private final List<List<Integer>> pathsToVariant;
+    private final int maxSchemaWidth;
+    private final int maxSchemaDepth;
+    private final double minFieldCardinalityRatio;
+
+    public InferVariantShreddingSchema(
+            RowType schema,
+            int maxSchemaWidth,
+            int maxSchemaDepth,
+            double minFieldCardinalityRatio) {
+        this.schema = schema;
+        this.pathsToVariant = getPathsToVariant(schema);
+        this.maxSchemaWidth = maxSchemaWidth;
+        this.maxSchemaDepth = maxSchemaDepth;
+        this.minFieldCardinalityRatio = minFieldCardinalityRatio;
+    }
+
+    /** Infer schema from a list of rows. */
+    public RowType inferSchema(List<InternalRow> rows) {
+        MaxFields maxFields = new MaxFields(maxSchemaWidth);
+        Map<List<Integer>, RowType> inferredSchemas = new HashMap<>();
+
+        for (List<Integer> path : pathsToVariant) {
+            int numNonNullValues = 0;
+            DataType simpleSchema = null;
+
+            for (InternalRow row : rows) {
+                Variant variant = getValueAtPath(schema, row, path);
+                if (variant != null) {
+                    numNonNullValues++;
+                    GenericVariant v = (GenericVariant) variant;
+                    DataType schemaOfRow = schemaOf(v, maxSchemaDepth);
+                    simpleSchema = mergeSchema(simpleSchema, schemaOfRow);
+                }
+            }
+
+            // Don't infer a schema for fields that appear in less than 
minFieldCardinalityRatio
+            int minCardinality = (int) Math.ceil(numNonNullValues * 
minFieldCardinalityRatio);
+
+            DataType finalizedSchema =
+                    finalizeSimpleSchema(simpleSchema, minCardinality, 
maxFields);
+            RowType shreddingSchema = 
PaimonShreddingUtils.variantShreddingSchema(finalizedSchema);
+            inferredSchemas.put(path, shreddingSchema);
+        }
+
+        // Insert each inferred schema into the full schema
+        return updateSchema(schema, inferredSchemas, new ArrayList<>());
+    }
+
+    /**
+     * Create a list of paths to Variant values in the schema. Variant fields 
nested in arrays or
+     * maps are not included. For example, if the schema is {@code row<v: 
variant, row<a: int, b:
+     * int, c: variant>>} the function will return [[0], [1, 2]]
+     */
+    private List<List<Integer>> getPathsToVariant(RowType schema) {
+        List<List<Integer>> result = new ArrayList<>();
+        List<DataField> fields = schema.getFields();
+
+        for (int idx = 0; idx < fields.size(); idx++) {
+            DataField field = fields.get(idx);
+            DataType dataType = field.type();
+
+            if (dataType instanceof VariantType) {
+                List<Integer> path = new ArrayList<>();
+                path.add(idx);
+                result.add(path);
+            } else if (dataType instanceof RowType) {
+                // Prepend this index to each downstream path
+                List<List<Integer>> innerPaths = getPathsToVariant((RowType) 
dataType);
+                for (List<Integer> path : innerPaths) {
+                    List<Integer> fullPath = new ArrayList<>();
+                    fullPath.add(idx);
+                    fullPath.addAll(path);
+                    result.add(fullPath);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Return the Variant at the given path in the schema, or null if the 
Variant value or any of
+     * its containing rows is null.
+     */
+    private Variant getValueAtPath(RowType schema, InternalRow row, 
List<Integer> path) {
+        return getValueAtPathHelper(schema, row, path, 0);
+    }
+
+    private Variant getValueAtPathHelper(
+            RowType schema, InternalRow row, List<Integer> path, int 
pathIndex) {
+        int idx = path.get(pathIndex);
+
+        if (row.isNullAt(idx)) {
+            return null;
+        } else if (pathIndex == path.size() - 1) {
+            // We've reached the Variant value
+            return row.getVariant(idx);
+        } else {
+            // The field must be a row
+            RowType childRowType = (RowType) 
schema.getFields().get(idx).type();
+            InternalRow childRow = row.getRow(idx, 
childRowType.getFieldCount());
+            return getValueAtPathHelper(childRowType, childRow, path, 
pathIndex + 1);
+        }
+    }
+
+    /**
+     * Return an appropriate schema for shredding a Variant value. It is 
similar to the
+     * SchemaOfVariant expression, but the rules are somewhat different, 
because we want the types
+     * to be consistent with what will be allowed during shredding. E.g. 
SchemaOfVariant will
+     * consider the common type across Integer and Double to be double, but we 
consider it to be
+     * VariantType, since shredding will not allow those types to be written 
to the same
+     * typed_value. We also maintain metadata on row fields to track how 
frequently they occur. Rare
+     * fields are dropped in the final schema.
+     */
+    private DataType schemaOf(GenericVariant v, int maxDepth) {
+        GenericVariantUtil.Type type = v.getType();
+
+        switch (type) {
+            case OBJECT:
+                if (maxDepth <= 0) {
+                    return DataTypes.VARIANT();
+                }
+
+                int size = v.objectSize();
+                List<DataField> fields = new ArrayList<>(size);
+
+                for (int i = 0; i < size; i++) {
+                    GenericVariant.ObjectField field = v.getFieldAtIndex(i);
+                    DataType fieldType = schemaOf(field.value, maxDepth - 1);
+                    // Store count in description temporarily (will be used in 
mergeRowTypes)
+                    DataField dataField = new DataField(i, field.key, 
fieldType, "1");
+                    fields.add(dataField);
+                }
+
+                // According to the variant spec, object fields must be sorted 
alphabetically
+                for (int i = 1; i < size; i++) {
+                    if (fields.get(i - 
1).name().compareTo(fields.get(i).name()) >= 0) {
+                        throw new RuntimeException(
+                                "Variant object fields must be sorted 
alphabetically");
+                    }
+                }
+
+                return new RowType(fields);
+
+            case ARRAY:
+                if (maxDepth <= 0) {
+                    return DataTypes.VARIANT();
+                }
+
+                DataType elementType = null;
+                for (int i = 0; i < v.arraySize(); i++) {
+                    elementType =
+                            mergeSchema(
+                                    elementType, 
schemaOf(v.getElementAtIndex(i), maxDepth - 1));
+                }
+                return new ArrayType(elementType == null ? DataTypes.VARIANT() 
: elementType);
+
+            case NULL:
+                return null;
+
+            case BOOLEAN:
+                return DataTypes.BOOLEAN();
+
+            case LONG:
+                // Compute the smallest decimal that can contain this value
+                BigDecimal d = BigDecimal.valueOf(v.getLong());
+                int precision = d.precision();
+                if (precision <= 18) {
+                    return DataTypes.DECIMAL(precision, 0);
+                } else {
+                    return DataTypes.BIGINT();
+                }
+
+            case STRING:
+                return DataTypes.STRING();
+
+            case DOUBLE:
+                return DataTypes.DOUBLE();
+
+            case DECIMAL:
+                BigDecimal dec = v.getDecimal();
+                int decPrecision = dec.precision();
+                int decScale = dec.scale();
+                // Ensure precision is at least scale + 1 to be valid
+                if (decPrecision < decScale) {
+                    decPrecision = decScale;
+                }
+                // Ensure precision is at least 1
+                if (decPrecision == 0) {
+                    decPrecision = 1;
+                }
+                return DataTypes.DECIMAL(decPrecision, decScale);
+
+            case DATE:
+                return DataTypes.DATE();
+
+            case TIMESTAMP:
+                return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+
+            case TIMESTAMP_NTZ:
+                return DataTypes.TIMESTAMP();
+
+            case FLOAT:
+                return DataTypes.FLOAT();
+
+            case BINARY:
+                return DataTypes.BYTES();
+
+            default:
+                return DataTypes.VARIANT();
+        }
+    }
+
+    private long getFieldCount(DataField field) {
+        // Read count from description field
+        String desc = field.description();
+        if (desc == null || desc.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Field '%s' is missing count in description. This 
should not happen during schema inference.",
+                            field.name()));
+        }
+        return Long.parseLong(desc);
+    }
+
+    /** Merge two decimals with possibly different scales. */
+    private DataType mergeDecimal(DecimalType d1, DecimalType d2) {
+        int scale = Math.max(d1.getScale(), d2.getScale());
+        int range = Math.max(d1.getPrecision() - d1.getScale(), 
d2.getPrecision() - d2.getScale());
+
+        if (range + scale > DecimalType.MAX_PRECISION) {
+            // DecimalType can't support precision > 38
+            return DataTypes.VARIANT();
+        } else {
+            return DataTypes.DECIMAL(range + scale, scale);
+        }
+    }
+
+    private DataType mergeDecimalWithLong(DecimalType d) {
+        if (d.getScale() == 0 && d.getPrecision() <= 18) {
+            // It's an integer-like Decimal
+            return DataTypes.BIGINT();
+        } else {
+            // Long can always fit in a Decimal(19, 0)
+            return mergeDecimal(d, DataTypes.DECIMAL(19, 0));
+        }
+    }
+
+    private DataType mergeSchema(DataType dt1, DataType dt2) {
+        // Allow null to appear in any typed schema
+        if (dt1 == null) {
+            return dt2;
+        } else if (dt2 == null) {
+            return dt1;
+        } else if (dt1 instanceof DecimalType && dt2 instanceof DecimalType) {
+            return mergeDecimal((DecimalType) dt1, (DecimalType) dt2);
+        } else if (dt1 instanceof DecimalType
+                && dt2.getTypeRoot() == 
org.apache.paimon.types.DataTypeRoot.BIGINT) {
+            return mergeDecimalWithLong((DecimalType) dt1);
+        } else if (dt1.getTypeRoot() == 
org.apache.paimon.types.DataTypeRoot.BIGINT
+                && dt2 instanceof DecimalType) {
+            return mergeDecimalWithLong((DecimalType) dt2);
+        } else if (dt1 instanceof RowType && dt2 instanceof RowType) {
+            return mergeRowTypes((RowType) dt1, (RowType) dt2);
+        } else if (dt1 instanceof ArrayType && dt2 instanceof ArrayType) {
+            ArrayType a1 = (ArrayType) dt1;
+            ArrayType a2 = (ArrayType) dt2;
+            return new ArrayType(mergeSchema(a1.getElementType(), 
a2.getElementType()));
+        } else if (dt1.equals(dt2)) {
+            return dt1;
+        } else {
+            return DataTypes.VARIANT();
+        }
+    }
+
+    private DataType mergeRowTypes(RowType s1, RowType s2) {
+        List<DataField> fields1 = s1.getFields();
+        List<DataField> fields2 = s2.getFields();
+        List<DataField> newFields = new ArrayList<>();
+
+        int f1Idx = 0;
+        int f2Idx = 0;
+        int maxRowFieldSize = 1000;
+        int nextFieldId = 0;
+
+        while (f1Idx < fields1.size()
+                && f2Idx < fields2.size()
+                && newFields.size() < maxRowFieldSize) {
+            DataField field1 = fields1.get(f1Idx);
+            DataField field2 = fields2.get(f2Idx);
+            String f1Name = field1.name();
+            String f2Name = field2.name();
+            int comp = f1Name.compareTo(f2Name);
+
+            if (comp == 0) {
+                DataType dataType = mergeSchema(field1.type(), field2.type());
+                long c1 = getFieldCount(field1);
+                long c2 = getFieldCount(field2);
+                // Store count in description
+                DataField newField =
+                        new DataField(nextFieldId++, f1Name, dataType, 
String.valueOf(c1 + c2));
+                newFields.add(newField);
+                f1Idx++;
+                f2Idx++;
+            } else if (comp < 0) {
+                long count = getFieldCount(field1);
+                DataField newField =
+                        new DataField(
+                                nextFieldId++, field1.name(), field1.type(), 
String.valueOf(count));
+                newFields.add(newField);
+                f1Idx++;
+            } else {
+                long count = getFieldCount(field2);
+                DataField newField =
+                        new DataField(
+                                nextFieldId++, field2.name(), field2.type(), 
String.valueOf(count));
+                newFields.add(newField);
+                f2Idx++;
+            }
+        }
+
+        while (f1Idx < fields1.size() && newFields.size() < maxRowFieldSize) {
+            DataField field1 = fields1.get(f1Idx);
+            long count = getFieldCount(field1);
+            DataField newField =
+                    new DataField(
+                            nextFieldId++, field1.name(), field1.type(), 
String.valueOf(count));
+            newFields.add(newField);
+            f1Idx++;
+        }
+
+        while (f2Idx < fields2.size() && newFields.size() < maxRowFieldSize) {
+            DataField field2 = fields2.get(f2Idx);
+            long count = getFieldCount(field2);
+            DataField newField =
+                    new DataField(
+                            nextFieldId++, field2.name(), field2.type(), 
String.valueOf(count));
+            newFields.add(newField);
+            f2Idx++;
+        }
+
+        return new RowType(newFields);
+    }
+
+    /** Return a new schema, with each VariantType replaced with its inferred 
shredding schema. */
+    private RowType updateSchema(
+            RowType schema, Map<List<Integer>, RowType> inferredSchemas, 
List<Integer> path) {
+
+        List<DataField> fields = schema.getFields();
+        List<DataField> newFields = new ArrayList<>(fields.size());
+
+        for (int idx = 0; idx < fields.size(); idx++) {
+            DataField field = fields.get(idx);
+            DataType dataType = field.type();
+
+            if (dataType instanceof VariantType) {
+                List<Integer> fullPath = new ArrayList<>(path);
+                fullPath.add(idx);
+                if (!inferredSchemas.containsKey(fullPath)) {
+                    throw new IllegalStateException(
+                            String.format(
+                                    "No inferred schema found for Variant 
field '%s' at path %s",
+                                    field.name(), fullPath));
+                }
+                newFields.add(field.newType(inferredSchemas.get(fullPath)));
+            } else if (dataType instanceof RowType) {
+                List<Integer> fullPath = new ArrayList<>(path);
+                fullPath.add(idx);
+                RowType newType = updateSchema((RowType) dataType, 
inferredSchemas, fullPath);
+                newFields.add(field.newType(newType));
+            } else {
+                newFields.add(field);
+            }
+        }
+
+        return new RowType(newFields);
+    }
+
+    /** Container for a mutable integer to track the total number of shredded 
fields. */
+    private static class MaxFields {
+        int remaining;
+
+        MaxFields(int remaining) {
+            this.remaining = remaining;
+        }
+    }
+
+    /**
+     * Given the schema of a Variant type, finalize the schema. Specifically: 
1) Widen integer types
+     * to LongType 2) Replace empty rows with VariantType 3) Limit the total 
number of shredded
+     * fields in the schema
+     */
+    private DataType finalizeSimpleSchema(DataType dt, int minCardinality, 
MaxFields maxFields) {
+
+        // Every field uses a value column
+        maxFields.remaining--;
+        if (maxFields.remaining <= 0) {
+            return DataTypes.VARIANT();
+        }
+
+        // Handle null type first
+        if (dt == null || dt instanceof VariantType) {
+            return DataTypes.VARIANT();
+        }
+
+        if (dt instanceof RowType) {
+            RowType rowType = (RowType) dt;
+            List<DataField> newFields = new ArrayList<>();
+            int fieldId = 0;
+
+            for (DataField field : rowType.getFields()) {
+                if (getFieldCount(field) >= minCardinality && 
maxFields.remaining > 0) {
+                    DataType newType =
+                            finalizeSimpleSchema(field.type(), minCardinality, 
maxFields);
+                    // Clear description after finalizing
+                    newFields.add(new DataField(fieldId++, field.name(), 
newType, null));
+                }
+            }
+
+            if (!newFields.isEmpty()) {
+                return new RowType(newFields);
+            } else {
+                return DataTypes.VARIANT();
+            }
+        } else if (dt instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) dt;
+            DataType newElementType =
+                    finalizeSimpleSchema(arrayType.getElementType(), 
minCardinality, maxFields);
+            return new ArrayType(newElementType);
+        } else if (dt.getTypeRoot() == 
org.apache.paimon.types.DataTypeRoot.TINYINT
+                || dt.getTypeRoot() == 
org.apache.paimon.types.DataTypeRoot.SMALLINT
+                || dt.getTypeRoot() == 
org.apache.paimon.types.DataTypeRoot.INTEGER
+                || dt.getTypeRoot() == 
org.apache.paimon.types.DataTypeRoot.BIGINT) {
+            maxFields.remaining--;
+            return DataTypes.BIGINT();
+        } else if (dt instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) dt;
+            if (decimalType.getPrecision() <= 18 && decimalType.getScale() == 
0) {
+                maxFields.remaining--;
+                return DataTypes.BIGINT();
+            } else {
+                maxFields.remaining--;
+                if (decimalType.getPrecision() <= 18) {
+                    return DataTypes.DECIMAL(18, decimalType.getScale());
+                } else {
+                    return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 
decimalType.getScale());
+                }
+            }
+        } else {
+            // All other scalar types use typed_value
+            maxFields.remaining--;
+            return dt;
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index 189ac8e83b..cca7b5f747 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -209,8 +209,8 @@ public class PaimonShreddingUtils {
         }
     }
 
-    public static RowType variantShreddingSchema(RowType rowType) {
-        return variantShreddingSchema(rowType, true, false);
+    public static RowType variantShreddingSchema(DataType dataType) {
+        return variantShreddingSchema(dataType, true, false);
     }
 
     /**
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/variant/InferVariantShreddingSchemaTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/variant/InferVariantShreddingSchemaTest.java
new file mode 100644
index 0000000000..68787f8a45
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/variant/InferVariantShreddingSchemaTest.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.paimon.data.variant.PaimonShreddingUtils.variantShreddingSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link InferVariantShreddingSchema}. */
+public class InferVariantShreddingSchemaTest {
+
+    public static InferVariantShreddingSchema 
defaultInferVariantShreddingSchema(RowType schema) {
+        return new InferVariantShreddingSchema(schema, 300, 50, 0.1);
+    }
+
+    @Test
+    void testInferSchemaWithSimpleObject() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        // Create test data: {"name": "Alice", "age": 30}
+        GenericVariant variant1 = GenericVariant.fromJson("{\"name\": 
\"Alice\", \"age\": 30}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"name\": \"Bob\", 
\"age\": 25}");
+        GenericVariant variant3 = GenericVariant.fromJson("{\"name\": 
\"Charlie\", \"age\": 35}");
+
+        List<InternalRow> rows = new ArrayList<>();
+        rows.add(GenericRow.of(variant1));
+        rows.add(GenericRow.of(variant2));
+        rows.add(GenericRow.of(variant3));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Verify schema structure
+        RowType expectShreddedType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"age", "name"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectShreddedType));
+    }
+
+    @Test
+    void testInferSchemaWithNestedStruct() {
+        // Schema: row<data: row<v: variant>>
+        RowType innerSchema = RowType.of(new DataType[] {DataTypes.VARIANT()}, 
new String[] {"v"});
+        RowType schema = RowType.of(new DataType[] {innerSchema}, new String[] 
{"data"});
+
+        // Create test data
+        GenericVariant variant1 = GenericVariant.fromJson("{\"x\": 1, \"y\": 
2}");
+        GenericRow innerRow1 = GenericRow.of(variant1);
+
+        GenericVariant variant2 = GenericVariant.fromJson("{\"x\": 3, \"y\": 
4}");
+        GenericRow innerRow2 = GenericRow.of(variant2);
+
+        List<InternalRow> rows = new ArrayList<>();
+        rows.add(GenericRow.of(innerRow1));
+        rows.add(GenericRow.of(innerRow2));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Nested variant in row<data: row<v: variant>>
+        // Expected: inferred schema for inner variant field with x and y
+        RowType expectedInnerType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.BIGINT()},
+                        new String[] {"x", "y"});
+        RowType expectedDataType =
+                RowType.of(
+                        new DataType[] 
{variantShreddingSchema(expectedInnerType)},
+                        new String[] {"v"});
+        
assertThat(inferredSchema.getField("data").type()).isEqualTo(expectedDataType);
+    }
+
+    @Test
+    void testInferSchemaWithArray() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        // Create test data with arrays
+        GenericVariant variant1 = GenericVariant.fromJson("{\"numbers\": [1, 
2, 3]}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"numbers\": [4, 
5, 6]}");
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant1), 
GenericRow.of(variant2));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Array field [1,2,3] should be inferred as array<bigint>
+        RowType expectedType =
+                RowType.of(
+                        new DataType[] {DataTypes.ARRAY(DataTypes.BIGINT())},
+                        new String[] {"numbers"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType));
+    }
+
+    @Test
+    void testInferSchemaWithMixedTypes() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        // Create test data with mixed types
+        GenericVariant variant1 =
+                GenericVariant.fromJson(
+                        "{\"str\": \"hello\", \"num\": 42, \"bool\": true, 
\"dec\": 3.14}");
+        GenericVariant variant2 =
+                GenericVariant.fromJson(
+                        "{\"str\": \"world\", \"num\": 100, \"bool\": false, 
\"dec\": 2.71}");
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant1), 
GenericRow.of(variant2));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Mixed types: string, bigint, boolean, decimal (3.14 and 2.71 become 
DECIMAL(18, 2))
+        RowType expectedType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.BOOLEAN(),
+                            DataTypes.DECIMAL(18, 2),
+                            DataTypes.BIGINT(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"bool", "dec", "num", "str"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType));
+    }
+
+    @Test
+    void testInferSchemaWithNullValues() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        // Create test data with null
+        GenericVariant variant1 = GenericVariant.fromJson("{\"a\": 1, \"b\": 
null}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"a\": 2, \"b\": 
3}");
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant1), 
GenericRow.of(variant2));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Field "a" appears in all rows, "b" has null in one row
+        // With NULL case handling, b field can now be inferred as BIGINT 
(null is handled properly)
+        RowType expectedType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.BIGINT()},
+                        new String[] {"a", "b"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType));
+    }
+
+    @Test
+    void testInferSchemaWithEmptyRows() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Empty rows should result in variant type without typed schema
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(DataTypes.VARIANT()));
+    }
+
+    @Test
+    void testInferSchemaWithDeepNesting() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        // Create deeply nested JSON
+        String deepJson = "{\"level1\": {\"level2\": {\"level3\": {\"value\": 
42}}}}";
+        GenericVariant variant = GenericVariant.fromJson(deepJson);
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Deep nested structure: level1 -> level2 -> level3 -> value
+        RowType level3Type =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] 
{"value"});
+        RowType level2Type = RowType.of(new DataType[] {level3Type}, new 
String[] {"level3"});
+        RowType level1Type = RowType.of(new DataType[] {level2Type}, new 
String[] {"level2"});
+        RowType expectedType = RowType.of(new DataType[] {level1Type}, new 
String[] {"level1"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType));
+    }
+
+    @Test
+    void testInferSchemaWithMinFieldCardinalityRatio() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+
+        // Add 10 rows, where field "rare" appears in 2/10 rows (20%)
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"common\": 1, 
\"rare\": 99}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"common\": 2, 
\"rare\": 88}")));
+        for (int i = 0; i < 8; i++) {
+            rows.add(GenericRow.of(GenericVariant.fromJson("{\"common\": " + i 
+ "}")));
+        }
+
+        // Test with threshold (0.1 = 10%): rare field should be included 
(2/10 = 20% >= 10%)
+        InferVariantShreddingSchema inferrer1 =
+                new InferVariantShreddingSchema(schema, 300, 50, 0.1);
+        RowType inferredSchema1 = inferrer1.inferSchema(rows);
+        RowType expectedType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.BIGINT()},
+                        new String[] {"common", "rare"});
+        assertThat(inferredSchema1.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType1));
+
+        // Test with higher threshold (0.25 = 25%): rare field should be 
excluded (2/10 = 20% < 25%)
+        InferVariantShreddingSchema inferrer2 =
+                new InferVariantShreddingSchema(schema, 300, 50, 0.25);
+        RowType inferredSchema2 = inferrer2.inferSchema(rows);
+        RowType expectedType2 =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] 
{"common"});
+        assertThat(inferredSchema2.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType2));
+    }
+
+    @Test
+    void testInferSchemaWithMaxDepthLimit() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        // Create a 3-level nested JSON
+        String json = "{\"level1\": {\"level2\": {\"level3\": {\"value\": 
42}}}}";
+        GenericVariant variant = GenericVariant.fromJson(json);
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant));
+
+        // Test with max depth = 50: should infer full structure
+        InferVariantShreddingSchema inferrer1 =
+                new InferVariantShreddingSchema(schema, 300, 50, 0.1);
+        RowType inferredSchema1 = inferrer1.inferSchema(rows);
+
+        // Should have full nested structure: level1 -> level2 -> level3 -> 
value
+        RowType level3Type =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] 
{"value"});
+        RowType level2Type = RowType.of(new DataType[] {level3Type}, new 
String[] {"level3"});
+        RowType level1Type = RowType.of(new DataType[] {level2Type}, new 
String[] {"level2"});
+        RowType expectedType1 = RowType.of(new DataType[] {level1Type}, new 
String[] {"level1"});
+        assertThat(inferredSchema1.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType1));
+
+        // Test with max depth = 1: level1 is inferred, but level2 and deeper 
become variant
+        InferVariantShreddingSchema inferrer2 =
+                new InferVariantShreddingSchema(schema, 300, 1, 0.1);
+        RowType inferredSchema2 = inferrer2.inferSchema(rows);
+
+        // Result: level1 field exists but is variant
+        RowType expectedType2 =
+                RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] 
{"level1"});
+        assertThat(inferredSchema2.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType2));
+    }
+
+    @Test
+    void testInferSchemaWithMaxWidthLimit() {
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        // Create JSON with 5 fields
+        String json = "{\"field1\": 1, \"field2\": 2, \"field3\": 3, 
\"field4\": 4, \"field5\": 5}";
+        GenericVariant variant = GenericVariant.fromJson(json);
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant));
+
+        // Test with maxSchemaWidth = 11: all 5 fields should be inferred
+        // Each field consumes 2 counts (one for field itself, one for BIGINT 
type)
+        // So 5 fields = 10 counts, plus 1 for the root = 11 total
+        InferVariantShreddingSchema inferrer1 =
+                new InferVariantShreddingSchema(schema, 11, 50, 0.1);
+        RowType inferredSchema1 = inferrer1.inferSchema(rows);
+
+        // Should have all 5 fields
+        RowType expectedType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"field1", "field2", "field3", "field4", 
"field5"});
+        assertThat(inferredSchema1.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType1));
+
+        // Test with maxSchemaWidth = 9: only first 4 fields are inferred
+        // 4 fields = 8 counts, plus 1 for root = 9 total
+        InferVariantShreddingSchema inferrer2 = new 
InferVariantShreddingSchema(schema, 9, 50, 0.1);
+        RowType inferredSchema2 = inferrer2.inferSchema(rows);
+
+        // Should have only 4 fields (field1-4), field5 is dropped
+        RowType expectedType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"field1", "field2", "field3", "field4"});
+        assertThat(inferredSchema2.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType2));
+    }
+
+    @Test
+    void testInferSchemaWithAllPrimitiveTypes() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        String json =
+                "{"
+                        + "\"string\": \"test\", "
+                        + "\"long\": 123456789, "
+                        + "\"double\": 3.14159, "
+                        + "\"boolean\": true, "
+                        + "\"null\": null"
+                        + "}";
+
+        GenericVariant variant = GenericVariant.fromJson(json);
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // All primitive types: boolean, decimal (3.14159 becomes DECIMAL), 
bigint, variant (null),
+        // string
+        RowType expectedType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.BOOLEAN(),
+                            DataTypes.DECIMAL(18, 5),
+                            DataTypes.BIGINT(),
+                            DataTypes.VARIANT(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"boolean", "double", "long", "null", 
"string"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType));
+    }
+
+    @Test
+    void testInferSchemaWithConflictingTypes() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        // Field "value" has different types in different rows
+        GenericVariant variant1 = GenericVariant.fromJson("{\"value\": 123}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"value\": 
\"string\"}");
+
+        List<InternalRow> rows = Arrays.asList(GenericRow.of(variant1), 
GenericRow.of(variant2));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Conflicting types (int vs string) should fall back to variant type
+        RowType expectedType =
+                RowType.of(new DataType[] {DataTypes.VARIANT()}, new String[] 
{"value"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType));
+    }
+
+    @Test
+    void testMultipleVariantFields() {
+        // Schema: row<v1: variant, v2: variant, id: int>
+        RowType schema =
+                RowType.of(
+                        new DataType[] {DataTypes.VARIANT(), 
DataTypes.VARIANT(), DataTypes.INT()},
+                        new String[] {"v1", "v2", "id"});
+
+        GenericVariant variant1 = GenericVariant.fromJson("{\"name\": 
\"Alice\"}");
+        GenericVariant variant2 = GenericVariant.fromJson("{\"age\": 30}");
+
+        List<InternalRow> rows =
+                Arrays.asList(
+                        GenericRow.of(variant1, variant2, 1), 
GenericRow.of(variant1, variant2, 2));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Multiple variant fields: v1 with {name: string}, v2 with {age: 
bigint}, id: int
+        RowType expectedV1Type =
+                RowType.of(new DataType[] {DataTypes.STRING()}, new String[] 
{"name"});
+        RowType expectedV2Type =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] 
{"age"});
+        assertThat(inferredSchema.getFieldCount()).isEqualTo(3);
+        assertThat(inferredSchema.getField("v1").type())
+                .isEqualTo(variantShreddingSchema(expectedV1Type));
+        assertThat(inferredSchema.getField("v2").type())
+                .isEqualTo(variantShreddingSchema(expectedV2Type));
+        
assertThat(inferredSchema.getField("id").type()).isEqualTo(DataTypes.INT());
+    }
+
+    @Test
+    void testLargeDatasetWithManyFields() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+        // Generate 500 rows, each with 50 fields
+        for (int i = 0; i < 500; i++) {
+            StringBuilder jsonBuilder = new StringBuilder("{");
+            for (int j = 0; j < 50; j++) {
+                if (j > 0) {
+                    jsonBuilder.append(", ");
+                }
+                jsonBuilder.append(String.format("\"field%d\": %d", j, (i * 50 
+ j) % 1000));
+            }
+            jsonBuilder.append("}");
+            
rows.add(GenericRow.of(GenericVariant.fromJson(jsonBuilder.toString())));
+        }
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // All 50 fields should be inferred as BIGINT
+        DataType variantFieldType = inferredSchema.getField("v").type();
+        assertThat(variantFieldType).isInstanceOf(RowType.class);
+        RowType shreddedSchema = (RowType) variantFieldType;
+
+        // Should have metadata, value, and typed_value fields
+        DataField typedValueField = shreddedSchema.getField("typed_value");
+        assertThat(typedValueField).isNotNull();
+        assertThat(typedValueField.type()).isInstanceOf(RowType.class);
+
+        RowType typedValue = (RowType) typedValueField.type();
+        // Should infer all 50 fields (field0 to field49) as BIGINT
+        assertThat(typedValue.getFieldCount()).isEqualTo(50);
+
+        // Verify the schema structure (not exact equality due to field 
ordering)
+        
assertThat(inferredSchema.getField("v").type()).isInstanceOf(RowType.class);
+        RowType actualShreddedSchema = (RowType) 
inferredSchema.getField("v").type();
+        assertThat(actualShreddedSchema.getFieldCount())
+                .isEqualTo(3); // metadata, value, typed_value
+
+        // Verify all 50 fields are present with correct types
+        RowType actualTypedValue = (RowType) 
actualShreddedSchema.getField("typed_value").type();
+        for (int i = 0; i < 50; i++) {
+            String fieldName = "field" + i;
+            DataField field = actualTypedValue.getField(fieldName);
+            assertThat(field).as("Field %s should exist", 
fieldName).isNotNull();
+            // Each field is shredded: ROW<value BYTES, typed_value BIGINT> 
NOT NULL
+            assertThat(field.type()).isInstanceOf(RowType.class);
+            RowType shreddedFieldType = (RowType) field.type();
+            assertThat(shreddedFieldType.getField("typed_value").type())
+                    .isEqualTo(DataTypes.BIGINT());
+        }
+    }
+
+    @Test
+    void testNullRecords() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+        // Add rows where the entire variant value is null
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"a\": 1, \"b\": 
2}")));
+        rows.add(GenericRow.of((GenericVariant) null)); // Entire record is 
null
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"a\": 3, \"b\": 
4}")));
+        rows.add(GenericRow.of((GenericVariant) null)); // Entire record is 
null
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"a\": 5, \"b\": 
6}")));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // Verify the inferred schema structure
+        assertThat(inferredSchema).isNotNull();
+        assertThat(inferredSchema.getFieldCount()).isEqualTo(1);
+
+        // The variant field should contain shredded schema with fields a and b
+        DataType variantFieldType = inferredSchema.getFields().get(0).type();
+        assertThat(variantFieldType).isInstanceOf(RowType.class);
+
+        RowType shreddedSchema = (RowType) variantFieldType;
+        // Should have fields for metadata, typed_value, and value
+        assertThat(shreddedSchema.getFieldCount()).isGreaterThanOrEqualTo(3);
+    }
+
+    @Test
+    void testAllNullRecords() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+        // All records are null
+        for (int i = 0; i < 100; i++) {
+            rows.add(GenericRow.of((GenericVariant) null));
+        }
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        // When all records are null, should result in variant type without 
typed schema
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(DataTypes.VARIANT()));
+    }
+
+    @Test
+    void testMixedNullAndValidRecords() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+        // Mix of null records and valid records
+        for (int i = 0; i < 200; i++) {
+            if (i % 3 == 0) {
+                rows.add(GenericRow.of((GenericVariant) null));
+            } else {
+                rows.add(
+                        GenericRow.of(
+                                GenericVariant.fromJson(
+                                        String.format(
+                                                "{\"id\": %d, \"value\": 
\"data%d\"}", i, i))));
+            }
+        }
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        RowType expectedTypedValue =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"id", "value"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedTypedValue));
+    }
+
+    @Test
+    void testMixedArrayTypes() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+        // Arrays with different element types
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [1, 2, 
3]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [\"a\", 
\"b\", \"c\"]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [true, 
false, true]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [1, 
\"mixed\", true]}")));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        RowType expectedType =
+                RowType.of(
+                        new DataType[] {DataTypes.ARRAY(DataTypes.VARIANT())},
+                        new String[] {"arr"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType));
+    }
+
+    @Test
+    void testIntegerTypeMixing() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+        // Mix of small and large integers - should merge to BIGINT
+        for (int i = 0; i < 100; i++) {
+            long value = (i % 2 == 0) ? i : (i * 1000000000L);
+            
rows.add(GenericRow.of(GenericVariant.fromJson(String.format("{\"num\": %d}", 
value))));
+        }
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        RowType expectedTypedValue =
+                RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] 
{"num"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedTypedValue));
+    }
+
+    @Test
+    void testDoubleAndIntegerMixing() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+        // Mix integers and doubles - should become variant type
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"value\": 100}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"value\": 100}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"value\": 100}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"value\": 3.14}")));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        RowType expectedTypedValue =
+                RowType.of(new DataType[] {DataTypes.DECIMAL(18, 2)}, new 
String[] {"value"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedTypedValue));
+    }
+
+    @Test
+    void testNullInNestedArrays() {
+        // Schema: row<v: variant>
+        RowType schema = RowType.of(new DataType[] {DataTypes.VARIANT()}, new 
String[] {"v"});
+
+        List<InternalRow> rows = new ArrayList<>();
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [1, 2, 3, 
null, 5]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [null, null, 
null]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": [10, null, 
20, null, 30]}")));
+        rows.add(GenericRow.of(GenericVariant.fromJson("{\"arr\": null}")));
+
+        InferVariantShreddingSchema inferrer = 
defaultInferVariantShreddingSchema(schema);
+        RowType inferredSchema = inferrer.inferSchema(rows);
+
+        RowType expectedType =
+                RowType.of(
+                        new DataType[] {DataTypes.ARRAY(DataTypes.VARIANT())},
+                        new String[] {"arr"});
+        assertThat(inferredSchema.getField("v").type())
+                .isEqualTo(variantShreddingSchema(expectedType));
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index 2b2382304e..86267052e1 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -88,7 +88,7 @@ public class VariantUtils {
         RowType shreddingFields = shreddingFields(conf);
         if (shreddingFields != null && 
shreddingFields.containsField(fieldName)) {
             return PaimonShreddingUtils.variantShreddingSchema(
-                    (RowType) shreddingFields.getField(fieldName).type());
+                    shreddingFields.getField(fieldName).type());
         } else {
             return null;
         }
@@ -106,7 +106,7 @@ public class VariantUtils {
                     && shreddingFields.containsField(field.name())) {
                 RowType shreddingSchema =
                         PaimonShreddingUtils.variantShreddingSchema(
-                                (RowType) 
shreddingFields.getField(field.name()).type());
+                                shreddingFields.getField(field.name()).type());
                 newFields.add(field.newType(shreddingSchema));
             } else {
                 newFields.add(field);

Reply via email to