This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5949538651 [variant] Introduce assemble shredded variant struct 
function (#6652)
5949538651 is described below

commit 5949538651104cd071858da7139b157f4078131b
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Nov 24 16:08:59 2025 +0800

    [variant] Introduce assemble shredded variant struct function (#6652)
---
 LICENSE                                            |   1 +
 .../paimon/data/variant/BaseVariantReader.java     | 519 +++++++++++++++++++++
 .../apache/paimon/data/variant/GenericVariant.java |  46 +-
 .../paimon/data/variant/PaimonShreddingUtils.java  | 312 ++++++++++++-
 .../org/apache/paimon/data/variant/Variant.java    |  16 +-
 .../paimon/data/variant/VariantCastArgs.java       |  46 ++
 .../org/apache/paimon/data/variant/VariantGet.java | 184 ++++++++
 .../{PathSegment.java => VariantPathSegment.java}  |  73 +--
 .../apache/paimon/data/variant/VariantSchema.java  |   8 +
 .../data/variant/VariantShreddingWriter.java       |   2 -
 .../paimon/data/variant/GenericVariantTest.java    |  71 ++-
 .../data/variant/PaimonShreddingUtilsTest.java     | 317 +++++++++++++
 12 files changed, 1492 insertions(+), 103 deletions(-)

diff --git a/LICENSE b/LICENSE
index 7c5f46ffe4..16447645b8 100644
--- a/LICENSE
+++ b/LICENSE
@@ -295,6 +295,7 @@ 
paimon-format/test/main/java/org/apache/paimon/format/parquet/newReader/DeltaLen
 paimon-common/src/main/java/org/apache/paimon/data/variant/ShreddingUtils.java
 paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
 
paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
+paimon-common/src/main/java/org/apache/paimon/data/variant/BaseVariantReader.java
 from https://spark.apache.org/ version 4.0.0-preview2
 
 MIT License
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/BaseVariantReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/BaseVariantReader.java
new file mode 100644
index 0000000000..7b8872973f
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/BaseVariantReader.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VariantType;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.paimon.data.variant.GenericVariantUtil.Type;
+import static 
org.apache.paimon.data.variant.GenericVariantUtil.malformedVariant;
+
+/* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * The base class to read variant values into a Paimon type. For convenience, we also allow creating
+ * an instance of the base class itself; {@link #create} returns one when `schema` is null. Such an
+ * instance cannot read anything, but it can serve as a container of `targetType` and `castArgs`.
+ */
+public class BaseVariantReader {
+
+    // Shredding schema of the input row. Null only for a plain container instance.
+    protected final VariantSchema schema;
+    // The Paimon type this reader produces.
+    protected final DataType targetType;
+    // Cast options, e.g. the time zone used when rendering JSON.
+    protected final VariantCastArgs castArgs;
+
+    public BaseVariantReader(VariantSchema schema, DataType targetType, VariantCastArgs castArgs) {
+        this.schema = schema;
+        this.targetType = targetType;
+        this.castArgs = castArgs;
+    }
+
+    public VariantSchema schema() {
+        return schema;
+    }
+
+    public DataType targetType() {
+        return targetType;
+    }
+
+    public VariantCastArgs castArgs() {
+        return castArgs;
+    }
+
+    /**
+     * Read from a row containing a variant value (shredded or unshredded) and return a value of
+     * `targetType`. The row schema is described by `schema`.
+     *
+     * <p>Throws MALFORMED_VARIANT if the variant is missing (both `typed_value` and `value` are
+     * null). If the variant can be legally missing (the only possible situation is struct fields in
+     * object `typed_value`), the caller should check for it and avoid calling this function.
+     *
+     * @param row the row holding `value` and/or `typed_value`, laid out as described by `schema`
+     * @param topLevelMetadata the variant metadata bytes of the top-level variant column
+     * @return the variant value converted to `targetType`
+     */
+    public Object read(InternalRow row, byte[] topLevelMetadata) {
+        if (schema.typedIdx < 0 || row.isNullAt(schema.typedIdx)) {
+            if (schema.variantIdx < 0 || row.isNullAt(schema.variantIdx)) {
+                // Both `typed_value` and `value` are null, meaning the variant is missing.
+                throw malformedVariant();
+            }
+            GenericVariant variant =
+                    new GenericVariant(row.getBinary(schema.variantIdx), topLevelMetadata);
+            return VariantGet.cast(variant, targetType, castArgs);
+        } else {
+            return readFromTyped(row, topLevelMetadata);
+        }
+    }
+
+    /**
+     * Subclasses should override it to produce the read result when `typed_value` is not null. The
+     * base implementation always throws {@link UnsupportedOperationException}.
+     */
+    protected Object readFromTyped(InternalRow row, byte[] topLevelMetadata) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Rebuild the variant in binary format from a (possibly shredded) variant row. */
+    protected Variant rebuildVariant(InternalRow row, byte[] topLevelMetadata) {
+        GenericVariantBuilder builder = new GenericVariantBuilder(false);
+        ShreddingUtils.rebuild(
+                new PaimonShreddingUtils.PaimonShreddedRow(row), topLevelMetadata, schema, builder);
+        return builder.result();
+    }
+
+    /**
+     * Handle an invalid cast: the rebuilt variant is passed to {@code VariantGet.invalidCast}, which
+     * either throws an error or returns null.
+     */
+    protected Object invalidCast(InternalRow row, byte[] topLevelMetadata) {
+        return VariantGet.invalidCast(rebuildVariant(row, topLevelMetadata), targetType, castArgs);
+    }
+
+    /**
+     * Create a reader for `targetType`. If `schema` is null, meaning that the extraction path
+     * doesn't exist in `typed_value`, it returns an instance of `BaseVariantReader`. As described
+     * in the class comment, the reader is only a container of `targetType` and `castArgs` in this
+     * case.
+     *
+     * @throws UnsupportedOperationException if `targetType` is a map whose key type is not STRING
+     */
+    public static BaseVariantReader create(
+            @Nullable VariantSchema schema,
+            DataType targetType,
+            VariantCastArgs castArgs,
+            boolean isTopLevelUnshredded) {
+        if (schema == null) {
+            return new BaseVariantReader(null, targetType, castArgs);
+        }
+
+        if (targetType instanceof RowType) {
+            return new RowReader(schema, (RowType) targetType, castArgs);
+        } else if (targetType instanceof ArrayType) {
+            return new ArrayReader(schema, (ArrayType) targetType, castArgs);
+        } else if (targetType instanceof MapType) {
+            MapType mapType = (MapType) targetType;
+            // Variant object keys are strings, so only STRING-keyed maps can be produced. The key
+            // type is normalized to nullable so that a NOT NULL STRING key is accepted as well.
+            if (!mapType.getKeyType().copy(true).equals(DataTypes.STRING())) {
+                throw new UnsupportedOperationException(
+                        "Unsupported map key type for variant read: " + mapType.getKeyType());
+            }
+            return new MapReader(schema, mapType, castArgs);
+        } else if (targetType instanceof VariantType) {
+            return new VariantReader(
+                    schema, (VariantType) targetType, castArgs, isTopLevelUnshredded);
+        } else {
+            return new ScalarReader(schema, targetType, castArgs);
+        }
+    }
+
+    /**
+     * Read variant values into a Paimon row type. It reads unshredded fields (fields that are not
+     * in the typed object) from the `value`, and reads the shredded fields from the object
+     * `typed_value`. `value` must not contain any shredded field according to the shredding spec,
+     * but this requirement is not enforced. If `value` does contain a shredded field, no error will
+     * occur, and the field in object `typed_value` will be the final result.
+     */
+    private static final class RowReader extends BaseVariantReader {
+
+        // For each field in `targetType`, the index of the field with the same name in object
+        // `typed_value`, or -1 if it doesn't exist in object `typed_value`.
+        private final int[] fieldInputIndices;
+
+        // For each field in `targetType`, the reader for the corresponding field in object
+        // `typed_value`, or null if it doesn't exist in object `typed_value`.
+        private final BaseVariantReader[] fieldReaders;
+
+        // False if every field in `targetType` can be found in object `typed_value`; the reader
+        // then never needs to read from `value`.
+        private final boolean needUnshreddedObject;
+
+        public RowReader(VariantSchema schema, RowType targetType, VariantCastArgs castArgs) {
+            super(schema, targetType, castArgs);
+
+            List<DataField> targetFields = targetType.getFields();
+            int numFields = targetFields.size();
+            this.fieldInputIndices = new int[numFields];
+            this.fieldReaders = new BaseVariantReader[numFields];
+            boolean needsUnshredded = false;
+            for (int i = 0; i < numFields; i++) {
+                DataField targetField = targetFields.get(i);
+                // `get` would return null for a field that is not shredded, and unboxing that null
+                // into an int throws; map the absent case to -1 instead.
+                int inputIdx =
+                        schema.objectSchemaMap != null
+                                ? schema.objectSchemaMap.getOrDefault(targetField.name(), -1)
+                                : -1;
+                fieldInputIndices[i] = inputIdx;
+                if (inputIdx >= 0) {
+                    fieldReaders[i] =
+                            BaseVariantReader.create(
+                                    schema.objectSchema[inputIdx].schema(),
+                                    targetField.type(),
+                                    castArgs,
+                                    false);
+                } else {
+                    // At least one field must come from the unshredded `value` object.
+                    needsUnshredded = true;
+                }
+            }
+            this.needUnshreddedObject = needsUnshredded;
+        }
+
+        @Override
+        public Object readFromTyped(InternalRow row, byte[] topLevelMetadata) {
+            if (schema.objectSchema == null) {
+                return invalidCast(row, topLevelMetadata);
+            }
+
+            InternalRow obj = row.getRow(schema.typedIdx, schema.objectSchema.length);
+            GenericRow result = new GenericRow(fieldInputIndices.length);
+            GenericVariant unshreddedObject = null;
+
+            if (needUnshreddedObject
+                    && schema.variantIdx >= 0
+                    && !row.isNullAt(schema.variantIdx)) {
+                unshreddedObject =
+                        new GenericVariant(row.getBinary(schema.variantIdx), topLevelMetadata);
+                if (unshreddedObject.getType() != Type.OBJECT) {
+                    throw malformedVariant();
+                }
+            }
+
+            for (int i = 0; i < fieldInputIndices.length; i++) {
+                int inputIdx = fieldInputIndices[i];
+                if (inputIdx >= 0) {
+                    // Shredded field must not be null.
+                    if (obj.isNullAt(inputIdx)) {
+                        throw malformedVariant();
+                    }
+                    VariantSchema fieldSchema = schema.objectSchema[inputIdx].schema();
+                    InternalRow fieldInput = obj.getRow(inputIdx, fieldSchema.numFields);
+                    // Only read from the shredded field if it is not missing; otherwise the
+                    // result field is left unset.
+                    if ((fieldSchema.typedIdx >= 0 && !fieldInput.isNullAt(fieldSchema.typedIdx))
+                            || (fieldSchema.variantIdx >= 0
+                                    && !fieldInput.isNullAt(fieldSchema.variantIdx))) {
+                        result.setField(i, fieldReaders[i].read(fieldInput, topLevelMetadata));
+                    }
+                } else if (unshreddedObject != null) {
+                    // Not shredded: look the field up in the residual `value` object.
+                    DataField field = ((RowType) targetType).getField(i);
+                    GenericVariant unshreddedField = unshreddedObject.getFieldByKey(field.name());
+                    if (unshreddedField != null) {
+                        result.setField(
+                                i, VariantGet.cast(unshreddedField, field.type(), castArgs));
+                    }
+                }
+            }
+            return result;
+        }
+    }
+
+    /**
+     * Read variant values into a Paimon array type. A null element in the shredded array is
+     * treated as a malformed variant.
+     */
+    private static final class ArrayReader extends BaseVariantReader {
+
+        // Reader for the array elements; null when the schema has no array `typed_value`.
+        private final BaseVariantReader elementReader;
+
+        public ArrayReader(VariantSchema schema, ArrayType targetType, VariantCastArgs castArgs) {
+            super(schema, targetType, castArgs);
+            if (schema.arraySchema != null) {
+                this.elementReader =
+                        BaseVariantReader.create(
+                                schema.arraySchema, targetType.getElementType(), castArgs, false);
+            } else {
+                this.elementReader = null;
+            }
+        }
+
+        @Override
+        protected Object readFromTyped(InternalRow row, byte[] topLevelMetadata) {
+            if (schema.arraySchema == null) {
+                return invalidCast(row, topLevelMetadata);
+            }
+
+            int elementNumFields = schema.arraySchema.numFields;
+            InternalArray arr = row.getArray(schema.typedIdx);
+            Object[] result = new Object[arr.size()];
+            for (int i = 0; i < result.length; i++) {
+                // Shredded array elements must not be null.
+                if (arr.isNullAt(i)) {
+                    throw malformedVariant();
+                }
+                result[i] = elementReader.read(arr.getRow(i, elementNumFields), topLevelMetadata);
+            }
+            return new GenericArray(result);
+        }
+    }
+
+    /**
+     * Read variant values into a Paimon map type with string key type. The input must be an object
+     * for a valid cast. The resulting map contains shredded fields from object `typed_value` and
+     * unshredded fields from object `value`. `value` must not contain any shredded field according
+     * to the shredding spec. Unlike `RowReader`, this requirement is enforced in `MapReader`: if
+     * `value` does contain a shredded field, a MALFORMED_VARIANT error is thrown. The purpose is to
+     * avoid duplicate map keys.
+     */
+    private static final class MapReader extends BaseVariantReader {
+
+        // Reader for each shredded field, in `schema.objectSchema` order; null when the schema
+        // has no object `typed_value`.
+        private final BaseVariantReader[] valueReaders;
+        // Map keys of the shredded fields, aligned with `valueReaders`.
+        private final BinaryString[] shreddedFieldNames;
+        // The target type with its concrete class, used to access the map value type.
+        private final MapType mapType;
+
+        public MapReader(VariantSchema schema, MapType targetType, VariantCastArgs castArgs) {
+            super(schema, targetType, castArgs);
+            this.mapType = targetType;
+
+            if (schema.objectSchema != null) {
+                int len = schema.objectSchema.length;
+                this.valueReaders = new BaseVariantReader[len];
+                this.shreddedFieldNames = new BinaryString[len];
+                for (int i = 0; i < len; i++) {
+                    VariantSchema.ObjectField fieldInfo = schema.objectSchema[i];
+                    this.valueReaders[i] =
+                            BaseVariantReader.create(
+                                    fieldInfo.schema(), targetType.getValueType(), castArgs, false);
+                    this.shreddedFieldNames[i] = BinaryString.fromString(fieldInfo.fieldName());
+                }
+            } else {
+                this.valueReaders = null;
+                this.shreddedFieldNames = null;
+            }
+        }
+
+        @Override
+        public Object readFromTyped(InternalRow row, byte[] topLevelMetadata) {
+            if (schema.objectSchema == null) {
+                return invalidCast(row, topLevelMetadata);
+            }
+
+            InternalRow obj = row.getRow(schema.typedIdx, schema.objectSchema.length);
+            int numShreddedFields = (valueReaders != null) ? valueReaders.length : 0;
+
+            GenericVariant unshreddedObject = null;
+            if (schema.variantIdx >= 0 && !row.isNullAt(schema.variantIdx)) {
+                unshreddedObject =
+                        new GenericVariant(row.getBinary(schema.variantIdx), topLevelMetadata);
+                if (unshreddedObject.getType() != Type.OBJECT) {
+                    throw malformedVariant();
+                }
+            }
+
+            int numUnshreddedFields =
+                    (unshreddedObject != null) ? unshreddedObject.objectSize() : 0;
+            HashMap<BinaryString, Object> map =
+                    new HashMap<>(numShreddedFields + numUnshreddedFields);
+            for (int i = 0; i < numShreddedFields; i++) {
+                // Shredded field must not be null.
+                if (obj.isNullAt(i)) {
+                    throw malformedVariant();
+                }
+                VariantSchema fieldSchema = schema.objectSchema[i].schema();
+                InternalRow fieldInput = obj.getRow(i, fieldSchema.numFields);
+                // Only add the shredded field to map if it is not missing.
+                if ((fieldSchema.typedIdx >= 0 && !fieldInput.isNullAt(fieldSchema.typedIdx))
+                        || (fieldSchema.variantIdx >= 0
+                                && !fieldInput.isNullAt(fieldSchema.variantIdx))) {
+                    map.put(
+                            shreddedFieldNames[i],
+                            valueReaders[i].read(fieldInput, topLevelMetadata));
+                }
+            }
+            for (int i = 0; i < numUnshreddedFields; i++) {
+                GenericVariant.ObjectField field = unshreddedObject.getFieldAtIndex(i);
+                // A key present in both `value` and `typed_value` would be a duplicate map key.
+                if (schema.objectSchemaMap.containsKey(field.key)) {
+                    throw malformedVariant();
+                }
+                map.put(
+                        BinaryString.fromString(field.key),
+                        VariantGet.cast(field.value, mapType.getValueType(), castArgs));
+            }
+            return new GenericMap(map);
+        }
+    }
+
+    /** Read variant values into a Paimon variant type (the binary format). */
+    private static final class VariantReader extends BaseVariantReader {
+
+        // An optional optimization: the user can set it to true if the variant column is
+        // unshredded and the extraction path is empty. We are not required to do anything special,
+        // but we can avoid rebuilding the variant for optimization purposes.
+        private final boolean isTopLevelUnshredded;
+
+        public VariantReader(
+                VariantSchema schema,
+                VariantType targetType,
+                VariantCastArgs castArgs,
+                boolean isTopLevelUnshredded) {
+            super(schema, targetType, castArgs);
+            this.isTopLevelUnshredded = isTopLevelUnshredded;
+        }
+
+        @Override
+        public Object read(InternalRow row, byte[] topLevelMetadata) {
+            if (isTopLevelUnshredded) {
+                if (row.isNullAt(schema.variantIdx)) {
+                    throw malformedVariant();
+                }
+                return new GenericVariant(row.getBinary(schema.variantIdx), topLevelMetadata);
+            }
+            return rebuildVariant(row, topLevelMetadata);
+        }
+    }
+
+    /**
+     * Read variant values into a Paimon scalar type. When no cast from `typed_value` is available
+     * (it is an array/object, or {@code CastExecutors} has no cast for its type), a STRING target
+     * gets the JSON rendering of the rebuilt variant and every other target type is an invalid
+     * cast. For scalar `typed_value`, the value is converted by a {@code CastExecutor} resolved from
+     * {@code CastExecutors}, unless the types already match. According to the shredding spec,
+     * scalar `typed_value` and `value` must not be non-null at the same time. The requirement is not
+     * enforced in this reader. If they are both non-null, no error will occur, and the reader will
+     * read from `typed_value`.
+     */
+    private static final class ScalarReader extends BaseVariantReader {
+
+        // Paimon type of the scalar `typed_value`; null if the schema has no scalar `typed_value`.
+        @Nullable private final DataType scalarType;
+        // Cast from `scalarType` to `targetType`; null if no cast is needed or if
+        // `CastExecutors.resolve` returned null.
+        @Nullable private final CastExecutor<Object, Object> resolve;
+        // Whether `scalarType` equals `targetType`, so the value can be returned as is.
+        private final boolean noNeedCast;
+
+        @SuppressWarnings("unchecked")
+        public ScalarReader(VariantSchema schema, DataType targetType, VariantCastArgs castArgs) {
+            super(schema, targetType, castArgs);
+            if (schema.scalarSchema != null) {
+                scalarType = PaimonShreddingUtils.scalarSchemaToPaimonType(schema.scalarSchema);
+                noNeedCast = scalarType.equals(targetType);
+                resolve =
+                        noNeedCast
+                                ? null
+                                : (CastExecutor<Object, Object>)
+                                        CastExecutors.resolve(scalarType, targetType);
+            } else {
+                scalarType = null;
+                noNeedCast = false;
+                resolve = null;
+            }
+        }
+
+        @Override
+        protected Object readFromTyped(InternalRow row, byte[] topLevelMetadata) {
+            if (!noNeedCast && resolve == null) {
+                // No direct cast: only a STRING target can still be served, by rendering the
+                // rebuilt variant as JSON.
+                if (targetType.equals(DataTypes.STRING())) {
+                    return BinaryString.fromString(
+                            rebuildVariant(row, topLevelMetadata).toJson(castArgs.zoneId()));
+                } else {
+                    return invalidCast(row, topLevelMetadata);
+                }
+            }
+
+            int typedValueIdx = schema.typedIdx;
+            if (row.isNullAt(typedValueIdx)) {
+                return null;
+            }
+
+            Object value = readScalar(row, typedValueIdx);
+            if (noNeedCast) {
+                return value;
+            }
+            try {
+                return resolve.cast(value);
+            } catch (Exception e) {
+                // A failed conversion is handled like any other invalid cast.
+                return invalidCast(row, topLevelMetadata);
+            }
+        }
+
+        /** Read the scalar `typed_value` at {@code pos} as the internal object of `scalarType`. */
+        private Object readScalar(InternalRow row, int pos) {
+            if (scalarType.equals(DataTypes.STRING())) {
+                return row.getString(pos);
+            } else if (scalarType instanceof TinyIntType) {
+                return row.getByte(pos);
+            } else if (scalarType instanceof SmallIntType) {
+                return row.getShort(pos);
+            } else if (scalarType instanceof IntType) {
+                return row.getInt(pos);
+            } else if (scalarType instanceof BigIntType) {
+                return row.getLong(pos);
+            } else if (scalarType instanceof FloatType) {
+                return row.getFloat(pos);
+            } else if (scalarType instanceof DoubleType) {
+                return row.getDouble(pos);
+            } else if (scalarType instanceof BooleanType) {
+                return row.getBoolean(pos);
+            } else if (scalarType.equals(DataTypes.BYTES())) {
+                return row.getBinary(pos);
+            } else if (scalarType instanceof DecimalType) {
+                DecimalType decimalType = (DecimalType) scalarType;
+                return row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale());
+            } else {
+                throw new UnsupportedOperationException("Unsupported scalar type: " + scalarType);
+            }
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
index 7463490d57..89181c72e0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java
@@ -18,6 +18,10 @@
 
 package org.apache.paimon.data.variant;
 
+import org.apache.paimon.data.variant.VariantPathSegment.ArrayExtraction;
+import org.apache.paimon.data.variant.VariantPathSegment.ObjectExtraction;
+import org.apache.paimon.types.DataType;
+
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 
@@ -134,13 +138,9 @@ public final class GenericVariant implements Variant, 
Serializable {
         }
     }
 
-    @Override
-    public String toJson() {
-        return toJson(ZoneOffset.UTC);
-    }
-
     // Stringify the variant in JSON format.
     // Throw `MALFORMED_VARIANT` if the variant is malformed.
+    @Override
     public String toJson(ZoneId zoneId) {
         StringBuilder sb = new StringBuilder();
         toJsonImpl(value, metadata, pos, sb, zoneId);
@@ -152,39 +152,19 @@ public final class GenericVariant implements Variant, 
Serializable {
         return toJson();
     }
 
-    public Object variantGet(String path) {
+    /**
+     * Extracts the value at {@code path} from this variant and casts it to {@code dataType}.
+     *
+     * @param path the extraction path, parsed by {@code VariantPathSegment.parse}
+     * @param dataType the type to cast the extracted value to
+     * @param castArgs options controlling the cast
+     * @return the cast value, or null if the path cannot be followed: a key segment applied to a
+     *     non-object, an index segment applied to a non-array, or a lookup that yields no value
+     */
+    public Object variantGet(String path, DataType dataType, VariantCastArgs castArgs) {
         GenericVariant v = this;
-        PathSegment[] parsedPath = PathSegment.parse(path);
-        for (PathSegment pathSegment : parsedPath) {
-            if (pathSegment.isKey() && v.getType() == Type.OBJECT) {
-                v = v.getFieldByKey(pathSegment.getKey());
-            } else if (pathSegment.isIndex() && v.getType() == Type.ARRAY) {
-                v = v.getElementAtIndex(pathSegment.getIndex());
+        VariantPathSegment[] parsedPath = VariantPathSegment.parse(path);
+        for (VariantPathSegment pathSegment : parsedPath) {
+            if (pathSegment instanceof ObjectExtraction && v.getType() == Type.OBJECT) {
+                v = v.getFieldByKey(((ObjectExtraction) pathSegment).getKey());
+            } else if (pathSegment instanceof ArrayExtraction && v.getType() == Type.ARRAY) {
+                v = v.getElementAtIndex(((ArrayExtraction) pathSegment).getIndex());
             } else {
                 return null;
             }
+            // The lookup yields null for a missing key (and presumably for an out-of-range index);
+            // stop here instead of dereferencing null on the next segment or in the cast.
+            if (v == null) {
+                return null;
+            }
         }
-
-        switch (v.getType()) {
-            case OBJECT:
-            case ARRAY:
-                return v.toJson();
-            case STRING:
-                return v.getString();
-            case LONG:
-                return v.getLong();
-            case DOUBLE:
-                return v.getDouble();
-            case DECIMAL:
-                return v.getDecimal();
-            case BOOLEAN:
-                return v.getBoolean();
-            case NULL:
-                return null;
-            default:
-                // todo: support other types
-                throw new IllegalArgumentException("Unsupported type: " + v.getType());
-        }
+        return VariantGet.cast(v, dataType, castArgs);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index fd8285a151..b239c5cf98 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -28,6 +28,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableBytesVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.variant.VariantPathSegment.ArrayExtraction;
+import org.apache.paimon.data.variant.VariantPathSegment.ObjectExtraction;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -41,6 +43,10 @@ import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.data.variant.GenericVariantUtil.Type.ARRAY;
+import static org.apache.paimon.data.variant.GenericVariantUtil.Type.OBJECT;
+import static 
org.apache.paimon.data.variant.GenericVariantUtil.malformedVariant;
+
 /** Utils for paimon shredding. */
 public class PaimonShreddingUtils {
 
@@ -134,8 +140,76 @@ public class PaimonShreddingUtils {
         }
     }
 
+    /** The search result of a `VariantPathSegment` in a `VariantSchema`. */
+    public static class SchemaPathSegment {
+
+        // The original path segment this result was computed for.
+        private final VariantPathSegment rawPath;
+
+        // Whether this path segment is an object extraction (true) or an array extraction (false).
+        private final boolean isObject;
+
+        // `schema.typedIdx`, if the path exists in the schema (for object extraction, the schema
+        // should contain an object `typed_value` containing the requested field; similar for array
+        // extraction). Negative otherwise.
+        private final int typedIdx;
+
+        // For object extraction, the index of the desired field in `schema.objectSchema`; if the
+        // requested field doesn't exist, both `extractionIdx` and `typedIdx` are negative.
+        // For array extraction, the array index. That index is also available from `rawPath`, but
+        // reading it from a primitive field avoids the instanceof check and cast `rawPath` needs.
+        private final int extractionIdx;
+
+        public SchemaPathSegment(
+                VariantPathSegment rawPath, boolean isObject, int typedIdx, int extractionIdx) {
+            this.rawPath = rawPath;
+            this.isObject = isObject;
+            this.typedIdx = typedIdx;
+            this.extractionIdx = extractionIdx;
+        }
+
+        /** Returns the original path segment. */
+        public VariantPathSegment rawPath() {
+            return rawPath;
+        }
+
+        /** Returns true for an object extraction, false for an array extraction. */
+        public boolean isObject() {
+            return isObject;
+        }
+
+        /** Returns `schema.typedIdx` if the path exists in the schema, otherwise a negative value. */
+        public int typedIdx() {
+            return typedIdx;
+        }
+
+        /** Returns the field index (object extraction) or the array index (array extraction). */
+        public int extractionIdx() {
+            return extractionIdx;
+        }
+    }
+
+    /**
+     * A single field of a variant struct: one requested value that the scan produces by extracting
+     * it from the variant column.
+     *
+     * <p>It pairs the resolved extraction path with the reader used at the end of that path.
+     */
+    public static class FieldToExtract {
+
+        /** Resolved path from the variant root to the requested value. */
+        private final SchemaPathSegment[] path;
+
+        /** Reader used to produce the value found at the end of {@link #path}. */
+        private final BaseVariantReader reader;
+
+        public FieldToExtract(SchemaPathSegment[] path, BaseVariantReader reader) {
+            this.reader = reader;
+            this.path = path;
+        }
+
+        public SchemaPathSegment[] path() {
+            return this.path;
+        }
+
+        public BaseVariantReader reader() {
+            return this.reader;
+        }
+    }
+
+    /**
+     * Builds the top-level shredding schema for {@code rowType}. It delegates with {@code
+     * isTopLevel = true}, which adds a non-nullable metadata field, and {@code isObjectField =
+     * false}.
+     */
     public static RowType variantShreddingSchema(RowType rowType) {
-        return variantShreddingSchema(rowType, true);
+        return variantShreddingSchema(rowType, true, false);
     }
 
     /**
@@ -146,10 +220,11 @@ public class PaimonShreddingUtils {
      * binary, value: binary, typed_value: struct&lt; a: 
struct&lt;typed_value: int, value:
      * binary&gt;, b: struct&lt;typed_value: string, value: binary&gt;&gt;&gt;
      */
-    private static RowType variantShreddingSchema(DataType dataType, boolean 
topLevel) {
+    private static RowType variantShreddingSchema(
+            DataType dataType, boolean isTopLevel, boolean isObjectField) {
         RowType.Builder builder = RowType.builder();
-        if (topLevel) {
-            builder.field(METADATA_FIELD_NAME, DataTypes.BYTES());
+        if (isTopLevel) {
+            builder.field(METADATA_FIELD_NAME, DataTypes.BYTES().copy(false));
         }
         switch (dataType.getTypeRoot()) {
             case ARRAY:
@@ -157,7 +232,7 @@ public class PaimonShreddingUtils {
                 ArrayType shreddedArrayType =
                         new ArrayType(
                                 arrayType.isNullable(),
-                                
variantShreddingSchema(arrayType.getElementType(), false));
+                                
variantShreddingSchema(arrayType.getElementType(), false, false));
                 builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
                 builder.field(TYPED_VALUE_FIELD_NAME, shreddedArrayType);
                 break;
@@ -173,14 +248,21 @@ public class PaimonShreddingUtils {
                                                 field ->
                                                         field.newType(
                                                                 
variantShreddingSchema(
-                                                                               
 field.type(), false)
+                                                                               
 field.type(),
+                                                                               
 false,
+                                                                               
 true)
                                                                         
.notNull()))
                                         .collect(Collectors.toList()));
                 builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
                 builder.field(TYPED_VALUE_FIELD_NAME, shreddedRowType);
                 break;
             case VARIANT:
-                builder.field(VARIANT_VALUE_FIELD_NAME, DataTypes.BYTES());
+                // For Variant, we don't need a typed column. If there is no 
typed column, value is
+                // required
+                // for array elements or top-level fields, but optional for 
objects (where a null
+                // represents
+                // a missing field).
+                builder.field(VARIANT_VALUE_FIELD_NAME, 
DataTypes.BYTES().copy(isObjectField));
                 break;
             case CHAR:
             case VARCHAR:
@@ -345,6 +427,45 @@ public class PaimonShreddingUtils {
                 arraySchema);
     }
 
+    /**
+     * Converts a scalar type of a {@link VariantSchema} into the corresponding Paimon type.
+     *
+     * @param scalar the scalar type to convert
+     * @return the matching Paimon {@link DataType}
+     * @throws UnsupportedOperationException if the scalar type or integral size is not supported
+     */
+    public static DataType scalarSchemaToPaimonType(VariantSchema.ScalarType scalar) {
+        if (scalar instanceof VariantSchema.StringType) {
+            return DataTypes.STRING();
+        } else if (scalar instanceof VariantSchema.IntegralType) {
+            VariantSchema.IntegralType it = (VariantSchema.IntegralType) scalar;
+            switch (it.size) {
+                case BYTE:
+                    return DataTypes.TINYINT();
+                case SHORT:
+                    return DataTypes.SMALLINT();
+                case INT:
+                    return DataTypes.INT();
+                case LONG:
+                    return DataTypes.BIGINT();
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported integral size: " + it.size);
+            }
+        } else if (scalar instanceof VariantSchema.FloatType) {
+            return DataTypes.FLOAT();
+        } else if (scalar instanceof VariantSchema.DoubleType) {
+            return DataTypes.DOUBLE();
+        } else if (scalar instanceof VariantSchema.BooleanType) {
+            return DataTypes.BOOLEAN();
+        } else if (scalar instanceof VariantSchema.BinaryType) {
+            return DataTypes.BYTES();
+        } else if (scalar instanceof VariantSchema.DecimalType) {
+            VariantSchema.DecimalType dt = (VariantSchema.DecimalType) scalar;
+            return DataTypes.DECIMAL(dt.precision, dt.scale);
+        } else if (scalar instanceof VariantSchema.DateType) {
+            return DataTypes.DATE();
+        } else if (scalar instanceof VariantSchema.TimestampType) {
+            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+        } else if (scalar instanceof VariantSchema.TimestampNTZType) {
+            return DataTypes.TIMESTAMP();
+        } else {
+            throw new UnsupportedOperationException("Unsupported scalar type: " + scalar);
+        }
+    }
+
     private static RuntimeException invalidVariantShreddingSchema(DataType 
dataType) {
         return new RuntimeException("Invalid variant shredding schema: " + 
dataType);
     }
@@ -428,9 +549,178 @@ public class PaimonShreddingUtils {
                 .row;
     }
 
-    /** Rebuilds a variant from shredded components with the variant schema. */
-    public static Variant rebuild(InternalRow row, VariantSchema 
variantSchema) {
-        return ShreddingUtils.rebuild(new PaimonShreddedRow(row), 
variantSchema);
+    /** Assemble a variant (binary format) from a variant value. */
+    public static Variant assembleVariant(InternalRow row, VariantSchema 
schema) {
+        return ShreddingUtils.rebuild(new PaimonShreddedRow(row), schema);
+    }
+
+    /**
+     * Assembles a variant struct from a shredded variant value. Field {@code i} of the result is
+     * extracted from the variant as described by {@code fields[i]}.
+     *
+     * <p>Throws the exception produced by {@code malformedVariant()} if the top-level metadata is
+     * null.
+     */
+    public static InternalRow assembleVariantStruct(
+            InternalRow inputRow, VariantSchema schema, FieldToExtract[] fields) {
+        int metadataIdx = schema.topLevelMetadataIdx;
+        if (inputRow.isNullAt(metadataIdx)) {
+            throw malformedVariant();
+        }
+        byte[] topLevelMetadata = inputRow.getBinary(metadataIdx);
+        GenericRow resultRow = new GenericRow(fields.length);
+        for (int i = 0; i < fields.length; i++) {
+            FieldToExtract field = fields[i];
+            Object value =
+                    extractField(inputRow, topLevelMetadata, schema, field.path(), field.reader());
+            resultRow.setField(i, value);
+        }
+        return resultRow;
+    }
+
+    /**
+     * Return a list of fields to extract. `targetType` must be either variant 
or variant struct. If
+     * it is variant, return null because the target is the full variant and 
there is no field to
+     * extract. If it is variant struct, return a list of fields matching the 
variant struct fields.
+     */
+    public static FieldToExtract[] getFieldsToExtract(
+            DataType targetType, VariantSchema variantSchema) {
+        // todo: implement it
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Resolves a variant extraction path against the input variant schema and builds the {@link
+     * FieldToExtract} that reads the value at that path as {@code dataType}.
+     *
+     * @param dataType the type the extracted value is read as
+     * @param path the extraction path, starting with {@code $} (e.g. {@code $.a[0]})
+     * @param castArgs cast options handed to the reader
+     * @param inputSchema the variant schema of the input
+     */
+    public static FieldToExtract buildFieldsToExtract(
+            DataType dataType, String path, VariantCastArgs castArgs, VariantSchema inputSchema) {
+        VariantPathSegment[] segments = VariantPathSegment.parse(path);
+        SchemaPathSegment[] resolved = new SchemaPathSegment[segments.length];
+        // Walk the schema alongside the path. Once a segment cannot be found in the typed schema,
+        // `current` becomes null and every later segment gets a negative `typedIdx`.
+        VariantSchema current = inputSchema;
+        for (int i = 0; i < segments.length; i++) {
+            VariantPathSegment segment = segments[i];
+            int typedIdx = -1;
+            int extractionIdx = -1;
+            VariantSchema next = null;
+            if (current != null) {
+                if (segment instanceof ObjectExtraction) {
+                    if (current.objectSchemaMap != null) {
+                        String key = ((ObjectExtraction) segment).getKey();
+                        Integer fieldIdx = current.objectSchemaMap.get(key);
+                        if (fieldIdx != null) {
+                            typedIdx = current.typedIdx;
+                            extractionIdx = fieldIdx;
+                            next = current.objectSchema[fieldIdx].schema();
+                        }
+                    }
+                } else if (segment instanceof ArrayExtraction) {
+                    if (current.arraySchema != null) {
+                        typedIdx = current.typedIdx;
+                        extractionIdx = ((ArrayExtraction) segment).getIndex();
+                        next = current.arraySchema;
+                    }
+                }
+            }
+            current = next;
+            resolved[i] =
+                    new SchemaPathSegment(
+                            segment, segment instanceof ObjectExtraction, typedIdx, extractionIdx);
+        }
+
+        boolean topLevelUnshredded = segments.length == 0 && inputSchema.isUnshredded();
+        BaseVariantReader reader =
+                BaseVariantReader.create(current, dataType, castArgs, topLevelUnshredded);
+        return new FieldToExtract(resolved, reader);
+    }
+
+    /**
+     * Follows the path segments from {@code start} onwards inside a binary variant value and casts
+     * the result to the reader's target type. Returns null as soon as a segment does not match the
+     * value (wrong container type, missing key or index).
+     */
+    private static Object extractFromVariantValue(
+            GenericVariant value, SchemaPathSegment[] pathList, int start, BaseVariantReader reader) {
+        GenericVariant current = value;
+        for (int i = start; i < pathList.length; i++) {
+            VariantPathSegment segment = pathList[i].rawPath();
+            GenericVariant next = null;
+            if (segment instanceof ObjectExtraction) {
+                if (current.getType() == OBJECT) {
+                    next = current.getFieldByKey(((ObjectExtraction) segment).getKey());
+                }
+            } else if (segment instanceof ArrayExtraction) {
+                if (current.getType() == ARRAY) {
+                    next = current.getElementAtIndex(((ArrayExtraction) segment).getIndex());
+                }
+            }
+            if (next == null) {
+                return null;
+            }
+            current = next;
+        }
+        return VariantGet.cast(current, reader.targetType(), reader.castArgs());
+    }
+
+    /**
+     * Extracts a single variant struct field from a variant value. It steps into {@code inputRow}
+     * along the resolved path and reads the value found there with {@code reader}. Once a segment
+     * is not available in {@code typed_value}, the rest of the path is looked up in the binary
+     * {@code value} column instead.
+     *
+     * @return the extracted value, or null if it is missing
+     */
+    private static Object extractField(
+            InternalRow inputRow,
+            byte[] topLevelMetadata,
+            VariantSchema inputSchema,
+            SchemaPathSegment[] pathList,
+            BaseVariantReader reader) {
+        InternalRow row = inputRow;
+        VariantSchema schema = inputSchema;
+        for (int pathIdx = 0; pathIdx < pathList.length; pathIdx++) {
+            SchemaPathSegment segment = pathList[pathIdx];
+            int typedIdx = segment.typedIdx();
+
+            if (typedIdx < 0) {
+                // Not in `typed_value`: continue the lookup in the binary `value` column.
+                int variantIdx = schema.variantIdx;
+                if (variantIdx < 0 || row.isNullAt(variantIdx)) {
+                    return null;
+                }
+                GenericVariant value =
+                        new GenericVariant(row.getBinary(variantIdx), topLevelMetadata);
+                return extractFromVariantValue(value, pathList, pathIdx, reader);
+            }
+
+            if (row.isNullAt(typedIdx)) {
+                return null;
+            }
+
+            int extractionIdx = segment.extractionIdx();
+            if (segment.isObject()) {
+                InternalRow typedObject = row.getRow(typedIdx, schema.objectSchema.length);
+                // A shredded object field must not be null; absence is encoded inside it.
+                if (typedObject.isNullAt(extractionIdx)) {
+                    throw malformedVariant();
+                }
+                schema = schema.objectSchema[extractionIdx].schema();
+                row = typedObject.getRow(extractionIdx, schema.numFields);
+                // Both `typed_value` and `value` null (or absent) means the field is missing.
+                if ((schema.typedIdx < 0 || row.isNullAt(schema.typedIdx))
+                        && (schema.variantIdx < 0 || row.isNullAt(schema.variantIdx))) {
+                    return null;
+                }
+            } else {
+                InternalArray typedArray = row.getArray(typedIdx);
+                if (extractionIdx >= typedArray.size()) {
+                    // The requested index is out of bounds.
+                    return null;
+                }
+                // A shredded array element must not be null.
+                if (typedArray.isNullAt(extractionIdx)) {
+                    throw malformedVariant();
+                }
+                schema = schema.arraySchema;
+                row = typedArray.getRow(extractionIdx, schema.numFields);
+            }
+        }
+        return reader.read(row, topLevelMetadata);
     }
 
     /** Assemble a batch of variant (binary format) from a batch of variant 
values. */
@@ -445,7 +735,7 @@ public class PaimonShreddingUtils {
             if (input.isNullAt(i)) {
                 output.setNullAt(i);
             } else {
-                Variant v = rebuild(((RowColumnVector) input).getRow(i), 
variantSchema);
+                Variant v = assembleVariant(((RowColumnVector) 
input).getRow(i), variantSchema);
                 byte[] value = v.value();
                 byte[] metadata = v.metadata();
                 valueChild.putByteArray(i, value, 0, value.length);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
index af37d18b26..5bc19972f9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
@@ -18,6 +18,11 @@
 
 package org.apache.paimon.data.variant;
 
+import org.apache.paimon.types.DataType;
+
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
 /**
  * A Variant represents a type that contain one of: 1) Primitive: A type and 
corresponding value
  * (e.g. INT, STRING); 2) Array: An ordered list of Variant values; 3) Object: 
An unordered
@@ -43,7 +48,12 @@ public interface Variant {
     byte[] value();
 
     /** Parses the variant to json. */
-    String toJson();
+    default String toJson() {
+        return toJson(ZoneOffset.UTC);
+    }
+
+    /** Parses the variant to json with zoneId. */
+    String toJson(ZoneId zoneId);
 
     /**
      * Extracts a sub-variant value according to a path which start with a 
`$`. e.g.
@@ -51,8 +61,10 @@ public interface Variant {
      * <p>access object's field: `$.key` or `$['key']` or `$["key"]`.
      *
      * <p>access array's first elem: `$.array[0]`
+     *
+     * <p>and then cast the value to the target type.
      */
-    Object variantGet(String path);
+    Object variantGet(String path, DataType dataType, VariantCastArgs 
castArgs);
 
     /** Returns the size of the variant in bytes. */
     long sizeInBytes();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
new file mode 100644
index 0000000000..f95acc94b1
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantCastArgs.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import java.time.ZoneId;
+
+/** Several parameters used by `VariantGet.cast`. Packed together to simplify 
parameter passing. */
+public class VariantCastArgs {
+
+    private final boolean failOnError;
+    private final ZoneId zoneId;
+
+    public VariantCastArgs(boolean failOnError, ZoneId zoneId) {
+        this.failOnError = failOnError;
+        this.zoneId = zoneId;
+    }
+
+    public boolean failOnError() {
+        return failOnError;
+    }
+
+    public ZoneId zoneId() {
+        return zoneId;
+    }
+
+    @Override
+    public String toString() {
+        return "VariantCastArgs{" + "failOnError=" + failOnError + ", zoneId=" 
+ zoneId + '}';
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantGet.java 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantGet.java
new file mode 100644
index 0000000000..51233624f9
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantGet.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.variant.GenericVariantUtil.Type;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VariantType;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+
+/** Utils for variant get: casts {@link GenericVariant} values to Paimon data types. */
+public class VariantGet {
+
+    /**
+     * Casts a variant value to {@code dataType}.
+     *
+     * <p>A variant null becomes Java null. An object can be cast to a row (fields matched by name,
+     * unmatched fields ignored) or to a map with string keys, an array to an array, and an object
+     * or array to its JSON string. Scalars are converted with {@link CastExecutors}. Values that
+     * cannot be cast are handled by {@link #invalidCast(Variant, DataType, VariantCastArgs)}.
+     *
+     * @param v the variant value to cast
+     * @param dataType the target type
+     * @param castArgs cast options (error handling and time zone)
+     * @return the value in Paimon's internal representation of {@code dataType}, or null
+     */
+    public static Object cast(GenericVariant v, DataType dataType, VariantCastArgs castArgs) {
+        if (dataType instanceof VariantType) {
+            GenericVariantBuilder builder = new GenericVariantBuilder(false);
+            builder.appendVariant(v);
+            GenericVariant result = builder.result();
+            return new GenericVariant(result.value(), result.metadata());
+        }
+
+        Type variantType = v.getType();
+        if (variantType == Type.NULL) {
+            return null;
+        }
+
+        if (variantType == Type.UUID) {
+            // There's no UUID type in Paimon. We only allow it to be cast to string.
+            if (isString(dataType)) {
+                return BinaryString.fromString(v.getUuid().toString());
+            } else {
+                return invalidCast(v, dataType, castArgs);
+            }
+        }
+
+        if (dataType instanceof RowType) {
+            RowType rowType = (RowType) dataType;
+            if (variantType == Type.OBJECT) {
+                GenericRow row = new GenericRow(rowType.getFieldCount());
+                for (int i = 0; i < v.objectSize(); i++) {
+                    GenericVariant.ObjectField field = v.getFieldAtIndex(i);
+                    int idx = rowType.getFieldIndex(field.key);
+                    if (idx != -1) {
+                        row.setField(idx, cast(field.value, rowType.getTypeAt(idx), castArgs));
+                    }
+                }
+                return row;
+            } else {
+                return invalidCast(v, dataType, castArgs);
+            }
+        } else if (dataType instanceof MapType) {
+            MapType mapType = (MapType) dataType;
+            DataType valueType = mapType.getValueType();
+            if (isString(mapType.getKeyType())) {
+                if (variantType == Type.OBJECT) {
+                    int size = v.objectSize();
+                    HashMap<BinaryString, Object> map = new HashMap<>();
+                    for (int i = 0; i < size; i++) {
+                        GenericVariant.ObjectField field = v.getFieldAtIndex(i);
+                        map.put(
+                                BinaryString.fromString(field.key),
+                                cast(field.value, valueType, castArgs));
+                    }
+                    return new GenericMap(map);
+                } else {
+                    return invalidCast(v, dataType, castArgs);
+                }
+            } else {
+                return invalidCast(v, dataType, castArgs);
+            }
+        } else if (dataType instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) dataType;
+            if (variantType == Type.ARRAY) {
+                int size = v.arraySize();
+                Object[] array = new Object[size];
+                for (int i = 0; i < size; i++) {
+                    array[i] = cast(v.getElementAtIndex(i), arrayType.getElementType(), castArgs);
+                }
+                return new GenericArray(array);
+            } else {
+                return invalidCast(v, dataType, castArgs);
+            }
+        } else {
+            Object input;
+            DataType inputType;
+            switch (variantType) {
+                case OBJECT:
+                case ARRAY:
+                    if (isString(dataType)) {
+                        return BinaryString.fromString(v.toJson(castArgs.zoneId()));
+                    } else {
+                        return invalidCast(v, dataType, castArgs);
+                    }
+                case BOOLEAN:
+                    input = v.getBoolean();
+                    inputType = DataTypes.BOOLEAN();
+                    break;
+                case LONG:
+                    input = v.getLong();
+                    inputType = DataTypes.BIGINT();
+                    break;
+                case STRING:
+                    input = BinaryString.fromString(v.getString());
+                    inputType = DataTypes.STRING();
+                    break;
+                case DOUBLE:
+                    input = v.getDouble();
+                    inputType = DataTypes.DOUBLE();
+                    break;
+                case DECIMAL:
+                    BigDecimal decimal = v.getDecimal();
+                    if (decimal.scale() < 0) {
+                        // Paimon decimals cannot have a negative scale (e.g. 1E+2); rescaling to
+                        // 0 only appends zeros, so it is exact.
+                        decimal = decimal.setScale(0);
+                    }
+                    int scale = decimal.scale();
+                    // Paimon requires scale <= precision, but BigDecimal counts precision from the
+                    // first non-zero digit (0.05 has precision 1 and scale 2), so widen it.
+                    int precision = Math.max(decimal.precision(), scale);
+                    input = Decimal.fromBigDecimal(decimal, precision, scale);
+                    inputType = DataTypes.DECIMAL(precision, scale);
+                    break;
+                case DATE:
+                    input = (int) v.getLong();
+                    inputType = DataTypes.DATE();
+                    break;
+                case FLOAT:
+                    input = v.getFloat();
+                    inputType = DataTypes.FLOAT();
+                    break;
+                default:
+                    // todo: support other types
+                    throw new IllegalArgumentException("Unsupported type: " + v.getType());
+            }
+
+            // Compare ignoring nullability so that a NOT NULL target of the same type is returned
+            // as is instead of going through a cast executor.
+            if (inputType.equals(dataType.copy(true))) {
+                return input;
+            }
+
+            @SuppressWarnings("unchecked")
+            CastExecutor<Object, Object> resolve =
+                    (CastExecutor<Object, Object>) CastExecutors.resolve(inputType, dataType);
+            if (resolve != null) {
+                try {
+                    return resolve.cast(input);
+                } catch (Exception e) {
+                    return invalidCast(v, dataType, castArgs, e);
+                }
+            }
+
+            return invalidCast(v, dataType, castArgs);
+        }
+    }
+
+    /** Returns whether {@code dataType} is {@code STRING}, regardless of its nullability. */
+    private static boolean isString(DataType dataType) {
+        return dataType.copy(true).equals(DataTypes.STRING());
+    }
+
+    /**
+     * Handles a value that cannot be cast to {@code dataType}: throws a {@link RuntimeException}
+     * if {@link VariantCastArgs#failOnError()} is set, otherwise returns null.
+     */
+    public static Object invalidCast(Variant v, DataType dataType, VariantCastArgs castArgs) {
+        return invalidCast(v, dataType, castArgs, null);
+    }
+
+    /**
+     * Like {@link #invalidCast(Variant, DataType, VariantCastArgs)}, but attaches {@code cause}
+     * (which may be null) to the thrown exception.
+     */
+    private static Object invalidCast(
+            Variant v, DataType dataType, VariantCastArgs castArgs, Throwable cause) {
+        if (castArgs.failOnError()) {
+            throw new RuntimeException(
+                    "Invalid cast " + v.toJson(castArgs.zoneId()) + " to " + dataType, cause);
+        } else {
+            return null;
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantPathSegment.java
similarity index 70%
rename from 
paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java
rename to 
paimon-common/src/main/java/org/apache/paimon/data/variant/VariantPathSegment.java
index 5804fb9fcb..e6d8866ce6 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantPathSegment.java
@@ -26,38 +26,9 @@ import java.util.regex.Pattern;
 /**
  * A path segment for variant get to represent either an object key access or 
an array index access.
  */
-public class PathSegment {
-    private final String key;
-    private final Integer index;
+public abstract class VariantPathSegment {
 
-    private PathSegment(String key, Integer index) {
-        this.key = key;
-        this.index = index;
-    }
-
-    public static PathSegment createKeySegment(String key) {
-        return new PathSegment(key, null);
-    }
-
-    public static PathSegment createIndexSegment(int index) {
-        return new PathSegment(null, index);
-    }
-
-    public boolean isKey() {
-        return key != null;
-    }
-
-    public boolean isIndex() {
-        return index != null;
-    }
-
-    public String getKey() {
-        return key;
-    }
-
-    public Integer getIndex() {
-        return index;
-    }
+    public VariantPathSegment() {}
 
     private static final Pattern ROOT_PATTERN = Pattern.compile("\\$");
     // Parse index segment like `[123]`.
@@ -66,21 +37,21 @@ public class PathSegment {
     private static final Pattern KEY_PATTERN =
             Pattern.compile("\\.([^.\\[]+)|\\['([^']+)']|\\[\"([^\"]+)\"]");
 
-    public static PathSegment[] parse(String str) {
+    public static VariantPathSegment[] parse(String str) {
         // Validate root
         Matcher rootMatcher = ROOT_PATTERN.matcher(str);
         if (str.isEmpty() || !rootMatcher.find()) {
             throw new IllegalArgumentException("Invalid path: " + str);
         }
 
-        List<PathSegment> segments = new ArrayList<>();
+        List<VariantPathSegment> segments = new ArrayList<>();
         String remaining = str.substring(rootMatcher.end());
         // Parse indexes and keys
         while (!remaining.isEmpty()) {
             Matcher indexMatcher = INDEX_PATTERN.matcher(remaining);
             if (indexMatcher.lookingAt()) {
                 int index = Integer.parseInt(indexMatcher.group(1));
-                segments.add(PathSegment.createIndexSegment(index));
+                segments.add(new ArrayExtraction(index));
                 remaining = remaining.substring(indexMatcher.end());
                 continue;
             }
@@ -89,7 +60,7 @@ public class PathSegment {
             if (keyMatcher.lookingAt()) {
                 for (int i = 1; i <= 3; i++) {
                     if (keyMatcher.group(i) != null) {
-                        
segments.add(PathSegment.createKeySegment(keyMatcher.group(i)));
+                        segments.add(new 
ObjectExtraction(keyMatcher.group(i)));
                         break;
                     }
                 }
@@ -99,6 +70,36 @@ public class PathSegment {
             throw new IllegalArgumentException("Invalid path: " + str);
         }
 
-        return segments.toArray(new PathSegment[0]);
+        return segments.toArray(new VariantPathSegment[0]);
+    }
+
+    /** A path segment selecting an object field by its key, e.g. {@code .name} or {@code ['name']}. */
+    public static class ObjectExtraction extends VariantPathSegment {
+
+        // The object key to look up.
+        private final String key;
+
+        private ObjectExtraction(String key) {
+            this.key = key;
+        }
+
+        public String getKey() {
+            return this.key;
+        }
+    }
+
+    /** A path segment selecting an array element by its position, e.g. {@code [0]}. */
+    public static class ArrayExtraction extends VariantPathSegment {
+
+        // Zero-based element position.
+        private final Integer index;
+
+        public ArrayExtraction(Integer index) {
+            this.index = index;
+        }
+
+        public Integer getIndex() {
+            return this.index;
+        }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
index 9fb7f25ba2..68cc9ab0c0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantSchema.java
@@ -47,6 +47,14 @@ public class VariantSchema {
             this.schema = schema;
         }
 
+        /** Returns the name of this object field. */
+        public String fieldName() {
+            return this.fieldName;
+        }
+
+        /** Returns the variant schema of this object field. */
+        public VariantSchema schema() {
+            return this.schema;
+        }
+
         @Override
         public String toString() {
             return "ObjectField{" + "fieldName=" + fieldName + ", schema=" + 
schema + '}';
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
index 2ca87b11e6..6a4aa5944b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantShreddingWriter.java
@@ -149,8 +149,6 @@ public class VariantShreddingWriter {
                 // Store the typed value.
                 result.addScalar(typedValue);
             } else {
-                GenericVariantBuilder variantBuilder = new 
GenericVariantBuilder(false);
-                variantBuilder.appendVariant(v);
                 result.addVariantValue(v.value());
             }
         } else {
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
index c226b9dead..e7d81b57a2 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java
@@ -30,7 +30,12 @@ import org.apache.paimon.types.RowType;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
+import java.time.ZoneOffset;
 
+import static 
org.apache.paimon.data.variant.PaimonShreddingUtils.assembleVariant;
+import static 
org.apache.paimon.data.variant.PaimonShreddingUtils.buildVariantSchema;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.castShredded;
+import static 
org.apache.paimon.data.variant.PaimonShreddingUtils.variantShreddingSchema;
 import static org.apache.paimon.types.DataTypesTest.assertThat;
 
 /** Test of {@link GenericVariant}. */
@@ -86,23 +91,51 @@ public class GenericVariantTest {
                         + "}\n";
 
         Variant variant = GenericVariant.fromJson(json);
-        assertThat(variant.variantGet("$.object"))
+
+        VariantCastArgs castArgs = new VariantCastArgs(false, ZoneOffset.UTC);
+        assertThat(variant.variantGet("$.object", DataTypes.STRING(), 
castArgs))
                 .isEqualTo(
-                        "{\"address\":{\"city\":\"Hangzhou\",\"street\":\"Main 
St\"},\"age\":2,\"name\":\"Apache Paimon\"}");
-        assertThat(variant.variantGet("$.object.name")).isEqualTo("Apache 
Paimon");
-        
assertThat(variant.variantGet("$.object.address.street")).isEqualTo("Main St");
-        
assertThat(variant.variantGet("$[\"object\"]['address'].city")).isEqualTo("Hangzhou");
-        assertThat(variant.variantGet("$.array")).isEqualTo("[1,2,3,4,5]");
-        assertThat(variant.variantGet("$.array[0]")).isEqualTo(1L);
-        assertThat(variant.variantGet("$.array[3]")).isEqualTo(4L);
-        assertThat(variant.variantGet("$.string")).isEqualTo("Hello, World!");
-        assertThat(variant.variantGet("$.long")).isEqualTo(12345678901234L);
-        assertThat(variant.variantGet("$.double"))
+                        BinaryString.fromString(
+                                
"{\"address\":{\"city\":\"Hangzhou\",\"street\":\"Main 
St\"},\"age\":2,\"name\":\"Apache Paimon\"}"));
+        RowType address =
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), 
DataTypes.STRING()},
+                        new String[] {"street", "city"});
+        assertThat(variant.variantGet("$.object.address", address, castArgs))
+                .isEqualTo(
+                        GenericRow.of(
+                                BinaryString.fromString("Main St"),
+                                BinaryString.fromString("Hangzhou")));
+        assertThat(variant.variantGet("$.object.name", DataTypes.STRING(), 
castArgs))
+                .isEqualTo(BinaryString.fromString("Apache Paimon"));
+        assertThat(variant.variantGet("$.object.address.street", 
DataTypes.STRING(), castArgs))
+                .isEqualTo(BinaryString.fromString("Main St"));
+        assertThat(
+                        variant.variantGet(
+                                "$[\"object\"]['address'].city", 
DataTypes.STRING(), castArgs))
+                .isEqualTo(BinaryString.fromString("Hangzhou"));
+        assertThat(variant.variantGet("$.array", DataTypes.STRING(), castArgs))
+                .isEqualTo(BinaryString.fromString("[1,2,3,4,5]"));
+        assertThat(variant.variantGet("$.array", 
DataTypes.ARRAY(DataTypes.INT()), castArgs))
+                .isEqualTo(new GenericArray(new Integer[] {1, 2, 3, 4, 5}));
+        assertThat(variant.variantGet("$.array[0]", DataTypes.BIGINT(), 
castArgs)).isEqualTo(1L);
+        assertThat(variant.variantGet("$.array[3]", DataTypes.BIGINT(), 
castArgs)).isEqualTo(4L);
+        assertThat(variant.variantGet("$.string", DataTypes.STRING(), 
castArgs))
+                .isEqualTo(BinaryString.fromString("Hello, World!"));
+        assertThat(variant.variantGet("$.long", DataTypes.BIGINT(), castArgs))
+                .isEqualTo(12345678901234L);
+        assertThat(variant.variantGet("$.long", DataTypes.STRING(), castArgs))
+                .isEqualTo(BinaryString.fromString("12345678901234"));
+        assertThat(variant.variantGet("$.double", DataTypes.DOUBLE(), 
castArgs))
                 .isEqualTo(1.0123456789012345678901234567890123456789);
-        assertThat(variant.variantGet("$.decimal")).isEqualTo(new 
BigDecimal("100.99"));
-        assertThat(variant.variantGet("$.boolean1")).isEqualTo(true);
-        assertThat(variant.variantGet("$.boolean2")).isEqualTo(false);
-        assertThat(variant.variantGet("$.nullField")).isNull();
+        assertThat(variant.variantGet("$.decimal", DataTypes.DECIMAL(5, 2), 
castArgs))
+                .isEqualTo(Decimal.fromBigDecimal(new BigDecimal("100.99"), 5, 
2));
+        assertThat(variant.variantGet("$.decimal", DataTypes.STRING(), 
castArgs))
+                .isEqualTo(BinaryString.fromString("100.99"));
+        assertThat(variant.variantGet("$.boolean1", DataTypes.BOOLEAN(), 
castArgs)).isEqualTo(true);
+        assertThat(variant.variantGet("$.boolean2", DataTypes.BOOLEAN(), 
castArgs))
+                .isEqualTo(false);
+        assertThat(variant.variantGet("$.nullField", DataTypes.BOOLEAN(), 
castArgs)).isNull();
     }
 
     @Test
@@ -228,14 +261,14 @@ public class GenericVariantTest {
 
     private void testShreddingResult(
             GenericVariant variant, RowType shreddedType, InternalRow 
expected) {
-        RowType shreddingSchema = 
PaimonShreddingUtils.variantShreddingSchema(shreddedType);
-        VariantSchema variantSchema = 
PaimonShreddingUtils.buildVariantSchema(shreddingSchema);
+        RowType shreddingSchema = variantShreddingSchema(shreddedType);
+        VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
         // test cast shredded
-        InternalRow shredded = PaimonShreddingUtils.castShredded(variant, 
variantSchema);
+        InternalRow shredded = castShredded(variant, variantSchema);
         assertThat(shredded).isEqualTo(expected);
 
         // test rebuild
-        Variant rebuild = PaimonShreddingUtils.rebuild(shredded, 
variantSchema);
+        Variant rebuild = assembleVariant(shredded, variantSchema);
         assertThat(variant.toJson()).isEqualTo(rebuild.toJson());
     }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/data/variant/PaimonShreddingUtilsTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/data/variant/PaimonShreddingUtilsTest.java
new file mode 100644
index 0000000000..d918045702
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/data/variant/PaimonShreddingUtilsTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.variant;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.variant.PaimonShreddingUtils.FieldToExtract;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneOffset;
+import java.util.HashMap;
+
+import static 
org.apache.paimon.data.variant.PaimonShreddingUtils.assembleVariantStruct;
+import static 
org.apache.paimon.data.variant.PaimonShreddingUtils.buildFieldsToExtract;
+import static 
org.apache.paimon.data.variant.PaimonShreddingUtils.buildVariantSchema;
+import static org.apache.paimon.data.variant.PaimonShreddingUtils.castShredded;
+import static 
org.apache.paimon.data.variant.PaimonShreddingUtils.variantShreddingSchema;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for PaimonShreddingUtils. */
+public class PaimonShreddingUtilsTest {
+
+    @Test
+    void testAssembleAllTypes() {
+        VariantCastArgs castArgs = new VariantCastArgs(true, ZoneOffset.UTC);
+
+        DataField f1 =
+                new DataField(
+                        1,
+                        "object",
+                        RowType.of(
+                                new DataType[] {DataTypes.STRING(), 
DataTypes.INT()},
+                                new String[] {"name", "age"}));
+        DataField f2 = new DataField(2, "array", 
DataTypes.ARRAY(DataTypes.INT()));
+        DataField f3 = new DataField(3, "string", DataTypes.STRING());
+        DataField f4 = new DataField(4, "tinyint", DataTypes.TINYINT());
+        DataField f5 = new DataField(5, "smallint", DataTypes.SMALLINT());
+        DataField f6 = new DataField(6, "int", DataTypes.INT());
+        DataField f7 = new DataField(7, "long", DataTypes.BIGINT());
+        DataField f8 = new DataField(8, "double", DataTypes.DOUBLE());
+        DataField f9 = new DataField(9, "decimal", DataTypes.DECIMAL(5, 2));
+        DataField f10 = new DataField(10, "boolean", DataTypes.BOOLEAN());
+        DataField f11 = new DataField(11, "nullField", DataTypes.INT());
+        RowType allTypes = RowType.of(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, 
f11);
+
+        String json =
+                "{\n"
+                        + "  \"object\": {\n"
+                        + "    \"name\": \"Apache Paimon\",\n"
+                        + "    \"age\": 3\n"
+                        + "  },\n"
+                        + "  \"array\": [1, 2, 3, 4, 5],\n"
+                        + "  \"string\": \"Hello, World!\",\n"
+                        + "  \"tinyint\": 1,\n"
+                        + "  \"smallint\": 3000,\n"
+                        + "  \"int\": 400000,\n"
+                        + "  \"long\": 12345678901234,\n"
+                        + "  \"double\": 
1.0123456789012345678901234567890123456789,\n"
+                        + "  \"decimal\": 100.99,\n"
+                        + "  \"boolean\": true,\n"
+                        + "  \"nullField\": null\n"
+                        + "}\n";
+
+        GenericVariant v = GenericVariant.fromJson(json);
+        GenericRow expert =
+                GenericRow.of(
+                        GenericRow.of(BinaryString.fromString("Apache 
Paimon"), 3),
+                        new GenericArray(new Integer[] {1, 2, 3, 4, 5}),
+                        BinaryString.fromString("Hello, World!"),
+                        (byte) 1,
+                        (short) 3000,
+                        400000,
+                        12345678901234L,
+                        1.0123456789012345678901234567890123456789D,
+                        Decimal.fromBigDecimal(new 
java.math.BigDecimal("100.99"), 5, 2),
+                        true,
+                        null);
+
+        // shredding to real type
+        RowType shreddedType = new RowType(allTypes.getFields());
+        RowType shreddingSchema = variantShreddingSchema(shreddedType);
+        VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
+        FieldToExtract[] fields = new FieldToExtract[allTypes.getFieldCount()];
+        for (int i = 0; i < allTypes.getFields().size(); i++) {
+            fields[i] =
+                    buildFieldsToExtract(
+                            allTypes.getFields().get(i).type(),
+                            "$." + allTypes.getFields().get(i).name(),
+                            castArgs,
+                            variantSchema);
+        }
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(expert);
+
+        // no shredding
+        shreddedType = RowType.of();
+        shreddingSchema = variantShreddingSchema(shreddedType);
+        variantSchema = buildVariantSchema(shreddingSchema);
+        fields = new FieldToExtract[allTypes.getFieldCount()];
+        for (int i = 0; i < allTypes.getFields().size(); i++) {
+            fields[i] =
+                    buildFieldsToExtract(
+                            allTypes.getFields().get(i).type(),
+                            "$." + allTypes.getFields().get(i).name(),
+                            castArgs,
+                            variantSchema);
+        }
+
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(expert);
+
+        // shredding to string, then cast to the real type
+        shreddedType =
+                RowType.of(
+                        allTypes.getFields().stream()
+                                .map(a -> a.newType(DataTypes.STRING()))
+                                .toArray(DataField[]::new));
+        shreddingSchema = variantShreddingSchema(shreddedType);
+        variantSchema = buildVariantSchema(shreddingSchema);
+        fields = new FieldToExtract[allTypes.getFieldCount()];
+        for (int i = 0; i < allTypes.getFields().size(); i++) {
+            fields[i] =
+                    buildFieldsToExtract(
+                            allTypes.getFields().get(i).type(),
+                            "$." + allTypes.getFields().get(i).name(),
+                            castArgs,
+                            variantSchema);
+        }
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(expert);
+
+        // shredding to real type, then cast to the string
+        expert =
+                GenericRow.of(
+                        BinaryString.fromString("{\"age\":3,\"name\":\"Apache 
Paimon\"}"),
+                        BinaryString.fromString("[1,2,3,4,5]"),
+                        BinaryString.fromString("Hello, World!"),
+                        BinaryString.fromString("1"),
+                        BinaryString.fromString("3000"),
+                        BinaryString.fromString("400000"),
+                        BinaryString.fromString("12345678901234"),
+                        BinaryString.fromString("1.0123456789012346"),
+                        BinaryString.fromString("100.99"),
+                        BinaryString.fromString("true"),
+                        null);
+
+        shreddedType = new RowType(allTypes.getFields());
+        shreddingSchema = variantShreddingSchema(shreddedType);
+        variantSchema = buildVariantSchema(shreddingSchema);
+        fields = new FieldToExtract[allTypes.getFieldCount()];
+        for (int i = 0; i < allTypes.getFields().size(); i++) {
+            fields[i] =
+                    buildFieldsToExtract(
+                            DataTypes.STRING(),
+                            "$." + allTypes.getFields().get(i).name(),
+                            castArgs,
+                            variantSchema);
+        }
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(expert);
+
+        // no shredding, then cast to the string
+        shreddedType = RowType.of();
+        shreddingSchema = variantShreddingSchema(shreddedType);
+        variantSchema = buildVariantSchema(shreddingSchema);
+        fields = new FieldToExtract[allTypes.getFieldCount()];
+        for (int i = 0; i < allTypes.getFields().size(); i++) {
+            fields[i] =
+                    buildFieldsToExtract(
+                            DataTypes.STRING(),
+                            "$." + allTypes.getFields().get(i).name(),
+                            castArgs,
+                            variantSchema);
+        }
+
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(expert);
+
+        // cast struct to map
+        shreddedType = RowType.of(f1);
+        shreddingSchema = variantShreddingSchema(shreddedType);
+        variantSchema = buildVariantSchema(shreddingSchema);
+        fields = new FieldToExtract[1];
+        fields[0] =
+                buildFieldsToExtract(
+                        new MapType(DataTypes.STRING(), DataTypes.STRING()),
+                        "$.object",
+                        castArgs,
+                        variantSchema);
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(
+                        GenericRow.of(
+                                new GenericMap(
+                                        new HashMap<BinaryString, 
BinaryString>() {
+                                            {
+                                                put(
+                                                        
BinaryString.fromString("age"),
+                                                        
BinaryString.fromString("3"));
+                                                put(
+                                                        
BinaryString.fromString("name"),
+                                                        
BinaryString.fromString("Apache Paimon"));
+                                            }
+                                        })));
+    }
+
+    @Test
+    void testAssembleVariantStructWithShredding() {
+        VariantCastArgs castArgs = new VariantCastArgs(true, ZoneOffset.UTC);
+
+        // shreddedType: ROW<a INT, b STRING>
+        RowType shreddedType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.STRING()},
+                        new String[] {"a", "b"});
+        RowType shreddingSchema = variantShreddingSchema(shreddedType);
+        VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
+
+        // fieldsToExtract: $.a : int, $.b : string
+        FieldToExtract f1 = buildFieldsToExtract(DataTypes.INT(), "$.a", 
castArgs, variantSchema);
+        FieldToExtract f2 =
+                buildFieldsToExtract(DataTypes.STRING(), "$.b", castArgs, 
variantSchema);
+        FieldToExtract[] fields = new FieldToExtract[] {f1, f2};
+
+        GenericVariant v = GenericVariant.fromJson("{\"a\": 1, \"b\": 
\"hello\"}");
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(GenericRow.of(1, BinaryString.fromString("hello")));
+
+        v = GenericVariant.fromJson("{\"a\": 27}");
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(GenericRow.of(27, null));
+
+        v = GenericVariant.fromJson("{\"b\":\"hangzhou\", \"other\":\"xxx\"}");
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(GenericRow.of(null, 
BinaryString.fromString("hangzhou")));
+
+        v = GenericVariant.fromJson("{\"other\":\"yyy\"}");
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(GenericRow.of(null, null));
+
+        v = GenericVariant.fromJson("{\"a\":\"27\"}");
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(GenericRow.of(27, null));
+
+        v = GenericVariant.fromJson("{}");
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(GenericRow.of(null, null));
+
+        // fieldsToExtract: $.a : int, $.b : string, $.c : string and c is not 
in shreddedType
+        FieldToExtract f3 =
+                buildFieldsToExtract(DataTypes.STRING(), "$.c", castArgs, 
variantSchema);
+        fields = new FieldToExtract[] {f1, f2, f3};
+        v = GenericVariant.fromJson("{\"c\": \"hi\", \"a\": 27}");
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(GenericRow.of(27, null, 
BinaryString.fromString("hi")));
+
+        v = GenericVariant.fromJson("{\"c\": 1111, \"a\": 27}");
+        assertThat(assembleVariantStruct(castShredded(v, variantSchema), 
variantSchema, fields))
+                .isEqualTo(GenericRow.of(27, null, 
BinaryString.fromString("1111")));
+    }
+
+    @Test
+    void testVariantCastArgs() {
+        // shreddedType: ROW<a INT, b STRING>
+        RowType shreddedType = RowType.of(new DataType[] {DataTypes.INT()}, 
new String[] {"a"});
+        RowType shreddingSchema = variantShreddingSchema(shreddedType);
+        VariantSchema variantSchema = buildVariantSchema(shreddingSchema);
+
+        // failOnError = true
+        VariantCastArgs castArgs = new VariantCastArgs(true, ZoneOffset.UTC);
+
+        FieldToExtract f1 = buildFieldsToExtract(DataTypes.INT(), "$.a", 
castArgs, variantSchema);
+        FieldToExtract[] fields1 = new FieldToExtract[] {f1};
+
+        GenericVariant v1 = GenericVariant.fromJson("{\"a\": \"not a num\"}");
+        assertThatThrownBy(
+                () ->
+                        assembleVariantStruct(
+                                castShredded(v1, variantSchema), 
variantSchema, fields1));
+
+        // failOnError = false
+        castArgs = new VariantCastArgs(false, ZoneOffset.UTC);
+        FieldToExtract f2 = buildFieldsToExtract(DataTypes.INT(), "$.a", 
castArgs, variantSchema);
+        FieldToExtract[] fields2 = new FieldToExtract[] {f2};
+
+        GenericVariant v2 = GenericVariant.fromJson("{\"a\": \"not a num\"}");
+        GenericRow genericRow = new GenericRow(1);
+        genericRow.setField(0, null);
+        assertThat(assembleVariantStruct(castShredded(v2, variantSchema), 
variantSchema, fields2))
+                .isEqualTo(genericRow);
+    }
+}

Reply via email to