This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e9ac92d077 [variant] Add shredding function (#5901)
e9ac92d077 is described below

commit e9ac92d077040293bbff4762dcd51c6c1f4578d3
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jul 15 17:39:09 2025 +0800

    [variant] Add shredding function (#5901)
---
 LICENSE                                            |   3 +
 .../apache/paimon/data/variant/GenericVariant.java |   8 +-
 .../paimon/data/variant/GenericVariantBuilder.java |   7 +-
 .../paimon/data/variant/PaimonShreddingUtils.java  | 432 +++++++++++++++++++++
 .../apache/paimon/data/variant/ShreddingUtils.java | 208 ++++++++++
 .../apache/paimon/data/variant/VariantSchema.java  | 205 ++++++++++
 .../data/variant/VariantShreddingWriter.java       | 327 ++++++++++++++++
 .../paimon/data/variant/GenericVariantTest.java    | 143 +++++++
 8 files changed, 1326 insertions(+), 7 deletions(-)

diff --git a/LICENSE b/LICENSE
index c5e688b325..1fc316248a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -294,6 +294,9 @@ 
paimon-format/src/main/java/org/apache/paimon/format/parquet/newReader/Vectorize
 
paimon-format/test/main/java/org/apache/paimon/format/parquet/newReader/DeltaByteArrayEncodingTest.java
 
paimon-format/test/main/java/org/apache/paimon/format/parquet/newReader/DeltaEncodingTest.java
 
paimon-format/test/main/java/org/apache/paimon/format/parquet/newReader/DeltaLengthByteArrayEncodingTest.java
+paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
+paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
+paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
 from https://spark.apache.org/ version 4.0.0-preview2
 
 MIT License
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
index 17faf26097..ad00d2d798 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
@@ -123,7 +123,7 @@ public final class GenericVariant implements Variant {
         return Objects.hash(Arrays.hashCode(value), Arrays.hashCode(metadata), 
pos);
     }
 
-    public static Variant fromJson(String json) {
+    public static GenericVariant fromJson(String json) {
         try {
             return GenericVariantBuilder.parseJson(json, false);
         } catch (IOException e) {
@@ -300,9 +300,9 @@ public final class GenericVariant implements Variant {
     /** Variant object field. */
     public static final class ObjectField {
         public final String key;
-        public final Variant value;
+        public final GenericVariant value;
 
-        public ObjectField(String key, Variant value) {
+        public ObjectField(String key, GenericVariant value) {
             this.key = key;
             this.value = value;
         }
@@ -322,7 +322,7 @@ public final class GenericVariant implements Variant {
                     int id = readUnsigned(value, idStart + idSize * index, 
idSize);
                     int offset = readUnsigned(value, offsetStart + offsetSize 
* index, offsetSize);
                     String key = getMetadataKey(metadata, id);
-                    Variant v = new GenericVariant(value, metadata, dataStart 
+ offset);
+                    GenericVariant v = new GenericVariant(value, metadata, 
dataStart + offset);
                     return new ObjectField(key, v);
                 });
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
index 169a2f8c9b..9893652c71 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java
@@ -95,7 +95,8 @@ public class GenericVariantBuilder {
      *
      * @throws IOException if any JSON parsing error happens.
      */
-    public static Variant parseJson(String json, boolean allowDuplicateKeys) 
throws IOException {
+    public static GenericVariant parseJson(String json, boolean 
allowDuplicateKeys)
+            throws IOException {
         try (JsonParser parser = new JsonFactory().createParser(json)) {
             parser.nextToken();
             return parseJson(parser, allowDuplicateKeys);
@@ -105,7 +106,7 @@ public class GenericVariantBuilder {
     /**
      * Similar {@link #parseJson(String, boolean)}, but takes a JSON parser 
instead of string input.
      */
-    public static Variant parseJson(JsonParser parser, boolean 
allowDuplicateKeys)
+    public static GenericVariant parseJson(JsonParser parser, boolean 
allowDuplicateKeys)
             throws IOException {
         GenericVariantBuilder builder = new 
GenericVariantBuilder(allowDuplicateKeys);
         builder.buildJson(parser);
@@ -113,7 +114,7 @@ public class GenericVariantBuilder {
     }
 
     // Build the variant metadata from `dictionaryKeys` and return the variant 
result.
-    public Variant result() {
+    public GenericVariant result() {
         int numKeys = dictionaryKeys.size();
         // Use long to avoid overflow in accumulating lengths.
         long dictionaryStringSize = 0;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
new file mode 100644
index 0000000000..18f129ac87
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarBinaryType;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** Utils for paimon shredding. */
+public class PaimonShreddingUtils {
+
+    public static final String METADATA_FIELD_NAME = Variant.METADATA;
+    public static final String VARIANT_VALUE_FIELD_NAME = Variant.VALUE;
+    public static final String TYPED_VALUE_FIELD_NAME = "typed_value";
+
+    /** Paimon shredded row. */
+    static class PaimonShreddedRow implements ShreddingUtils.ShreddedRow {
+
+        private final DataGetters row;
+
+        public PaimonShreddedRow(DataGetters row) {
+            this.row = row;
+        }
+
+        @Override
+        public boolean isNullAt(int ordinal) {
+            return row.isNullAt(ordinal);
+        }
+
+        @Override
+        public boolean getBoolean(int ordinal) {
+            return row.getBoolean(ordinal);
+        }
+
+        @Override
+        public byte getByte(int ordinal) {
+            return row.getByte(ordinal);
+        }
+
+        @Override
+        public short getShort(int ordinal) {
+            return row.getShort(ordinal);
+        }
+
+        @Override
+        public int getInt(int ordinal) {
+            return row.getInt(ordinal);
+        }
+
+        @Override
+        public long getLong(int ordinal) {
+            return row.getLong(ordinal);
+        }
+
+        @Override
+        public float getFloat(int ordinal) {
+            return row.getFloat(ordinal);
+        }
+
+        @Override
+        public double getDouble(int ordinal) {
+            return row.getDouble(ordinal);
+        }
+
+        @Override
+        public BigDecimal getDecimal(int ordinal, int precision, int scale) {
+            return row.getDecimal(ordinal, precision, scale).toBigDecimal();
+        }
+
+        @Override
+        public String getString(int ordinal) {
+            return row.getString(ordinal).toString();
+        }
+
+        @Override
+        public byte[] getBinary(int ordinal) {
+            return row.getBinary(ordinal);
+        }
+
+        @Override
+        public UUID getUuid(int ordinal) {
+            // Paimon currently does not shred UUID.
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ShreddingUtils.ShreddedRow getStruct(int ordinal, int 
numFields) {
+            return new PaimonShreddedRow(row.getRow(ordinal, numFields));
+        }
+
+        @Override
+        public ShreddingUtils.ShreddedRow getArray(int ordinal) {
+            return new PaimonShreddedRow(row.getArray(ordinal));
+        }
+
+        @Override
+        public int numElements() {
+            return ((InternalArray) row).size();
+        }
+    }
+
+    public static RowType variantShreddingSchema(RowType rowType) {
+        return variantShreddingSchema(rowType, true);
+    }
+
+    /**
+     * Given an expected schema of a Variant value, returns a suitable schema 
for shredding, by
+     * inserting appropriate intermediate value/typed_value fields at each 
level. For example, to
+     * represent the JSON {"a": 1, "b": "hello"}, the schema struct&lt;a: int, 
b: string&gt; could
+     * be passed into this function, and it would return the shredding schema: 
struct&lt; metadata:
+     * binary, value: binary, typed_value: struct&lt; a: 
struct&lt;typed_value: int, value:
+     * binary&gt;, b: struct&lt;typed_value: string, value: binary&gt;&gt;&gt;
+     */
+    private static RowType variantShreddingSchema(DataType dataType, boolean 
topLevel) {
+        RowType.Builder builder = RowType.builder();
+        if (topLevel) {
+            builder.field(METADATA_FIELD_NAME, DataTypes.BYTES());
+        }
+        switch (dataType.getTypeRoot()) {
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) dataType;
+                ArrayType shreddedArrayType =
+                        new ArrayType(
+                                arrayType.isNullable(),
+                                
variantShreddingSchema(arrayType.getElementType(), false));
+                builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+                builder.field(TYPED_VALUE_FIELD_NAME, shreddedArrayType);
+                break;
+            case ROW:
+                // The field name level is always non-nullable: Variant null 
values are represented
+                // in the "value" column as "00", and missing values are 
represented by setting both
+                // "value" and "typed_value" to null.
+                RowType rowType = (RowType) dataType;
+                RowType shreddedRowType =
+                        rowType.copy(
+                                rowType.getFields().stream()
+                                        .map(
+                                                field ->
+                                                        field.newType(
+                                                                
variantShreddingSchema(
+                                                                               
 field.type(), false)
+                                                                        
.notNull()))
+                                        .collect(Collectors.toList()));
+                builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+                builder.field(TYPED_VALUE_FIELD_NAME, shreddedRowType);
+                break;
+            case VARIANT:
+                builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+                break;
+            case CHAR:
+            case VARCHAR:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case DECIMAL:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+                builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+                builder.field(TYPED_VALUE_FIELD_NAME, dataType);
+                break;
+            default:
+                throw invalidVariantShreddingSchema(dataType);
+        }
+        return builder.build();
+    }
+
+    public static VariantSchema buildVariantSchema(RowType rowType) {
+        return buildVariantSchema(rowType, true);
+    }
+
+    private static VariantSchema buildVariantSchema(RowType rowType, boolean 
topLevel) {
+        int typedIdx = -1;
+        int variantIdx = -1;
+        int topLevelMetadataIdx = -1;
+        VariantSchema.ScalarType scalarSchema = null;
+        VariantSchema.ObjectField[] objectSchema = null;
+        VariantSchema arraySchema = null;
+
+        // The struct must not be empty or contain duplicate field names. The 
latter is enforced in
+        // the loop below (`if (typedIdx != -1)` and other similar checks).
+        if (rowType.getFields().isEmpty()) {
+            throw invalidVariantShreddingSchema(rowType);
+        }
+
+        List<DataField> fields = rowType.getFields();
+        for (int i = 0; i < fields.size(); i++) {
+            DataField field = fields.get(i);
+            DataType dataType = field.type();
+            switch (field.name()) {
+                case TYPED_VALUE_FIELD_NAME:
+                    if (typedIdx != -1) {
+                        throw invalidVariantShreddingSchema(rowType);
+                    }
+                    typedIdx = i;
+                    switch (field.type().getTypeRoot()) {
+                        case ROW:
+                            RowType r = (RowType) dataType;
+                            List<DataField> rFields = r.getFields();
+                            // The struct must not be empty or contain 
duplicate field names.
+                            if (fields.isEmpty()
+                                    || fields.stream().distinct().count() != 
fields.size()) {
+                                throw invalidVariantShreddingSchema(rowType);
+                            }
+                            objectSchema = new 
VariantSchema.ObjectField[rFields.size()];
+                            for (int index = 0; index < rFields.size(); 
index++) {
+                                if (field.type() instanceof RowType) {
+                                    DataField f = rFields.get(index);
+                                    objectSchema[index] =
+                                            new VariantSchema.ObjectField(
+                                                    f.name(),
+                                                    
buildVariantSchema((RowType) f.type(), false));
+                                } else {
+                                    throw 
invalidVariantShreddingSchema(rowType);
+                                }
+                            }
+                            break;
+                        case ARRAY:
+                            ArrayType arrayType = (ArrayType) dataType;
+                            if (arrayType.getElementType() instanceof RowType) 
{
+                                arraySchema =
+                                        buildVariantSchema(
+                                                (RowType) 
arrayType.getElementType(), false);
+                            } else {
+                                throw invalidVariantShreddingSchema(rowType);
+                            }
+                            break;
+                        case BOOLEAN:
+                            scalarSchema = new VariantSchema.BooleanType();
+                            break;
+                        case TINYINT:
+                            scalarSchema =
+                                    new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.BYTE);
+                            break;
+                        case SMALLINT:
+                            scalarSchema =
+                                    new VariantSchema.IntegralType(
+                                            VariantSchema.IntegralSize.SHORT);
+                            break;
+                        case INTEGER:
+                            scalarSchema =
+                                    new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.INT);
+                            break;
+                        case BIGINT:
+                            scalarSchema =
+                                    new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG);
+                            break;
+                        case FLOAT:
+                            scalarSchema = new VariantSchema.FloatType();
+                            break;
+                        case DOUBLE:
+                            scalarSchema = new VariantSchema.DoubleType();
+                            break;
+                        case VARCHAR:
+                            scalarSchema = new VariantSchema.StringType();
+                            break;
+                        case BINARY:
+                            scalarSchema = new VariantSchema.BinaryType();
+                            break;
+                        case DATE:
+                            scalarSchema = new VariantSchema.DateType();
+                            break;
+                        case DECIMAL:
+                            DecimalType d = (DecimalType) dataType;
+                            scalarSchema =
+                                    new 
VariantSchema.DecimalType(d.getPrecision(), d.getScale());
+                            break;
+                        default:
+                            throw invalidVariantShreddingSchema(rowType);
+                    }
+                    break;
+
+                case VARIANT_VALUE_FIELD_NAME:
+                    if (variantIdx != -1 || !(field.type() instanceof 
VarBinaryType)) {
+                        throw invalidVariantShreddingSchema(rowType);
+                    }
+                    variantIdx = i;
+                    break;
+
+                case METADATA_FIELD_NAME:
+                    if (topLevelMetadataIdx != -1 || !(field.type() instanceof 
VarBinaryType)) {
+                        throw invalidVariantShreddingSchema(rowType);
+                    }
+                    topLevelMetadataIdx = i;
+                    break;
+
+                default:
+                    throw invalidVariantShreddingSchema(rowType);
+            }
+
+            if (topLevel && (topLevelMetadataIdx == -1)) {
+                topLevelMetadataIdx = i;
+            }
+        }
+
+        if (topLevel != (topLevelMetadataIdx >= 0)) {
+            throw invalidVariantShreddingSchema(rowType);
+        }
+
+        return new VariantSchema(
+                typedIdx,
+                variantIdx,
+                topLevelMetadataIdx,
+                fields.size(),
+                scalarSchema,
+                objectSchema,
+                arraySchema);
+    }
+
+    private static RuntimeException invalidVariantShreddingSchema(DataType 
dataType) {
+        return new RuntimeException("Invalid variant shredding schema: " + 
dataType);
+    }
+
+    /** Paimon shredded result. */
+    public static class PaimonShreddedResult implements 
VariantShreddingWriter.ShreddedResult {
+
+        private final VariantSchema schema;
+        // Result is stored as an InternalRow.
+        private final GenericRow row;
+
+        public PaimonShreddedResult(VariantSchema schema) {
+            this.schema = schema;
+            this.row = new GenericRow(schema.numFields);
+        }
+
+        @Override
+        public void addArray(VariantShreddingWriter.ShreddedResult[] array) {
+            GenericArray arrayResult =
+                    new GenericArray(
+                            java.util.Arrays.stream(array)
+                                    .map(result -> ((PaimonShreddedResult) 
result).row)
+                                    .toArray(InternalRow[]::new));
+            row.setField(schema.typedIdx, arrayResult);
+        }
+
+        @Override
+        public void addObject(VariantShreddingWriter.ShreddedResult[] values) {
+            GenericRow innerRow = new GenericRow(schema.objectSchema.length);
+            for (int i = 0; i < values.length; i++) {
+                innerRow.setField(i, ((PaimonShreddedResult) values[i]).row);
+            }
+            row.setField(schema.typedIdx, innerRow);
+        }
+
+        @Override
+        public void addVariantValue(byte[] result) {
+            row.setField(schema.variantIdx, result);
+        }
+
+        @Override
+        public void addScalar(Object result) {
+            Object paimonValue;
+            if (schema.scalarSchema instanceof VariantSchema.StringType) {
+                paimonValue = BinaryString.fromString((String) result);
+            } else if (schema.scalarSchema instanceof 
VariantSchema.DecimalType) {
+                VariantSchema.DecimalType dt = (VariantSchema.DecimalType) 
schema.scalarSchema;
+                paimonValue = Decimal.fromBigDecimal((BigDecimal) result, 
dt.precision, dt.scale);
+            } else {
+                paimonValue = result;
+            }
+            row.setField(schema.typedIdx, paimonValue);
+        }
+
+        @Override
+        public void addMetadata(byte[] result) {
+            row.setField(schema.topLevelMetadataIdx, result);
+        }
+    }
+
+    /** Paimon shredded result builder. */
+    public static class PaimonShreddedResultBuilder
+            implements VariantShreddingWriter.ShreddedResultBuilder {
+        @Override
+        public VariantShreddingWriter.ShreddedResult createEmpty(VariantSchema 
schema) {
+            return new PaimonShreddedResult(schema);
+        }
+
+        // Consider allowing this to be set via config?
+        @Override
+        public boolean allowNumericScaleChanges() {
+            return true;
+        }
+    }
+
+    /** Converts an input variant into shredded components. Returns the 
shredded result. */
+    public static InternalRow castShredded(GenericVariant variant, 
VariantSchema variantSchema) {
+        return ((PaimonShreddedResult)
+                        VariantShreddingWriter.castShredded(
+                                variant, variantSchema, new 
PaimonShreddedResultBuilder()))
+                .row;
+    }
+
+    /** Rebuilds a variant from shredded components with the variant schema. */
+    public static Variant rebuild(InternalRow row, VariantSchema 
variantSchema) {
+        return ShreddingUtils.rebuild(new PaimonShreddedRow(row), 
variantSchema);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
new file mode 100644
index 0000000000..f87d5f6178
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import static 
org.apache.paimon.data.variant.GenericVariantUtil.malformedVariant;
+
+/* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Shredding Utils. */
+public class ShreddingUtils {
+
+    /**
+     * Interface to read from a shredded result. It essentially has the same 
interface and semantics
+     * as Spark's `SpecializedGetters`, but we need a new interface to avoid 
the dependency.
+     */
+    public interface ShreddedRow {
+
+        boolean isNullAt(int ordinal);
+
+        boolean getBoolean(int ordinal);
+
+        byte getByte(int ordinal);
+
+        short getShort(int ordinal);
+
+        int getInt(int ordinal);
+
+        long getLong(int ordinal);
+
+        float getFloat(int ordinal);
+
+        double getDouble(int ordinal);
+
+        BigDecimal getDecimal(int ordinal, int precision, int scale);
+
+        String getString(int ordinal);
+
+        byte[] getBinary(int ordinal);
+
+        UUID getUuid(int ordinal);
+
+        ShreddedRow getStruct(int ordinal, int numFields);
+
+        ShreddedRow getArray(int ordinal);
+
+        int numElements();
+    }
+
+    // This `rebuild` function should only be called on the top-level schema, 
and that other private
+    // implementation will be called on any recursively shredded sub-schema.
+    public static Variant rebuild(ShreddedRow row, VariantSchema schema) {
+        if (schema.topLevelMetadataIdx < 0 || 
row.isNullAt(schema.topLevelMetadataIdx)) {
+            throw malformedVariant();
+        }
+        byte[] metadata = row.getBinary(schema.topLevelMetadataIdx);
+        if (schema.isUnshredded()) {
+            // `rebuild` is unnecessary for unshredded variant.
+            if (row.isNullAt(schema.variantIdx)) {
+                throw malformedVariant();
+            }
+            return new GenericVariant(row.getBinary(schema.variantIdx), 
metadata);
+        }
+        GenericVariantBuilder builder = new GenericVariantBuilder(false);
+        rebuild(row, metadata, schema, builder);
+        return builder.result();
+    }
+
+    // Rebuild a variant value from the shredded data according to the 
reconstruction algorithm in
+    // 
https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+    // Append the result to `builder`.
+    public static void rebuild(
+            ShreddedRow row, byte[] metadata, VariantSchema schema, 
GenericVariantBuilder builder) {
+        int typedIdx = schema.typedIdx;
+        int variantIdx = schema.variantIdx;
+        if (typedIdx >= 0 && !row.isNullAt(typedIdx)) {
+            if (schema.scalarSchema != null) {
+                VariantSchema.ScalarType scalar = schema.scalarSchema;
+                if (scalar instanceof VariantSchema.StringType) {
+                    builder.appendString(row.getString(typedIdx));
+                } else if (scalar instanceof VariantSchema.IntegralType) {
+                    VariantSchema.IntegralType it = 
(VariantSchema.IntegralType) scalar;
+                    long value = 0;
+                    switch (it.size) {
+                        case BYTE:
+                            value = row.getByte(typedIdx);
+                            break;
+                        case SHORT:
+                            value = row.getShort(typedIdx);
+                            break;
+                        case INT:
+                            value = row.getInt(typedIdx);
+                            break;
+                        case LONG:
+                            value = row.getLong(typedIdx);
+                            break;
+                    }
+                    builder.appendLong(value);
+                } else if (scalar instanceof VariantSchema.FloatType) {
+                    builder.appendFloat(row.getFloat(typedIdx));
+                } else if (scalar instanceof VariantSchema.DoubleType) {
+                    builder.appendDouble(row.getDouble(typedIdx));
+                } else if (scalar instanceof VariantSchema.BooleanType) {
+                    builder.appendBoolean(row.getBoolean(typedIdx));
+                } else if (scalar instanceof VariantSchema.BinaryType) {
+                    builder.appendBinary(row.getBinary(typedIdx));
+                } else if (scalar instanceof VariantSchema.UuidType) {
+                    builder.appendUuid(row.getUuid(typedIdx));
+                } else if (scalar instanceof VariantSchema.DecimalType) {
+                    VariantSchema.DecimalType dt = (VariantSchema.DecimalType) 
scalar;
+                    builder.appendDecimal(row.getDecimal(typedIdx, 
dt.precision, dt.scale));
+                } else if (scalar instanceof VariantSchema.DateType) {
+                    builder.appendDate(row.getInt(typedIdx));
+                } else if (scalar instanceof VariantSchema.TimestampType) {
+                    builder.appendTimestamp(row.getLong(typedIdx));
+                } else {
+                    assert scalar instanceof VariantSchema.TimestampNTZType;
+                    builder.appendTimestampNtz(row.getLong(typedIdx));
+                }
+            } else if (schema.arraySchema != null) {
+                VariantSchema elementSchema = schema.arraySchema;
+                ShreddedRow array = row.getArray(typedIdx);
+                int start = builder.getWritePos();
+                ArrayList<Integer> offsets = new 
ArrayList<>(array.numElements());
+                for (int i = 0; i < array.numElements(); i++) {
+                    offsets.add(builder.getWritePos() - start);
+                    rebuild(
+                            array.getStruct(i, elementSchema.numFields),
+                            metadata,
+                            elementSchema,
+                            builder);
+                }
+                builder.finishWritingArray(start, offsets);
+            } else {
+                ShreddedRow object = row.getStruct(typedIdx, 
schema.objectSchema.length);
+                ArrayList<GenericVariantBuilder.FieldEntry> fields = new 
ArrayList<>();
+                int start = builder.getWritePos();
+                for (int fieldIdx = 0; fieldIdx < schema.objectSchema.length; 
++fieldIdx) {
+                    // Shredded field must not be null.
+                    if (object.isNullAt(fieldIdx)) {
+                        throw malformedVariant();
+                    }
+                    String fieldName = schema.objectSchema[fieldIdx].fieldName;
+                    VariantSchema fieldSchema = 
schema.objectSchema[fieldIdx].schema;
+                    ShreddedRow fieldValue = object.getStruct(fieldIdx, 
fieldSchema.numFields);
+                    // If the field doesn't have non-null `typed_value` or 
`value`, it is missing.
+                    if ((fieldSchema.typedIdx >= 0 && 
!fieldValue.isNullAt(fieldSchema.typedIdx))
+                            || (fieldSchema.variantIdx >= 0
+                                    && 
!fieldValue.isNullAt(fieldSchema.variantIdx))) {
+                        int id = builder.addKey(fieldName);
+                        fields.add(
+                                new GenericVariantBuilder.FieldEntry(
+                                        fieldName, id, builder.getWritePos() - 
start));
+                        rebuild(fieldValue, metadata, fieldSchema, builder);
+                    }
+                }
+                if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
+                    // Add the leftover fields in the variant binary.
+                    GenericVariant v = new 
GenericVariant(row.getBinary(variantIdx), metadata);
+                    if (v.getType() != GenericVariantUtil.Type.OBJECT) {
+                        throw malformedVariant();
+                    }
+                    for (int i = 0; i < v.objectSize(); ++i) {
+                        GenericVariant.ObjectField field = 
v.getFieldAtIndex(i);
+                        // `value` must not contain any shredded field.
+                        if (schema.objectSchemaMap.containsKey(field.key)) {
+                            throw malformedVariant();
+                        }
+                        int id = builder.addKey(field.key);
+                        fields.add(
+                                new GenericVariantBuilder.FieldEntry(
+                                        field.key, id, builder.getWritePos() - 
start));
+                        builder.appendVariant(field.value);
+                    }
+                }
+                builder.finishWritingObject(start, fields);
+            }
+        } else if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
+            // `typed_value` doesn't exist or is null. Read from `value`.
+            builder.appendVariant(new 
GenericVariant(row.getBinary(variantIdx), metadata));
+        } else {
+            // This means the variant is missing in a context where it must 
present, so the input
+            // data is invalid.
+            throw malformedVariant();
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
new file mode 100644
index 0000000000..9fb7f25ba2
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Defines a valid shredding schema, as described in
+ * https://github.com/apache/parquet-format/blob/master/VariantShredding.md. A 
shredding schema
+ * contains a value and optional typed_value field. If a typed_value is an 
array or struct, it
+ * recursively contains its own shredding schema for elements and fields, 
respectively. The schema
+ * also contains a metadata field at the top level, but not in recursively 
shredded fields.
+ */
+public class VariantSchema {
+
+    /** Represents one field of an object in the shredding schema. */
+    public static final class ObjectField {
+        public final String fieldName;
+        public final VariantSchema schema;
+
+        public ObjectField(String fieldName, VariantSchema schema) {
+            this.fieldName = fieldName;
+            this.schema = schema;
+        }
+
+        @Override
+        public String toString() {
+            return "ObjectField{" + "fieldName=" + fieldName + ", schema=" + 
schema + '}';
+        }
+    }
+
+    /** ScalarType. */
+    public abstract static class ScalarType {}
+
+    /** StringType. */
+    public static final class StringType extends ScalarType {}
+
+    /** IntegralSize. */
+    public enum IntegralSize {
+        BYTE,
+        SHORT,
+        INT,
+        LONG
+    }
+
+    /** IntegralType. */
+    public static final class IntegralType extends ScalarType {
+        public final IntegralSize size;
+
+        public IntegralType(IntegralSize size) {
+            this.size = size;
+        }
+    }
+
+    /** FloatType. */
+    public static final class FloatType extends ScalarType {}
+
+    /** DoubleType. */
+    public static final class DoubleType extends ScalarType {}
+
+    /** BooleanType. */
+    public static final class BooleanType extends ScalarType {}
+
+    /** BinaryType. */
+    public static final class BinaryType extends ScalarType {}
+
+    /** DecimalType. */
+    public static final class DecimalType extends ScalarType {
+        public final int precision;
+        public final int scale;
+
+        public DecimalType(int precision, int scale) {
+            this.precision = precision;
+            this.scale = scale;
+        }
+    }
+
+    /** DateType. */
+    public static final class DateType extends ScalarType {}
+
+    /** TimestampType. */
+    public static final class TimestampType extends ScalarType {}
+
+    /** TimestampNTZType. */
+    public static final class TimestampNTZType extends ScalarType {}
+
+    /** UuidType. */
+    public static final class UuidType extends ScalarType {}
+
+    // The index of the typed_value, value, and metadata fields in the schema, 
respectively. If a
+    // given field is not in the schema, its value must be set to -1 to 
indicate that it is invalid.
+    // The indices of valid fields should be contiguous and start from 0.
+    public int typedIdx;
+    public int variantIdx;
+    // topLevelMetadataIdx must be non-negative in the top-level schema, and 
-1 at all other nesting
+    // levels.
+    public final int topLevelMetadataIdx;
+    // The number of fields in the schema. I.e. a value between 1 and 3, 
depending on which of
+    // value,
+    // typed_value and metadata are present.
+    public final int numFields;
+
+    public final ScalarType scalarSchema;
+    public final ObjectField[] objectSchema;
+    // Map for fast lookup of object fields by name. The values are an index 
into `objectSchema`.
+    public final Map<String, Integer> objectSchemaMap;
+    public final VariantSchema arraySchema;
+
+    public VariantSchema(
+            int typedIdx,
+            int variantIdx,
+            int topLevelMetadataIdx,
+            int numFields,
+            ScalarType scalarSchema,
+            ObjectField[] objectSchema,
+            VariantSchema arraySchema) {
+        this.typedIdx = typedIdx;
+        this.numFields = numFields;
+        this.variantIdx = variantIdx;
+        this.topLevelMetadataIdx = topLevelMetadataIdx;
+        this.scalarSchema = scalarSchema;
+        this.objectSchema = objectSchema;
+        if (objectSchema != null) {
+            objectSchemaMap = new HashMap<>();
+            for (int i = 0; i < objectSchema.length; i++) {
+                objectSchemaMap.put(objectSchema[i].fieldName, i);
+            }
+        } else {
+            objectSchemaMap = null;
+        }
+
+        this.arraySchema = arraySchema;
+    }
+
+    public void setTypedIdx(int typedIdx) {
+        this.typedIdx = typedIdx;
+        this.variantIdx = -1;
+        setMetadata();
+    }
+
+    private byte[] metadata;
+
+    public byte[] getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata() {
+        try {
+            String s = new ObjectMapper().writeValueAsString(objectSchemaMap);
+            metadata = GenericVariant.fromJson(s).metadata();
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Return whether the variant column is unshrededed. The user is not 
required to do anything
+    // special, but can have certain optimizations for unshrededed variant.
+    public boolean isUnshredded() {
+        return topLevelMetadataIdx >= 0 && variantIdx >= 0 && typedIdx < 0;
+    }
+
+    @Override
+    public String toString() {
+        return "VariantSchema{"
+                + "typedIdx="
+                + typedIdx
+                + ", variantIdx="
+                + variantIdx
+                + ", topLevelMetadataIdx="
+                + topLevelMetadataIdx
+                + ", numFields="
+                + numFields
+                + ", scalarSchema="
+                + scalarSchema
+                + ", objectSchema="
+                + objectSchema
+                + ", arraySchema="
+                + arraySchema
+                + '}';
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
new file mode 100644
index 0000000000..2ca87b11e6
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+
+/* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Class to implement shredding a Variant value. */
+public class VariantShreddingWriter {
+
+    /**
+     * Interface to build up a shredded result. Callers should implement a 
ShreddedResultBuilder to
+     * create an empty result with a given schema. The castShredded method 
will call one or more of
+     * the add* methods to populate it.
+     */
+    public interface ShreddedResult {
+
+        // Create an array. The elements are the result of shredding each 
element.
+        void addArray(ShreddedResult[] array);
+
+        // Create an object. The values are the result of shredding each 
field, order by the index
+        // in objectSchema. Missing fields are populated with an empty result.
+        void addObject(ShreddedResult[] values);
+
+        void addVariantValue(byte[] result);
+
+        // Add a scalar to typed_value. The type of Object depends on the 
scalarSchema in the
+        // shredding schema.
+        void addScalar(Object result);
+
+        void addMetadata(byte[] result);
+    }
+
+    /** Shredded result builder. */
+    public interface ShreddedResultBuilder {
+        ShreddedResult createEmpty(VariantSchema schema);
+
+        // If true, we will shred decimals to a different scale or to 
integers, as long as they are
+        // numerically equivalent. Similarly, integers will be allowed to 
shred to decimals.
+        boolean allowNumericScaleChanges();
+    }
+
+    /**
+     * Converts an input variant into shredded components. Returns the 
shredded result, as well as
+     * the original Variant with shredded fields removed. `dataType` must be a 
valid shredding
+     * schema, as described in
+     * 
https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+     */
+    public static ShreddedResult castShredded(
+            GenericVariant v, VariantSchema schema, ShreddedResultBuilder 
builder) {
+        GenericVariantUtil.Type variantType = v.getType();
+        ShreddedResult result = builder.createEmpty(schema);
+
+        if (schema.topLevelMetadataIdx >= 0) {
+            result.addMetadata(v.metadata());
+        }
+
+        if (schema.arraySchema != null && variantType == 
GenericVariantUtil.Type.ARRAY) {
+            // The array element is always a struct containing untyped and 
typed fields.
+            VariantSchema elementSchema = schema.arraySchema;
+            int size = v.arraySize();
+            ShreddedResult[] array = new ShreddedResult[size];
+            for (int i = 0; i < size; ++i) {
+                ShreddedResult shreddedArray =
+                        castShredded(v.getElementAtIndex(i), elementSchema, 
builder);
+                array[i] = shreddedArray;
+            }
+            result.addArray(array);
+        } else if (schema.objectSchema != null && variantType == 
GenericVariantUtil.Type.OBJECT) {
+            VariantSchema.ObjectField[] objectSchema = schema.objectSchema;
+            ShreddedResult[] shreddedValues = new 
ShreddedResult[objectSchema.length];
+
+            // Create a variantBuilder for any field that exist in `v`, but 
not in the
+            // shredding schema.
+            GenericVariantBuilder variantBuilder = new 
GenericVariantBuilder(false);
+            ArrayList<GenericVariantBuilder.FieldEntry> fieldEntries = new 
ArrayList<>();
+            // Keep track of which schema fields we actually found in the 
Variant value.
+            int numFieldsMatched = 0;
+            int start = variantBuilder.getWritePos();
+            for (int i = 0; i < v.objectSize(); ++i) {
+                GenericVariant.ObjectField field = v.getFieldAtIndex(i);
+                Integer fieldIdx = schema.objectSchemaMap.get(field.key);
+                if (fieldIdx != null) {
+                    // The field exists in the shredding schema. Recursively 
shred, and write the
+                    // result.
+                    ShreddedResult shreddedField =
+                            castShredded(field.value, 
objectSchema[fieldIdx].schema, builder);
+                    shreddedValues[fieldIdx] = shreddedField;
+                    numFieldsMatched++;
+                } else {
+                    // The field is not shredded. Put it in the untyped_value 
column.
+                    int id = v.getDictionaryIdAtIndex(i);
+                    fieldEntries.add(
+                            new GenericVariantBuilder.FieldEntry(
+                                    field.key, id, 
variantBuilder.getWritePos() - start));
+                    // shallowAppendVariant is needed for correctness, since 
we're relying on the
+                    // metadata IDs being unchanged.
+                    variantBuilder.shallowAppendVariant(field.value);
+                }
+            }
+            if (numFieldsMatched < objectSchema.length) {
+                // Set missing fields to non-null with all fields set to null.
+                for (int i = 0; i < objectSchema.length; ++i) {
+                    if (shreddedValues[i] == null) {
+                        VariantSchema.ObjectField fieldSchema = 
objectSchema[i];
+                        ShreddedResult emptyChild = 
builder.createEmpty(fieldSchema.schema);
+                        shreddedValues[i] = emptyChild;
+                        numFieldsMatched += 1;
+                    }
+                }
+            }
+            if (numFieldsMatched != objectSchema.length) {
+                // Since we just filled in all the null entries, this can only 
happen if we tried to
+                // write to the same field twice; i.e. the Variant contained 
duplicate fields, which
+                // is invalid.
+                throw new RuntimeException();
+            }
+            result.addObject(shreddedValues);
+            if (variantBuilder.getWritePos() != start) {
+                // We added something to the untyped value.
+                variantBuilder.finishWritingObject(start, fieldEntries);
+                result.addVariantValue(variantBuilder.valueWithoutMetadata());
+            }
+        } else if (schema.scalarSchema != null) {
+            VariantSchema.ScalarType scalarType = schema.scalarSchema;
+            Object typedValue = tryTypedShred(v, variantType, scalarType, 
builder);
+            if (typedValue != null) {
+                // Store the typed value.
+                result.addScalar(typedValue);
+            } else {
+                GenericVariantBuilder variantBuilder = new 
GenericVariantBuilder(false);
+                variantBuilder.appendVariant(v);
+                result.addVariantValue(v.value());
+            }
+        } else {
+            // Store in untyped.
+            result.addVariantValue(v.value());
+        }
+        return result;
+    }
+
+    /**
+     * Tries to cast a Variant into a typed value. If the cast fails, returns 
null.
+     *
+     * @param v
+     * @param variantType The Variant Type of v
+     * @param targetType The target type
+     * @return The scalar value, or null if the cast is not valid.
+     */
+    private static Object tryTypedShred(
+            GenericVariant v,
+            GenericVariantUtil.Type variantType,
+            VariantSchema.ScalarType targetType,
+            ShreddedResultBuilder builder) {
+        switch (variantType) {
+            case LONG:
+                if (targetType instanceof VariantSchema.IntegralType) {
+                    // Check that the target type can hold the actual value.
+                    VariantSchema.IntegralType integralType =
+                            (VariantSchema.IntegralType) targetType;
+                    VariantSchema.IntegralSize size = integralType.size;
+                    long value = v.getLong();
+                    switch (size) {
+                        case BYTE:
+                            if (value == (byte) value) {
+                                return (byte) value;
+                            }
+                            break;
+                        case SHORT:
+                            if (value == (short) value) {
+                                return (short) value;
+                            }
+                            break;
+                        case INT:
+                            if (value == (int) value) {
+                                return (int) value;
+                            }
+                            break;
+                        case LONG:
+                            return value;
+                    }
+                } else if (targetType instanceof VariantSchema.DecimalType
+                        && builder.allowNumericScaleChanges()) {
+                    VariantSchema.DecimalType decimalType = 
(VariantSchema.DecimalType) targetType;
+                    // If the integer can fit in the given decimal precision, 
allow it.
+                    long value = v.getLong();
+                    // Set to the requested scale, and check if the precision 
is large enough.
+                    BigDecimal decimalValue = BigDecimal.valueOf(value);
+                    BigDecimal scaledValue = 
decimalValue.setScale(decimalType.scale);
+                    // The initial value should have scale 0, so rescaling 
shouldn't lose
+                    // information.
+                    assert (decimalValue.compareTo(scaledValue) == 0);
+                    if (scaledValue.precision() <= decimalType.precision) {
+                        return scaledValue;
+                    }
+                }
+                break;
+            case DECIMAL:
+                if (targetType instanceof VariantSchema.DecimalType) {
+                    VariantSchema.DecimalType decimalType = 
(VariantSchema.DecimalType) targetType;
+                    // Use getDecimalWithOriginalScale so that we retain scale 
information if
+                    // allowNumericScaleChanges() is false.
+                    BigDecimal value =
+                            
GenericVariantUtil.getDecimalWithOriginalScale(v.rawValue(), v.pos());
+                    if (value.precision() <= decimalType.precision
+                            && value.scale() == decimalType.scale) {
+                        return value;
+                    }
+                    if (builder.allowNumericScaleChanges()) {
+                        // Convert to the target scale, and see if it fits. 
Rounding mode doesn't
+                        // matter, since we'll reject it if it turned out to 
require rounding.
+                        BigDecimal scaledValue =
+                                value.setScale(decimalType.scale, 
RoundingMode.FLOOR);
+                        if (scaledValue.compareTo(value) == 0
+                                && scaledValue.precision() <= 
decimalType.precision) {
+                            return scaledValue;
+                        }
+                    }
+                } else if (targetType instanceof VariantSchema.IntegralType
+                        && builder.allowNumericScaleChanges()) {
+                    VariantSchema.IntegralType integralType =
+                            (VariantSchema.IntegralType) targetType;
+                    // Check if the decimal happens to be an integer.
+                    BigDecimal value = v.getDecimal();
+                    VariantSchema.IntegralSize size = integralType.size;
+                    // Try to cast to the appropriate type, and check if any 
information is lost.
+                    switch (size) {
+                        case BYTE:
+                            if 
(value.compareTo(BigDecimal.valueOf(value.byteValue())) == 0) {
+                                return value.byteValue();
+                            }
+                            break;
+                        case SHORT:
+                            if 
(value.compareTo(BigDecimal.valueOf(value.shortValue())) == 0) {
+                                return value.shortValue();
+                            }
+                            break;
+                        case INT:
+                            if 
(value.compareTo(BigDecimal.valueOf(value.intValue())) == 0) {
+                                return value.intValue();
+                            }
+                            break;
+                        case LONG:
+                            if 
(value.compareTo(BigDecimal.valueOf(value.longValue())) == 0) {
+                                return value.longValue();
+                            }
+                    }
+                }
+                break;
+            case BOOLEAN:
+                if (targetType instanceof VariantSchema.BooleanType) {
+                    return v.getBoolean();
+                }
+                break;
+            case STRING:
+                if (targetType instanceof VariantSchema.StringType) {
+                    return v.getString();
+                }
+                break;
+            case DOUBLE:
+                if (targetType instanceof VariantSchema.DoubleType) {
+                    return v.getDouble();
+                }
+                break;
+            case DATE:
+                if (targetType instanceof VariantSchema.DateType) {
+                    return (int) v.getLong();
+                }
+                break;
+            case TIMESTAMP:
+                if (targetType instanceof VariantSchema.TimestampType) {
+                    return v.getLong();
+                }
+                break;
+            case TIMESTAMP_NTZ:
+                if (targetType instanceof VariantSchema.TimestampNTZType) {
+                    return v.getLong();
+                }
+                break;
+            case FLOAT:
+                if (targetType instanceof VariantSchema.FloatType) {
+                    return v.getFloat();
+                }
+                break;
+            case BINARY:
+                if (targetType instanceof VariantSchema.BinaryType) {
+                    return v.getBinary();
+                }
+                break;
+            case UUID:
+                if (targetType instanceof VariantSchema.UuidType) {
+                    return v.getUuid();
+                }
+                break;
+        }
+        // The stored type does not match the requested shredding type. Return 
null, and the caller
+        // will store the result in untyped_value.
+        return null;
+    }
+
+    // Add the result to the shredding result.
+    private static void addVariantValueVariant(
+            Variant variantResult, VariantSchema schema, ShreddedResult 
result) {
+        result.addVariantValue(variantResult.value());
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
index dbe9bdf3c1..c226b9dead 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
@@ -18,6 +18,15 @@
 
 package org.apache.paimon.data.variant;
 
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
@@ -95,4 +104,138 @@ public class GenericVariantTest {
         assertThat(variant.variantGet("$.boolean2")).isEqualTo(false);
         assertThat(variant.variantGet("$.nullField")).isNull();
     }
+
+    @Test
+    public void testShredding() {
+        GenericVariant variant = GenericVariant.fromJson("{\"a\": 1, \"b\": 
\"hello\"}");
+
+        // Happy path
+        RowType shreddedType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+                        new String[] {"a", "b"});
+        GenericRow expert1 =
+                GenericRow.of(
+                        variant.metadata(),
+                        null,
+                        GenericRow.of(
+                                GenericRow.of(null, 1),
+                                GenericRow.of(null, 
BinaryString.fromString("hello"))));
+        testShreddingResult(variant, shreddedType1, expert1);
+
+        // Missing field
+        RowType shreddedType2 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING(), 
DataTypes.STRING()},
+                        new String[] {"a", "c", "b"});
+        GenericRow expert2 =
+                GenericRow.of(
+                        variant.metadata(),
+                        null,
+                        GenericRow.of(
+                                GenericRow.of(null, 1),
+                                GenericRow.of(null, null),
+                                GenericRow.of(null, 
BinaryString.fromString("hello"))));
+        testShreddingResult(variant, shreddedType2, expert2);
+
+        // "a" is not present in shredding schema
+        RowType shreddedType3 =
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), 
DataTypes.STRING()},
+                        new String[] {"b", "c"});
+        GenericRow expert3 =
+                GenericRow.of(
+                        variant.metadata(),
+                        untypedValue("{\"a\": 1}"),
+                        GenericRow.of(
+                                GenericRow.of(null, 
BinaryString.fromString("hello")),
+                                GenericRow.of(null, null)));
+        testShreddingResult(variant, shreddedType3, expert3);
+    }
+
+    @Test
+    public void testShreddingAllTypes() {
+        String json =
+                "{\n"
+                        + "  \"c1\": \"Hello, World!\",\n"
+                        + "  \"c2\": 12345678901234,\n"
+                        + "  \"c3\": 
1.0123456789012345678901234567890123456789,\n"
+                        + "  \"c4\": 100.99,\n"
+                        + "  \"c5\": true,\n"
+                        + "  \"c6\": null,\n"
+                        + "  \"c7\": {\"street\" : \"Main St\",\"city\" : 
\"Hangzhou\"},\n"
+                        + "  \"c8\": [1, 2]\n"
+                        + "}\n";
+        GenericVariant variant = GenericVariant.fromJson(json);
+        RowType shreddedType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING(),
+                            DataTypes.BIGINT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.DECIMAL(5, 2),
+                            DataTypes.BOOLEAN(),
+                            DataTypes.STRING(),
+                            RowType.of(
+                                    new DataType[] {DataTypes.STRING(), 
DataTypes.STRING()},
+                                    new String[] {"street", "city"}),
+                            DataTypes.ARRAY(DataTypes.INT())
+                        },
+                        new String[] {"c1", "c2", "c3", "c4", "c5", "c6", 
"c7", "c8"});
+        GenericRow expert1 =
+                GenericRow.of(
+                        variant.metadata(),
+                        null,
+                        GenericRow.of(
+                                GenericRow.of(null, 
BinaryString.fromString("Hello, World!")),
+                                GenericRow.of(null, 12345678901234L),
+                                GenericRow.of(null, 
1.0123456789012345678901234567890123456789D),
+                                GenericRow.of(
+                                        null,
+                                        Decimal.fromBigDecimal(new 
BigDecimal("100.99"), 5, 2)),
+                                GenericRow.of(null, true),
+                                GenericRow.of(new byte[] {0}, null),
+                                GenericRow.of(
+                                        null,
+                                        GenericRow.of(
+                                                GenericRow.of(
+                                                        null, 
BinaryString.fromString("Main St")),
+                                                GenericRow.of(
+                                                        null,
+                                                        
BinaryString.fromString("Hangzhou")))),
+                                GenericRow.of(
+                                        null,
+                                        new GenericArray(
+                                                new GenericRow[] {
+                                                    GenericRow.of(null, 1), 
GenericRow.of(null, 2)
+                                                }))));
+        testShreddingResult(variant, shreddedType1, expert1);
+
+        // test no shredding
+        RowType shreddedType2 =
+                RowType.of(new DataType[] {DataTypes.STRING()}, new String[] 
{"other"});
+        GenericRow expert2 =
+                GenericRow.of(
+                        variant.metadata(),
+                        untypedValue(json),
+                        GenericRow.of(GenericRow.of(null, null)));
+        testShreddingResult(variant, shreddedType2, expert2);
+    }
+
+    private byte[] untypedValue(String input) {
+        return GenericVariant.fromJson(input).value();
+    }
+
+    private void testShreddingResult(
+            GenericVariant variant, RowType shreddedType, InternalRow 
expected) {
+        RowType shreddingSchema = 
PaimonShreddingUtils.variantShreddingSchema(shreddedType);
+        VariantSchema variantSchema = 
PaimonShreddingUtils.buildVariantSchema(shreddingSchema);
+        // test cast shredded
+        InternalRow shredded = PaimonShreddingUtils.castShredded(variant, 
variantSchema);
+        assertThat(shredded).isEqualTo(expected);
+
+        // test rebuild
+        Variant rebuild = PaimonShreddingUtils.rebuild(shredded, 
variantSchema);
+        assertThat(variant.toJson()).isEqualTo(rebuild.toJson());
+    }
 }

Reply via email to