This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 842112286 [FLINK-38886] Introduce `GenericRecordData` and Internal / 
External converters in flink-cdc-common (#4218)
842112286 is described below

commit 842112286864d411cdc32572083d1b48835f67e7
Author: yuxiqian <[email protected]>
AuthorDate: Fri Jan 23 15:54:48 2026 +0800

    [FLINK-38886] Introduce `GenericRecordData` and Internal / External 
converters in flink-cdc-common (#4218)
---
 docs/content.zh/docs/core-concept/type-mappings.md |  62 +++
 docs/content/docs/core-concept/type-mappings.md    |  62 +++
 .../cdc/common/converter/CommonConverter.java      | 449 +++++++++++++++++++++
 .../common/converter/InternalClassConverter.java   | 174 ++++++++
 .../common/converter/InternalObjectConverter.java  | 176 ++++++++
 .../cdc/common/converter/JavaClassConverter.java   | 174 ++++++++
 .../cdc/common/converter/JavaObjectConverter.java  | 172 ++++++++
 .../flink/cdc/common/data/GenericArrayData.java    |   5 +
 .../flink/cdc/common/data/GenericRecordData.java   | 239 +++++++++++
 .../flink/cdc/common/utils/SchemaMergingUtils.java |   9 +-
 .../converter/InternalClassConverterTest.java      |  97 +++++
 .../converter/InternalObjectConverterTest.java     | 409 +++++++++++++++++++
 .../common/converter/JavaClassConverterTest.java   |  97 +++++
 .../common/converter/JavaObjectConverterTest.java  | 389 ++++++++++++++++++
 .../common/converter/VariantConvertingTest.java    | 144 +++++++
 .../cdc/common/utils/SchemaMergingUtilsTest.java   |  47 ++-
 16 files changed, 2687 insertions(+), 18 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/type-mappings.md 
b/docs/content.zh/docs/core-concept/type-mappings.md
new file mode 100644
index 000000000..a58784fa2
--- /dev/null
+++ b/docs/content.zh/docs/core-concept/type-mappings.md
@@ -0,0 +1,62 @@
+---
+title: "Type Mappings"
+weight: 8
+type: docs
+aliases:
+  - /core-concept/type-mappings/
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# 类型映射
+
+对于每个 CDC 数据类型(`org.apache.flink.cdc.common.types.DataType` 
的子类),我们规定了用于内部序列化和反序列化的 CDC 内部类型,以及用于类型合并、类型转换和 UDF 求值的 Java 外部类型。
+
+## 内部类型和外部类型
+
+一些基本类型对于内部类型和 Java 类可能具有相同的表示形式(例如,`DataTypes.INT()` 使用 `java.lang.Integer` 
同时作为内部和外部类型)。
+其他类型可能使用不同的表示形式,例如,`DataTypes.TIMESTAMP` 在内部表示中使用 
`org.apache.flink.cdc.common.data.TimestampData`,在外部操作中使用 
`java.time.LocalDateTime`。
+
+如果您正在编写 YAML Pipeline 连接器,`DataChangeEvent` 应当携带内部类型 
`RecordData`,并且其所有字段都是内部类型的实例。
+
+如果您正在编写 Transform UDF,则其参数和返回值类型应定义为其外部 Java 类型。
+
+## 完整类型列表
+
+| CDC 数据类型                       | CDC 内部类型                                    
               | Java 外部类型                                           |
+|--------------------------------|------------------------------------------------------------|-----------------------------------------------------|
+| BOOLEAN                        | `java.lang.Boolean`                         
               | `java.lang.Boolean`                                 |
+| TINYINT                        | `java.lang.Byte`                            
               | `java.lang.Byte`                                    |
+| SMALLINT                       | `java.lang.Short`                           
               | `java.lang.Short`                                   |
+| INTEGER                        | `java.lang.Integer`                         
               | `java.lang.Integer`                                 |
+| BIGINT                         | `java.lang.Long`                            
               | `java.lang.Long`                                    |
+| FLOAT                          | `java.lang.Float`                           
               | `java.lang.Float`                                   |
+| DOUBLE                         | `java.lang.Double`                          
               | `java.lang.Double`                                  |
+| DECIMAL                        | 
`org.apache.flink.cdc.common.data.DecimalData`             | 
`java.math.BigDecimal`                              |
+| DATE                           | `org.apache.flink.cdc.common.data.DateData` 
               | `java.time.LocalDate`                               |
+| TIME                           | `org.apache.flink.cdc.common.data.TimeData` 
               | `java.time.LocalTime`                               |
+| TIMESTAMP                      | 
`org.apache.flink.cdc.common.data.TimestampData`           | 
`java.time.LocalDateTime`                           |
+| TIMESTAMP_TZ                   | 
`org.apache.flink.cdc.common.data.ZonedTimestampData`      | 
`java.time.ZonedDateTime`                           |
+| TIMESTAMP_LTZ                  | 
`org.apache.flink.cdc.common.data.LocalZonedTimestampData` | 
`java.time.Instant`                                 |
+| CHAR<br/>VARCHAR<br/>STRING    | 
`org.apache.flink.cdc.common.data.StringData`              | `java.lang.String` 
                                 |
+| BINARY<br/>VARBINARY<br/>BYTES | `byte[]`                                    
               | `byte[]`                                            |
+| ARRAY                          | 
`org.apache.flink.cdc.common.data.ArrayData`               | 
`java.util.List<T>`                                 |
+| MAP                            | `org.apache.flink.cdc.common.data.MapData`  
               | `java.util.Map<K, V>`                               |
+| ROW                            | 
`org.apache.flink.cdc.common.data.RecordData`              | 
`java.util.List<Object>`                            |
+| VARIANT                        | 
`org.apache.flink.cdc.common.types.variant.Variant`        | 
`org.apache.flink.cdc.common.types.variant.Variant` |
diff --git a/docs/content/docs/core-concept/type-mappings.md 
b/docs/content/docs/core-concept/type-mappings.md
new file mode 100644
index 000000000..a02f9b3ba
--- /dev/null
+++ b/docs/content/docs/core-concept/type-mappings.md
@@ -0,0 +1,62 @@
+---
+title: "Type Mappings"
+weight: 8
+type: docs
+aliases:
+  - /core-concept/type-mappings/
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Type Mappings
+
+For each CDC DataType (subclasses of 
`org.apache.flink.cdc.common.types.DataType`), we define CDC Internal types for 
internal serialization & deserialization and Java classes (for type merging, 
casting, and UDF evaluations).
+
+## Internal and External Types
+
+Primitive types may have the same representation for Internal type and Java 
classes (`DataTypes.INT()` only uses `java.lang.Integer`).
+Other types use different representations, like `DataTypes.TIMESTAMP` uses 
`org.apache.flink.cdc.common.data.TimestampData` for internal representation 
and `java.time.LocalDateTime` for external operations.
+
+If you're writing a pipeline source / sink connector, `DataChangeEvent` 
carries internal type `RecordData`, and all its fields are internal type 
instances.
+
+If you're writing a UDF, its arguments and return value types should be 
defined as its external Java type.
+
+## Full Types List
+
+| CDC Data Type                  | CDC Internal Type                           
               | External Java Class                                 |
+|--------------------------------|------------------------------------------------------------|-----------------------------------------------------|
+| BOOLEAN                        | `java.lang.Boolean`                         
               | `java.lang.Boolean`                                 |
+| TINYINT                        | `java.lang.Byte`                            
               | `java.lang.Byte`                                    |
+| SMALLINT                       | `java.lang.Short`                           
               | `java.lang.Short`                                   |
+| INTEGER                        | `java.lang.Integer`                         
               | `java.lang.Integer`                                 |
+| BIGINT                         | `java.lang.Long`                            
               | `java.lang.Long`                                    |
+| FLOAT                          | `java.lang.Float`                           
               | `java.lang.Float`                                   |
+| DOUBLE                         | `java.lang.Double`                          
               | `java.lang.Double`                                  |
+| DECIMAL                        | 
`org.apache.flink.cdc.common.data.DecimalData`             | 
`java.math.BigDecimal`                              |
+| DATE                           | `org.apache.flink.cdc.common.data.DateData` 
               | `java.time.LocalDate`                               |
+| TIME                           | `org.apache.flink.cdc.common.data.TimeData` 
               | `java.time.LocalTime`                               |
+| TIMESTAMP                      | 
`org.apache.flink.cdc.common.data.TimestampData`           | 
`java.time.LocalDateTime`                           |
+| TIMESTAMP_TZ                   | 
`org.apache.flink.cdc.common.data.ZonedTimestampData`      | 
`java.time.ZonedDateTime`                           |
+| TIMESTAMP_LTZ                  | 
`org.apache.flink.cdc.common.data.LocalZonedTimestampData` | 
`java.time.Instant`                                 |
+| CHAR<br/>VARCHAR<br/>STRING    | 
`org.apache.flink.cdc.common.data.StringData`              | `java.lang.String` 
                                 |
+| BINARY<br/>VARBINARY<br/>BYTES | `byte[]`                                    
               | `byte[]`                                            |
+| ARRAY                          | 
`org.apache.flink.cdc.common.data.ArrayData`               | 
`java.util.List<T>`                                 |
+| MAP                            | `org.apache.flink.cdc.common.data.MapData`  
               | `java.util.Map<K, V>`                               |
+| ROW                            | 
`org.apache.flink.cdc.common.data.RecordData`              | 
`java.util.List<Object>`                            |
+| VARIANT                        | 
`org.apache.flink.cdc.common.types.variant.Variant`        | 
`org.apache.flink.cdc.common.types.variant.Variant` |
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java
new file mode 100644
index 000000000..feb0e4459
--- /dev/null
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/CommonConverter.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.variant.Variant;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Some shared converters between Java objects and internal objects. None of 
these functions are
+ * null-safe.
+ */
+public class CommonConverter {
+
+    // ----------------------
+    // These are shared converters used for both Internal and Java objects.
+    // ----------------------
+    static Boolean convertToBoolean(Object obj) {
+        if (obj instanceof Boolean) {
+            return (Boolean) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
BOOLEAN.");
+    }
+
+    static Byte convertToByte(Object obj) {
+        if (obj instanceof Byte) {
+            return (Byte) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
TINYINT.");
+    }
+
+    static Short convertToShort(Object obj) {
+        if (obj instanceof Short) {
+            return (Short) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
SMALLINT.");
+    }
+
+    static Integer convertToInt(Object obj) {
+        if (obj instanceof Integer) {
+            return (Integer) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
INT.");
+    }
+
+    static Long convertToLong(Object obj) {
+        if (obj instanceof Long) {
+            return (Long) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
BIGINT.");
+    }
+
+    static Float convertToFloat(Object obj) {
+        if (obj instanceof Float) {
+            return (Float) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
FLOAT.");
+    }
+
+    static Double convertToDouble(Object obj) {
+        if (obj instanceof Double) {
+            return (Double) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
DOUBLE.");
+    }
+
+    static byte[] convertToBinary(Object obj) {
+        if (obj instanceof byte[]) {
+            return (byte[]) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
BINARY.");
+    }
+
+    static Variant convertToVariant(Object obj, VariantType variantType) {
+        if (obj instanceof Variant) {
+            return (Variant) obj;
+        }
+        throw new RuntimeException(
+                "Cannot convert "
+                        + obj
+                        + " of type "
+                        + obj.getClass()
+                        + " to Variant ("
+                        + variantType
+                        + ").");
+    }
+
+    // ----------------------
+    // These are converters to CDC Internal objects.
+    // ----------------------
+
+    static StringData convertToStringData(Object obj) {
+        if (obj instanceof StringData) {
+            return (StringData) obj;
+        }
+        if (obj instanceof String) {
+            return BinaryStringData.fromString((String) obj);
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
STRING DATA.");
+    }
+
+    static DecimalData convertToDecimalData(Object obj) {
+        if (obj instanceof DecimalData) {
+            return (DecimalData) obj;
+        }
+        if (obj instanceof BigDecimal) {
+            BigDecimal bd = (BigDecimal) obj;
+            return DecimalData.fromBigDecimal(bd, bd.precision(), bd.scale());
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
DECIMAL DATA.");
+    }
+
+    static DateData convertToDateData(Object obj) {
+        if (obj instanceof DateData) {
+            return (DateData) obj;
+        }
+        if (obj instanceof LocalDate) {
+            return DateData.fromLocalDate((LocalDate) obj);
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
DATE DATA.");
+    }
+
+    static TimeData convertToTimeData(Object obj) {
+        if (obj instanceof TimeData) {
+            return (TimeData) obj;
+        }
+        if (obj instanceof LocalTime) {
+            return TimeData.fromLocalTime((LocalTime) obj);
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
TIME DATA.");
+    }
+
+    static TimestampData convertToTimestampData(Object obj) {
+        if (obj instanceof TimestampData) {
+            return (TimestampData) obj;
+        }
+        if (obj instanceof LocalDateTime) {
+            return TimestampData.fromLocalDateTime((LocalDateTime) obj);
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
TIMESTAMP DATA.");
+    }
+
+    static ZonedTimestampData convertToZonedTimestampData(Object obj) {
+        if (obj instanceof ZonedTimestampData) {
+            return (ZonedTimestampData) obj;
+        }
+        if (obj instanceof ZonedDateTime) {
+            return ZonedTimestampData.fromZonedDateTime((ZonedDateTime) obj);
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
TIMESTAMP_TZ DATA.");
+    }
+
+    static LocalZonedTimestampData convertToLocalZonedTimestampData(Object 
obj) {
+        if (obj instanceof LocalZonedTimestampData) {
+            return (LocalZonedTimestampData) obj;
+        }
+        if (obj instanceof Instant) {
+            return LocalZonedTimestampData.fromInstant((Instant) obj);
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
TIMESTAMP_LTZ DATA.");
+    }
+
+    static ArrayData convertToArrayData(Object obj, ArrayType arrayType) {
+        if (obj instanceof ArrayData) {
+            return (ArrayData) obj;
+        }
+        if (obj instanceof List) {
+            DataType elementType = arrayType.getElementType();
+            List<?> objects = (List<?>) obj;
+            List<Object> convertedObjects = new ArrayList<>(objects.size());
+            for (Object object : objects) {
+                convertedObjects.add(
+                        InternalObjectConverter.convertToInternal(object, 
elementType));
+            }
+            return new GenericArrayData(convertedObjects.toArray());
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
ARRAY DATA.");
+    }
+
+    static MapData convertToMapData(Object obj, MapType mapType) {
+        if (obj instanceof MapData) {
+            return (MapData) obj;
+        }
+        if (obj instanceof Map) {
+            DataType keyType = mapType.getKeyType();
+            DataType valueType = mapType.getValueType();
+            Map<?, ?> map = (Map<?, ?>) obj;
+            Map<Object, Object> convertedMap = new HashMap<>(map.size());
+            for (Map.Entry<?, ?> entry : map.entrySet()) {
+                Object key = 
InternalObjectConverter.convertToInternal(entry.getKey(), keyType);
+                Object value =
+                        
InternalObjectConverter.convertToInternal(entry.getValue(), valueType);
+                convertedMap.put(key, value);
+            }
+            return new GenericMapData(convertedMap);
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
MAP DATA.");
+    }
+
+    static RecordData convertToRowData(Object obj, RowType rowType) {
+        if (obj instanceof RecordData) {
+            return (RecordData) obj;
+        }
+        if (obj instanceof List) {
+            List<DataType> dataTypes = rowType.getFieldTypes();
+            List<?> objects = (List<?>) obj;
+            List<Object> convertedObjects = new ArrayList<>(objects.size());
+            Preconditions.checkArgument(
+                    objects.size() == dataTypes.size(),
+                    "Cannot convert "
+                            + obj
+                            + " of type "
+                            + obj.getClass()
+                            + " with different arity.");
+            for (int i = 0; i < objects.size(); i++) {
+                convertedObjects.add(
+                        InternalObjectConverter.convertToInternal(
+                                objects.get(i), dataTypes.get(i)));
+            }
+            return GenericRecordData.of(convertedObjects.toArray());
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
ROW DATA.");
+    }
+
+    // ----------------------
+    // These are converters to Java objects.
+    // ----------------------
+
+    static String convertToString(Object obj) {
+        if (obj instanceof String) {
+            return (String) obj;
+        }
+        if (obj instanceof StringData) {
+            return obj.toString();
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
STRING.");
+    }
+
+    static BigDecimal convertToBigDecimal(Object obj) {
+        if (obj instanceof BigDecimal) {
+            return (BigDecimal) obj;
+        }
+        if (obj instanceof DecimalData) {
+            return ((DecimalData) obj).toBigDecimal();
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
DECIMAL.");
+    }
+
+    static LocalDate convertToLocalDate(Object obj) {
+        if (obj instanceof LocalDate) {
+            return (LocalDate) obj;
+        }
+        if (obj instanceof DateData) {
+            return ((DateData) obj).toLocalDate();
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
LOCAL DATE.");
+    }
+
+    static LocalTime convertToLocalTime(Object obj) {
+        if (obj instanceof LocalTime) {
+            return (LocalTime) obj;
+        }
+        if (obj instanceof TimeData) {
+            return ((TimeData) obj).toLocalTime();
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
LOCAL TIME.");
+    }
+
+    static LocalDateTime convertToLocalDateTime(Object obj) {
+        if (obj instanceof LocalDateTime) {
+            return (LocalDateTime) obj;
+        }
+        if (obj instanceof TimestampData) {
+            return ((TimestampData) obj).toLocalDateTime();
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
LOCAL DATETIME.");
+    }
+
+    static ZonedDateTime convertToZonedDateTime(Object obj) {
+        if (obj instanceof ZonedDateTime) {
+            return (ZonedDateTime) obj;
+        }
+        if (obj instanceof ZonedTimestampData) {
+            return ((ZonedTimestampData) obj).getZonedDateTime();
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
ZONED DATETIME.");
+    }
+
+    static Instant convertToInstant(Object obj) {
+        if (obj instanceof Instant) {
+            return (Instant) obj;
+        }
+        if (obj instanceof LocalZonedTimestampData) {
+            return ((LocalZonedTimestampData) obj).toInstant();
+        }
+        throw new RuntimeException(
+                "Cannot convert " + obj + " of type " + obj.getClass() + " to 
INSTANT.");
+    }
+
+    static List<?> convertToList(Object obj, ArrayType arrayType) {
+        if (obj instanceof List) {
+            return (List<?>) obj;
+        }
+        if (obj instanceof ArrayData) {
+            ArrayData arrayData = (ArrayData) obj;
+            DataType elementType = arrayType.getElementType();
+            ArrayData.ElementGetter elementGetter = 
ArrayData.createElementGetter(elementType);
+            List<Object> convertedObjects = new ArrayList<>(arrayData.size());
+            for (int i = 0; i < arrayData.size(); i++) {
+                convertedObjects.add(
+                        JavaObjectConverter.convertToJava(
+                                elementGetter.getElementOrNull(arrayData, i), 
elementType));
+            }
+            return convertedObjects;
+        }
+        throw new RuntimeException(
+                "Cannot convert "
+                        + obj
+                        + " of type "
+                        + obj.getClass()
+                        + " to LIST ("
+                        + arrayType
+                        + ").");
+    }
+
+    static Map<?, ?> convertToMap(Object obj, MapType mapType) {
+        if (obj instanceof Map) {
+            return (Map<?, ?>) obj;
+        }
+        if (obj instanceof MapData) {
+            MapData mapData = (MapData) obj;
+            DataType keyType = mapType.getKeyType();
+            DataType valueType = mapType.getValueType();
+            ArrayData keyArray = mapData.keyArray();
+            ArrayData valueArray = mapData.valueArray();
+            List<?> keyObjects = convertToList(keyArray, new 
ArrayType(keyType));
+            List<?> valueObjects = convertToList(valueArray, new 
ArrayType(valueType));
+            Map<Object, Object> convertedMap = new HashMap<>(mapData.size());
+            for (int i = 0; i < mapData.size(); i++) {
+                convertedMap.put(keyObjects.get(i), valueObjects.get(i));
+            }
+            return convertedMap;
+        }
+        throw new RuntimeException(
+                "Cannot convert "
+                        + obj
+                        + " of type "
+                        + obj.getClass()
+                        + " to MAP ("
+                        + mapType
+                        + ").");
+    }
+
+    static List<?> convertToRow(Object obj, RowType rowType) {
+        if (obj instanceof List) {
+            return (List<?>) obj;
+        }
+        if (obj instanceof RecordData) {
+            RecordData recordData = (RecordData) obj;
+            List<DataType> dataTypes = rowType.getFieldTypes();
+            List<RecordData.FieldGetter> fieldGetters =
+                    SchemaUtils.createFieldGetters(dataTypes.toArray(new 
DataType[0]));
+            List<Object> objects = new ArrayList<>(recordData.getArity());
+            for (int i = 0; i < fieldGetters.size(); i++) {
+                objects.add(
+                        JavaObjectConverter.convertToJava(
+                                
fieldGetters.get(i).getFieldOrNull(recordData), dataTypes.get(i)));
+            }
+            return objects;
+        }
+        throw new RuntimeException(
+                "Cannot convert "
+                        + obj
+                        + " of type "
+                        + obj.getClass()
+                        + " to ROW ("
+                        + rowType
+                        + ").");
+    }
+}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalClassConverter.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalClassConverter.java
new file mode 100644
index 000000000..fd860f5c9
--- /dev/null
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalClassConverter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeVisitor;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.types.variant.Variant;
+
+/**
+ * Converts a {@link org.apache.flink.cdc.common.types.DataType} to its CDC 
Internal representation.
+ */
+public class InternalClassConverter implements DataTypeVisitor<Class<?>> {
+
+    public static final InternalClassConverter INSTANCE = new 
InternalClassConverter();
+
+    public static Class<?> toInternalClass(DataType dataType) {
+        return dataType.accept(INSTANCE);
+    }
+
+    private InternalClassConverter() {
+        // No instantiation.
+    }
+
+    @Override
+    public Class<?> visit(CharType charType) {
+        return StringData.class;
+    }
+
+    @Override
+    public Class<?> visit(VarCharType varCharType) {
+        return StringData.class;
+    }
+
+    @Override
+    public Class<?> visit(BooleanType booleanType) {
+        return Boolean.class;
+    }
+
+    @Override
+    public Class<?> visit(BinaryType binaryType) {
+        return byte[].class;
+    }
+
+    @Override
+    public Class<?> visit(VarBinaryType varBinaryType) {
+        return byte[].class;
+    }
+
+    @Override
+    public Class<?> visit(DecimalType decimalType) {
+        return DecimalData.class;
+    }
+
+    @Override
+    public Class<?> visit(TinyIntType tinyIntType) {
+        return Byte.class;
+    }
+
+    @Override
+    public Class<?> visit(SmallIntType smallIntType) {
+        return Short.class;
+    }
+
+    @Override
+    public Class<?> visit(IntType intType) {
+        return Integer.class;
+    }
+
+    @Override
+    public Class<?> visit(BigIntType bigIntType) {
+        return Long.class;
+    }
+
+    @Override
+    public Class<?> visit(FloatType floatType) {
+        return Float.class;
+    }
+
+    @Override
+    public Class<?> visit(DoubleType doubleType) {
+        return Double.class;
+    }
+
+    @Override
+    public Class<?> visit(DateType dateType) {
+        return DateData.class;
+    }
+
+    @Override
+    public Class<?> visit(TimeType timeType) {
+        return TimeData.class;
+    }
+
+    @Override
+    public Class<?> visit(TimestampType timestampType) {
+        return TimestampData.class;
+    }
+
+    @Override
+    public Class<?> visit(ZonedTimestampType zonedTimestampType) {
+        return ZonedTimestampData.class;
+    }
+
+    @Override
+    public Class<?> visit(LocalZonedTimestampType localZonedTimestampType) {
+        return LocalZonedTimestampData.class;
+    }
+
+    @Override
+    public Class<?> visit(ArrayType arrayType) {
+        return ArrayData.class;
+    }
+
+    @Override
+    public Class<?> visit(MapType mapType) {
+        return MapData.class;
+    }
+
+    @Override
+    public Class<?> visit(RowType rowType) {
+        return RecordData.class;
+    }
+
+    @Override
+    public Class<?> visit(VariantType variantType) {
+        return Variant.class;
+    }
+}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java
new file mode 100644
index 000000000..7f1cc97bd
--- /dev/null
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/InternalObjectConverter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeVisitor;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+
+import java.util.function.Function;
+
+/** Common converters from Java objects to CDC Internal objects. */
+public class InternalObjectConverter {
+
+    private static final ToInternalObjectConverter converter = new 
ToInternalObjectConverter();
+
+    static class ToInternalObjectConverter implements 
DataTypeVisitor<Function<Object, ?>> {
+
+        @Override
+        public Function<Object, StringData> visit(CharType charType) {
+            return CommonConverter::convertToStringData;
+        }
+
+        @Override
+        public Function<Object, StringData> visit(VarCharType varCharType) {
+            return CommonConverter::convertToStringData;
+        }
+
+        @Override
+        public Function<Object, Boolean> visit(BooleanType booleanType) {
+            return CommonConverter::convertToBoolean;
+        }
+
+        @Override
+        public Function<Object, byte[]> visit(BinaryType binaryType) {
+            return CommonConverter::convertToBinary;
+        }
+
+        @Override
+        public Function<Object, byte[]> visit(VarBinaryType varBinaryType) {
+            return CommonConverter::convertToBinary;
+        }
+
+        @Override
+        public Function<Object, DecimalData> visit(DecimalType decimalType) {
+            return CommonConverter::convertToDecimalData;
+        }
+
+        @Override
+        public Function<Object, Byte> visit(TinyIntType tinyIntType) {
+            return CommonConverter::convertToByte;
+        }
+
+        @Override
+        public Function<Object, Short> visit(SmallIntType smallIntType) {
+            return CommonConverter::convertToShort;
+        }
+
+        @Override
+        public Function<Object, Integer> visit(IntType intType) {
+            return CommonConverter::convertToInt;
+        }
+
+        @Override
+        public Function<Object, Long> visit(BigIntType bigIntType) {
+            return CommonConverter::convertToLong;
+        }
+
+        @Override
+        public Function<Object, Float> visit(FloatType floatType) {
+            return CommonConverter::convertToFloat;
+        }
+
+        @Override
+        public Function<Object, Double> visit(DoubleType doubleType) {
+            return CommonConverter::convertToDouble;
+        }
+
+        @Override
+        public Function<Object, DateData> visit(DateType dateType) {
+            return CommonConverter::convertToDateData;
+        }
+
+        @Override
+        public Function<Object, TimeData> visit(TimeType timeType) {
+            return CommonConverter::convertToTimeData;
+        }
+
+        @Override
+        public Function<Object, TimestampData> visit(TimestampType 
timestampType) {
+            return CommonConverter::convertToTimestampData;
+        }
+
+        @Override
+        public Function<Object, ZonedTimestampData> visit(ZonedTimestampType 
zonedTimestampType) {
+            return CommonConverter::convertToZonedTimestampData;
+        }
+
+        @Override
+        public Function<Object, LocalZonedTimestampData> visit(
+                LocalZonedTimestampType localZonedTimestampType) {
+            return CommonConverter::convertToLocalZonedTimestampData;
+        }
+
+        @Override
+        public Function<Object, ArrayData> visit(ArrayType arrayType) {
+            return o -> CommonConverter.convertToArrayData(o, arrayType);
+        }
+
+        @Override
+        public Function<Object, MapData> visit(MapType mapType) {
+            return o -> CommonConverter.convertToMapData(o, mapType);
+        }
+
+        @Override
+        public Function<Object, RecordData> visit(RowType rowType) {
+            return o -> CommonConverter.convertToRowData(o, rowType);
+        }
+
+        @Override
+        public Function<Object, ?> visit(VariantType variantType) {
+            return o -> CommonConverter.convertToVariant(o, variantType);
+        }
+    }
+
+    public static Object convertToInternal(Object obj, DataType dataType) {
+        if (obj == null) {
+            return null;
+        }
+        return dataType.accept(converter).apply(obj);
+    }
+}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaClassConverter.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaClassConverter.java
new file mode 100644
index 000000000..2a1ffbfea
--- /dev/null
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaClassConverter.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeVisitor;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.types.variant.Variant;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts a {@link org.apache.flink.cdc.common.types.DataType} to its Java 
representation that
+ * could be used in UDF manipulation.
+ */
+public class JavaClassConverter implements DataTypeVisitor<Class<?>> {
+
+    public static final JavaClassConverter INSTANCE = new JavaClassConverter();
+
+    public static Class<?> toJavaClass(DataType dataType) {
+        return dataType.accept(INSTANCE);
+    }
+
+    private JavaClassConverter() {
+        // No instantiation.
+    }
+
+    @Override
+    public Class<?> visit(CharType charType) {
+        return String.class;
+    }
+
+    @Override
+    public Class<?> visit(VarCharType varCharType) {
+        return String.class;
+    }
+
+    @Override
+    public Class<?> visit(BooleanType booleanType) {
+        return Boolean.class;
+    }
+
+    @Override
+    public Class<?> visit(BinaryType binaryType) {
+        return byte[].class;
+    }
+
+    @Override
+    public Class<?> visit(VarBinaryType varBinaryType) {
+        return byte[].class;
+    }
+
+    @Override
+    public Class<?> visit(DecimalType decimalType) {
+        return BigDecimal.class;
+    }
+
+    @Override
+    public Class<?> visit(TinyIntType tinyIntType) {
+        return Byte.class;
+    }
+
+    @Override
+    public Class<?> visit(SmallIntType smallIntType) {
+        return Short.class;
+    }
+
+    @Override
+    public Class<?> visit(IntType intType) {
+        return Integer.class;
+    }
+
+    @Override
+    public Class<?> visit(BigIntType bigIntType) {
+        return Long.class;
+    }
+
+    @Override
+    public Class<?> visit(FloatType floatType) {
+        return Float.class;
+    }
+
+    @Override
+    public Class<?> visit(DoubleType doubleType) {
+        return Double.class;
+    }
+
+    @Override
+    public Class<?> visit(DateType dateType) {
+        return LocalDate.class;
+    }
+
+    @Override
+    public Class<?> visit(TimeType timeType) {
+        return LocalTime.class;
+    }
+
+    @Override
+    public Class<?> visit(TimestampType timestampType) {
+        return LocalDateTime.class;
+    }
+
+    @Override
+    public Class<?> visit(ZonedTimestampType zonedTimestampType) {
+        return ZonedDateTime.class;
+    }
+
+    @Override
+    public Class<?> visit(LocalZonedTimestampType localZonedTimestampType) {
+        return Instant.class;
+    }
+
+    @Override
+    public Class<?> visit(ArrayType arrayType) {
+        return List.class;
+    }
+
+    @Override
+    public Class<?> visit(MapType mapType) {
+        return Map.class;
+    }
+
+    @Override
+    public Class<?> visit(RowType rowType) {
+        return List.class;
+    }
+
+    @Override
+    public Class<?> visit(VariantType variantType) {
+        return Variant.class;
+    }
+}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaObjectConverter.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaObjectConverter.java
new file mode 100644
index 000000000..ddb848a16
--- /dev/null
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/converter/JavaObjectConverter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.BigIntType;
+import org.apache.flink.cdc.common.types.BinaryType;
+import org.apache.flink.cdc.common.types.BooleanType;
+import org.apache.flink.cdc.common.types.CharType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeVisitor;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.DoubleType;
+import org.apache.flink.cdc.common.types.FloatType;
+import org.apache.flink.cdc.common.types.IntType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.SmallIntType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.TinyIntType;
+import org.apache.flink.cdc.common.types.VarBinaryType;
+import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/** Common converters from CDC Internal type to Java type. */
+public class JavaObjectConverter {
+
+    private static final ToJavaObjectConverter converter = new 
ToJavaObjectConverter();
+
+    static class ToJavaObjectConverter implements 
DataTypeVisitor<Function<Object, ?>> {
+        @Override
+        public Function<Object, String> visit(CharType charType) {
+            return CommonConverter::convertToString;
+        }
+
+        @Override
+        public Function<Object, String> visit(VarCharType varCharType) {
+            return CommonConverter::convertToString;
+        }
+
+        @Override
+        public Function<Object, Boolean> visit(BooleanType booleanType) {
+            return CommonConverter::convertToBoolean;
+        }
+
+        @Override
+        public Function<Object, byte[]> visit(BinaryType binaryType) {
+            return CommonConverter::convertToBinary;
+        }
+
+        @Override
+        public Function<Object, byte[]> visit(VarBinaryType varBinaryType) {
+            return CommonConverter::convertToBinary;
+        }
+
+        @Override
+        public Function<Object, BigDecimal> visit(DecimalType decimalType) {
+            return CommonConverter::convertToBigDecimal;
+        }
+
+        @Override
+        public Function<Object, Byte> visit(TinyIntType tinyIntType) {
+            return CommonConverter::convertToByte;
+        }
+
+        @Override
+        public Function<Object, Short> visit(SmallIntType smallIntType) {
+            return CommonConverter::convertToShort;
+        }
+
+        @Override
+        public Function<Object, Integer> visit(IntType intType) {
+            return CommonConverter::convertToInt;
+        }
+
+        @Override
+        public Function<Object, Long> visit(BigIntType bigIntType) {
+            return CommonConverter::convertToLong;
+        }
+
+        @Override
+        public Function<Object, Float> visit(FloatType floatType) {
+            return CommonConverter::convertToFloat;
+        }
+
+        @Override
+        public Function<Object, Double> visit(DoubleType doubleType) {
+            return CommonConverter::convertToDouble;
+        }
+
+        @Override
+        public Function<Object, LocalDate> visit(DateType dateType) {
+            return CommonConverter::convertToLocalDate;
+        }
+
+        @Override
+        public Function<Object, LocalTime> visit(TimeType timeType) {
+            return CommonConverter::convertToLocalTime;
+        }
+
+        @Override
+        public Function<Object, LocalDateTime> visit(TimestampType 
timestampType) {
+            return CommonConverter::convertToLocalDateTime;
+        }
+
+        @Override
+        public Function<Object, ZonedDateTime> visit(ZonedTimestampType 
zonedTimestampType) {
+            return CommonConverter::convertToZonedDateTime;
+        }
+
+        @Override
+        public Function<Object, Instant> visit(LocalZonedTimestampType 
localZonedTimestampType) {
+            return CommonConverter::convertToInstant;
+        }
+
+        @Override
+        public Function<Object, List<?>> visit(ArrayType arrayType) {
+            return o -> CommonConverter.convertToList(o, arrayType);
+        }
+
+        @Override
+        public Function<Object, Map<?, ?>> visit(MapType mapType) {
+            return o -> CommonConverter.convertToMap(o, mapType);
+        }
+
+        @Override
+        public Function<Object, List<?>> visit(RowType rowType) {
+            return o -> CommonConverter.convertToRow(o, rowType);
+        }
+
+        @Override
+        public Function<Object, ?> visit(VariantType variantType) {
+            return o -> CommonConverter.convertToVariant(o, variantType);
+        }
+    }
+
+    public static Object convertToJava(Object obj, DataType dataType) {
+        if (obj == null) {
+            return null;
+        }
+        return dataType.accept(converter).apply(obj);
+    }
+}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericArrayData.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericArrayData.java
index f87905c43..22a2b9336 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericArrayData.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericArrayData.java
@@ -341,4 +341,9 @@ public final class GenericArrayData implements ArrayData {
         checkNoNull();
         return ArrayUtils.toPrimitive((Double[]) array);
     }
+
+    @Override
+    public String toString() {
+        return Arrays.asList(toObjectArray()).toString();
+    }
 }
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericRecordData.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericRecordData.java
new file mode 100644
index 000000000..87540674f
--- /dev/null
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/GenericRecordData.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.data;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.types.variant.Variant;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * An internal data structure representing data of {@link RecordData}.
+ *
+ * <p>{@link GenericRecordData} is a generic implementation of {@link 
RecordData} which is backed by
+ * an array of Java {@link Object}. A {@link GenericRecordData} can have an 
arbitrary number of
+ * fields of different types. The fields in a row can be accessed by position 
(0-based) using either
+ * the generic {@link #getField(int)} or type-specific getters (such as {@link 
#getInt(int)}). A
+ * field can be updated by the generic {@link #setField(int, Object)}.
+ *
+ * <p>Note: All fields of this data structure must be internal data 
structures. See {@link
+ * RecordData} for more information about internal data structures.
+ *
+ * <p>The fields in {@link GenericRecordData} can be null for representing 
nullability.
+ */
+@PublicEvolving
+public final class GenericRecordData implements RecordData {
+
+    /** The array to store the actual internal format values. */
+    private final Object[] fields;
+
+    /**
+     * Creates an instance of {@link GenericRecordData} with given number of 
fields.
+     *
+     * <p>Initially, all fields are set to null.
+     *
+     * <p>Note: All fields of the row must be internal data structures.
+     *
+     * @param arity number of fields
+     */
+    public GenericRecordData(int arity) {
+        this.fields = new Object[arity];
+    }
+
+    /**
+     * Sets the field value at the given position.
+     *
+     * <p>Note: The given field value must be an internal data structures. 
Otherwise the {@link
+     * GenericRecordData} is corrupted and may throw exception when 
processing. See {@link
+     * RecordData} for more information about internal data structures.
+     *
+     * <p>The field value can be null for representing nullability.
+     */
+    public void setField(int pos, Object value) {
+        this.fields[pos] = value;
+    }
+
+    /**
+     * Returns the field value at the given position.
+     *
+     * <p>Note: The returned value is in internal data structure. See {@link 
RecordData} for more
+     * information about internal data structures.
+     *
+     * <p>The returned field value can be null for representing nullability.
+     */
+    public Object getField(int pos) {
+        return this.fields[pos];
+    }
+
+    @Override
+    public int getArity() {
+        return fields.length;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return this.fields[pos] == null;
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return (boolean) this.fields[pos];
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return (byte) this.fields[pos];
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return (short) this.fields[pos];
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return (int) this.fields[pos];
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return (long) this.fields[pos];
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return (float) this.fields[pos];
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return (double) this.fields[pos];
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        return (StringData) this.fields[pos];
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return (DecimalData) this.fields[pos];
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        return (TimestampData) this.fields[pos];
+    }
+
+    @Override
+    public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
+        return (ZonedTimestampData) this.fields[pos];
+    }
+
+    @Override
+    public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int 
precision) {
+        return (LocalZonedTimestampData) this.fields[pos];
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return (byte[]) this.fields[pos];
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        return (ArrayData) this.fields[pos];
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        return (MapData) this.fields[pos];
+    }
+
+    @Override
+    public RecordData getRow(int pos, int numFields) {
+        return (RecordData) this.fields[pos];
+    }
+
+    @Override
+    public DateData getDate(int pos) {
+        return (DateData) this.fields[pos];
+    }
+
+    @Override
+    public TimeData getTime(int pos) {
+        return (TimeData) this.fields[pos];
+    }
+
+    @Override
+    public Variant getVariant(int pos) {
+        return (Variant) this.fields[pos];
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof GenericRecordData)) {
+            return false;
+        }
+        GenericRecordData that = (GenericRecordData) o;
+        return Arrays.deepEquals(fields, that.fields);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.deepHashCode(fields);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("(");
+        for (int i = 0; i < fields.length; i++) {
+            if (i != 0) {
+                sb.append(",");
+            }
+            sb.append(Objects.toString(fields[i]));
+        }
+        sb.append(")");
+        return sb.toString();
+    }
+
+    // 
----------------------------------------------------------------------------------------
+    // Utilities
+    // 
----------------------------------------------------------------------------------------
+
+    /**
+     * Creates an instance of {@link GenericRecordData} with given field 
values.
+     *
+     * <p>Note: All fields of the row must be internal data structures.
+     */
+    public static GenericRecordData of(Object... values) {
+        GenericRecordData row = new GenericRecordData(values.length);
+
+        for (int i = 0; i < values.length; ++i) {
+            row.setField(i, values[i]);
+        }
+
+        return row;
+    }
+}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index eaea127cf..0d95f050b 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -59,7 +59,9 @@ import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.TinyIntType;
 import org.apache.flink.cdc.common.types.VarBinaryType;
 import org.apache.flink.cdc.common.types.VarCharType;
+import org.apache.flink.cdc.common.types.VariantType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.types.variant.Variant;
 
 import 
org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap;
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
@@ -613,7 +615,7 @@ public class SchemaMergingUtils {
     }
 
     @VisibleForTesting
-    static Object coerceObject(
+    public static Object coerceObject(
             String timezone,
             Object originalField,
             DataType originalType,
@@ -733,6 +735,10 @@ public class SchemaMergingUtils {
             return BinaryStringData.fromString(hexlify((byte[]) 
originalField));
         }
 
+        if (originalField instanceof Variant) {
+            return BinaryStringData.fromString(((Variant) 
originalField).toJson());
+        }
+
         return BinaryStringData.fromString(originalField.toString());
     }
 
@@ -1046,6 +1052,7 @@ public class SchemaMergingUtils {
         mergingTree.put(RowType.class, ImmutableList.of(stringType));
         mergingTree.put(ArrayType.class, ImmutableList.of(stringType));
         mergingTree.put(MapType.class, ImmutableList.of(stringType));
+        mergingTree.put(VariantType.class, ImmutableList.of(stringType));
         return mergingTree;
     }
 }
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalClassConverterTest.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalClassConverterTest.java
new file mode 100644
index 000000000..55e78ddb0
--- /dev/null
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalClassConverterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test cases for {@link InternalClassConverter}. */
+class InternalClassConverterTest {
+
+    @Test
+    void testConvertingFullTypes() {
+        assertThat(
+                        Stream.of(
+                                DataTypes.BOOLEAN(),
+                                DataTypes.BYTES(),
+                                DataTypes.BINARY(10),
+                                DataTypes.VARBINARY(10),
+                                DataTypes.CHAR(10),
+                                DataTypes.VARCHAR(10),
+                                DataTypes.STRING(),
+                                DataTypes.INT(),
+                                DataTypes.TINYINT(),
+                                DataTypes.SMALLINT(),
+                                DataTypes.BIGINT(),
+                                DataTypes.DOUBLE(),
+                                DataTypes.FLOAT(),
+                                DataTypes.DECIMAL(6, 3),
+                                DataTypes.DATE(),
+                                DataTypes.TIME(),
+                                DataTypes.TIME(6),
+                                DataTypes.TIMESTAMP(),
+                                DataTypes.TIMESTAMP(6),
+                                DataTypes.TIMESTAMP_LTZ(),
+                                DataTypes.TIMESTAMP_LTZ(6),
+                                DataTypes.TIMESTAMP_TZ(),
+                                DataTypes.TIMESTAMP_TZ(6),
+                                DataTypes.ARRAY(DataTypes.BIGINT()),
+                                DataTypes.MAP(DataTypes.SMALLINT(), 
DataTypes.STRING()),
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", 
DataTypes.STRING(), "desc")),
+                                DataTypes.ROW(DataTypes.SMALLINT(), 
DataTypes.STRING()),
+                                DataTypes.VARIANT()))
+                .map(InternalClassConverter::toInternalClass)
+                .map(Class::getCanonicalName)
+                .containsExactly(
+                        "java.lang.Boolean",
+                        "byte[]",
+                        "byte[]",
+                        "byte[]",
+                        "org.apache.flink.cdc.common.data.StringData",
+                        "org.apache.flink.cdc.common.data.StringData",
+                        "org.apache.flink.cdc.common.data.StringData",
+                        "java.lang.Integer",
+                        "java.lang.Byte",
+                        "java.lang.Short",
+                        "java.lang.Long",
+                        "java.lang.Double",
+                        "java.lang.Float",
+                        "org.apache.flink.cdc.common.data.DecimalData",
+                        "org.apache.flink.cdc.common.data.DateData",
+                        "org.apache.flink.cdc.common.data.TimeData",
+                        "org.apache.flink.cdc.common.data.TimeData",
+                        "org.apache.flink.cdc.common.data.TimestampData",
+                        "org.apache.flink.cdc.common.data.TimestampData",
+                        
"org.apache.flink.cdc.common.data.LocalZonedTimestampData",
+                        
"org.apache.flink.cdc.common.data.LocalZonedTimestampData",
+                        "org.apache.flink.cdc.common.data.ZonedTimestampData",
+                        "org.apache.flink.cdc.common.data.ZonedTimestampData",
+                        "org.apache.flink.cdc.common.data.ArrayData",
+                        "org.apache.flink.cdc.common.data.MapData",
+                        "org.apache.flink.cdc.common.data.RecordData",
+                        "org.apache.flink.cdc.common.data.RecordData",
+                        "org.apache.flink.cdc.common.types.variant.Variant");
+    }
+}
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java
new file mode 100644
index 000000000..2a4c3e1b3
--- /dev/null
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/InternalObjectConverterTest.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.cdc.common.converter.InternalObjectConverter.convertToInternal;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test cases for {@link InternalObjectConverter}. */
+class InternalObjectConverterTest {
+
+    @Test
+    void testConvertToBoolean() {
+        assertThat(convertToInternal(true, 
DataTypes.BOOLEAN())).isEqualTo(true);
+        assertThat(convertToInternal(false, 
DataTypes.BOOLEAN())).isEqualTo(false);
+        assertThat(convertToInternal(null, DataTypes.BOOLEAN())).isNull();
+    }
+
+    @Test
+    void testConvertToBytes() {
+        assertThat(convertToInternal("Alice".getBytes(), DataTypes.BYTES()))
+                .isInstanceOf(byte[].class)
+                .extracting(byte[].class::cast)
+                .extracting(Arrays::toString)
+                .isEqualTo("[65, 108, 105, 99, 101]");
+        assertThat(
+                        convertToInternal(
+                                new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 
0xba, (byte) 0xbe},
+                                DataTypes.BYTES()))
+                .isInstanceOf(byte[].class)
+                .extracting(byte[].class::cast)
+                .extracting(Arrays::toString)
+                .isEqualTo("[-54, -2, -70, -66]");
+        assertThat(convertToInternal(null, DataTypes.BYTES())).isNull();
+    }
+
+    @Test
+    void testConvertToBinary() {
+        assertThat(convertToInternal("Alice".getBytes(), DataTypes.BINARY(5)))
+                .isEqualTo(new byte[] {65, 108, 105, 99, 101});
+        assertThat(
+                        convertToInternal(
+                                new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 
0xba, (byte) 0xbe},
+                                DataTypes.BINARY(4)))
+                .isEqualTo(new byte[] {-54, -2, -70, -66});
+        assertThat(convertToInternal(null, DataTypes.BINARY(3))).isNull();
+    }
+
+    @Test
+    void testConvertToVarBinary() {
+        assertThat(convertToInternal("Alice".getBytes(), 
DataTypes.VARBINARY(5)))
+                .isEqualTo(new byte[] {65, 108, 105, 99, 101});
+        assertThat(
+                        convertToInternal(
+                                new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 
0xba, (byte) 0xbe},
+                                DataTypes.VARBINARY(4)))
+                .isEqualTo(new byte[] {-54, -2, -70, -66});
+        assertThat(convertToInternal(null, DataTypes.VARBINARY(3))).isNull();
+    }
+
+    @Test
+    void testConvertToChar() {
+        assertThat(convertToInternal("Alice", DataTypes.CHAR(5)))
+                .isInstanceOf(StringData.class)
+                .hasToString("Alice");
+
+        assertThat(convertToInternal(BinaryStringData.fromString("Bob"), 
DataTypes.CHAR(5)))
+                .isInstanceOf(StringData.class)
+                .hasToString("Bob");
+
+        assertThat(convertToInternal(null, DataTypes.CHAR(5))).isNull();
+    }
+
+    @Test
+    void testConvertToVarChar() {
+        assertThat(convertToInternal("Alice", DataTypes.VARCHAR(5)))
+                .isInstanceOf(StringData.class)
+                .hasToString("Alice");
+
+        assertThat(convertToInternal(BinaryStringData.fromString("Bob"), 
DataTypes.VARCHAR(5)))
+                .isInstanceOf(StringData.class)
+                .hasToString("Bob");
+
+        assertThat(convertToInternal(null, DataTypes.VARCHAR(5))).isNull();
+    }
+
+    @Test
+    void testConvertToString() {
+        assertThat(convertToInternal("Alice", DataTypes.STRING()))
+                .isInstanceOf(StringData.class)
+                .hasToString("Alice");
+        assertThat(convertToInternal(BinaryStringData.fromString("Bob"), 
DataTypes.STRING()))
+                .isInstanceOf(StringData.class)
+                .hasToString("Bob");
+        assertThat(convertToInternal(null, DataTypes.STRING())).isNull();
+    }
+
+    @Test
+    void testConvertToInt() {
+        assertThat(convertToInternal(11, DataTypes.INT())).isEqualTo(11);
+        assertThat(convertToInternal(-14, DataTypes.INT())).isEqualTo(-14);
+        assertThat(convertToInternal(17, DataTypes.INT())).isEqualTo(17);
+        assertThat(convertToInternal(null, DataTypes.INT())).isNull();
+    }
+
+    @Test
+    void testConvertToTinyInt() {
+        assertThat(convertToInternal((byte) 11, 
DataTypes.TINYINT())).isEqualTo((byte) 11);
+        assertThat(convertToInternal((byte) -14, 
DataTypes.TINYINT())).isEqualTo((byte) -14);
+        assertThat(convertToInternal((byte) 17, 
DataTypes.TINYINT())).isEqualTo((byte) 17);
+        assertThat(convertToInternal(null, DataTypes.TINYINT())).isNull();
+    }
+
+    @Test
+    void testConvertToSmallInt() {
+        assertThat(convertToInternal((short) 11, 
DataTypes.SMALLINT())).isEqualTo((short) 11);
+        assertThat(convertToInternal((short) -14, 
DataTypes.SMALLINT())).isEqualTo((short) -14);
+        assertThat(convertToInternal((short) 17, 
DataTypes.SMALLINT())).isEqualTo((short) 17);
+        assertThat(convertToInternal(null, DataTypes.SMALLINT())).isNull();
+    }
+
+    @Test
+    void testConvertToBigInt() {
+        assertThat(convertToInternal((long) 11, 
DataTypes.BIGINT())).isEqualTo((long) 11);
+        assertThat(convertToInternal((long) -14, 
DataTypes.BIGINT())).isEqualTo((long) -14);
+        assertThat(convertToInternal((long) 17, 
DataTypes.BIGINT())).isEqualTo((long) 17);
+        assertThat(convertToInternal(null, DataTypes.BIGINT())).isNull();
+    }
+
+    @Test
+    void testConvertToFloat() {
+        assertThat(convertToInternal((float) 11, 
DataTypes.FLOAT())).isEqualTo((float) 11);
+        assertThat(convertToInternal((float) -14, 
DataTypes.FLOAT())).isEqualTo((float) -14);
+        assertThat(convertToInternal((float) 17, 
DataTypes.FLOAT())).isEqualTo((float) 17);
+        assertThat(convertToInternal(null, DataTypes.FLOAT())).isNull();
+    }
+
+    @Test
+    void testConvertToDouble() {
+        assertThat(convertToInternal((double) 11, 
DataTypes.DOUBLE())).isEqualTo((double) 11);
+        assertThat(convertToInternal((double) -14, 
DataTypes.DOUBLE())).isEqualTo((double) -14);
+        assertThat(convertToInternal((double) 17, 
DataTypes.DOUBLE())).isEqualTo((double) 17);
+        assertThat(convertToInternal(null, DataTypes.DOUBLE())).isNull();
+    }
+
+    @Test
+    void testConvertToDecimal() {
+        assertThat(convertToInternal(new BigDecimal("4.2"), 
DataTypes.DECIMAL(2, 1)))
+                .isInstanceOf(DecimalData.class)
+                .hasToString("4.2");
+        assertThat(convertToInternal(new BigDecimal("-3.1415926"), 
DataTypes.DECIMAL(20, 10)))
+                .isInstanceOf(DecimalData.class)
+                .hasToString("-3.1415926");
+
+        assertThat(
+                        convertToInternal(
+                                DecimalData.fromUnscaledLong(42, 2, 1), 
DataTypes.DECIMAL(2, 1)))
+                .isInstanceOf(DecimalData.class)
+                .hasToString("4.2");
+        assertThat(
+                        convertToInternal(
+                                DecimalData.fromUnscaledLong(-31415926, 14, 7),
+                                DataTypes.DECIMAL(14, 7)))
+                .isInstanceOf(DecimalData.class)
+                .hasToString("-3.1415926");
+
+        assertThat(convertToInternal(null, DataTypes.DECIMAL(20, 
10))).isNull();
+    }
+
+    @Test
+    void testConvertToDate() {
+        assertThat(convertToInternal(LocalDate.of(2017, 12, 31), 
DataTypes.DATE()))
+                .isInstanceOf(DateData.class)
+                .hasToString("2017-12-31");
+        assertThat(convertToInternal(DateData.fromEpochDay(14417), 
DataTypes.DATE()))
+                .isInstanceOf(DateData.class)
+                .hasToString("2009-06-22");
+        assertThat(convertToInternal(null, DataTypes.DATE())).isNull();
+    }
+
+    @Test
+    void testConvertToTime() {
+        assertThat(convertToInternal(LocalTime.of(21, 48, 25), 
DataTypes.TIME(0)))
+                .isInstanceOf(TimeData.class)
+                .hasToString("21:48:25");
+        assertThat(convertToInternal(LocalTime.ofSecondOfDay(14419), 
DataTypes.TIME(0)))
+                .isInstanceOf(TimeData.class)
+                .hasToString("04:00:19");
+        assertThat(convertToInternal(LocalTime.of(21, 48, 25, 123456789), 
DataTypes.TIME(3)))
+                .isInstanceOf(TimeData.class)
+                .hasToString("21:48:25.123");
+        assertThat(convertToInternal(LocalTime.ofNanoOfDay(14419123456789L), 
DataTypes.TIME(3)))
+                .isInstanceOf(TimeData.class)
+                .hasToString("04:00:19.123");
+
+        assertThat(
+                        convertToInternal(
+                                TimeData.fromLocalTime(LocalTime.of(21, 48, 
25)),
+                                DataTypes.TIME(0)))
+                .isInstanceOf(TimeData.class)
+                .hasToString("21:48:25");
+        assertThat(convertToInternal(TimeData.fromSecondOfDay(14419), 
DataTypes.TIME(0)))
+                .isInstanceOf(TimeData.class)
+                .hasToString("04:00:19");
+        assertThat(
+                        convertToInternal(
+                                TimeData.fromLocalTime(LocalTime.of(21, 48, 
25, 123456789)),
+                                DataTypes.TIME(3)))
+                .isInstanceOf(TimeData.class)
+                .hasToString("21:48:25.123");
+        assertThat(convertToInternal(TimeData.fromNanoOfDay(14419123456789L), 
DataTypes.TIME(3)))
+                .isInstanceOf(TimeData.class)
+                .hasToString("04:00:19.123");
+        assertThat(convertToInternal(null, DataTypes.TIME())).isNull();
+    }
+
+    @Test
+    void testConvertToTimestamp() {
+        assertThat(
+                        convertToInternal(
+                                TimestampData.fromMillis(2147483648491L), 
DataTypes.TIMESTAMP(3)))
+                .isInstanceOf(TimestampData.class)
+                .hasToString("2038-01-19T03:14:08.491");
+        assertThat(
+                        convertToInternal(
+                                LocalDateTime.of(2019, 12, 25, 21, 48, 25, 
123456789),
+                                DataTypes.TIMESTAMP(9)))
+                .isInstanceOf(TimestampData.class)
+                .hasToString("2019-12-25T21:48:25.123456789");
+        assertThat(convertToInternal(null, DataTypes.TIMESTAMP())).isNull();
+    }
+
+    @Test
+    void testConvertToZonedTimestamp() {
+        assertThat(
+                        convertToInternal(
+                                ZonedTimestampData.of(2143658709L, 0, "UTC"),
+                                DataTypes.TIMESTAMP_TZ(3)))
+                .isInstanceOf(ZonedTimestampData.class)
+                .hasToString("1970-01-25T19:27:38.709Z");
+        assertThat(
+                        convertToInternal(
+                                ZonedDateTime.of(
+                                        2019,
+                                        12,
+                                        25,
+                                        21,
+                                        48,
+                                        25,
+                                        123456789,
+                                        ZoneId.of("UTC+08:00")),
+                                DataTypes.TIMESTAMP_TZ(9)))
+                .isInstanceOf(ZonedTimestampData.class)
+                .hasToString("2019-12-25T21:48:25.123456789+08:00");
+        assertThat(convertToInternal(null, DataTypes.TIMESTAMP_TZ())).isNull();
+    }
+
+    @Test
+    void testConvertToLocalZonedTimestamp() {
+        assertThat(
+                        convertToInternal(
+                                
LocalZonedTimestampData.fromEpochMillis(3141592653589L),
+                                DataTypes.TIMESTAMP_LTZ(3)))
+                .isInstanceOf(LocalZonedTimestampData.class)
+                .hasToString("2069-07-21T00:37:33.589");
+        assertThat(
+                        convertToInternal(
+                                Instant.ofEpochSecond(2718281828L, 123456789),
+                                DataTypes.TIMESTAMP_LTZ(9)))
+                .isInstanceOf(LocalZonedTimestampData.class)
+                .hasToString("2056-02-20T14:17:08.123456789");
+        assertThat(convertToInternal(null, 
DataTypes.TIMESTAMP_LTZ())).isNull();
+    }
+
+    @Test
+    void testConvertToArray() {
+        assertThat(
+                        convertToInternal(
+                                Arrays.asList("Alice", "Bob", "Charlie"),
+                                DataTypes.ARRAY(DataTypes.STRING())))
+                .isInstanceOf(ArrayData.class)
+                .hasToString("[Alice, Bob, Charlie]")
+                .extracting(ArrayData.class::cast)
+                .extracting(s -> s.getString(0))
+                .isInstanceOf(StringData.class);
+        assertThat(
+                        convertToInternal(
+                                new GenericArrayData(
+                                        new StringData[] {
+                                            
BinaryStringData.fromString("Derrida"),
+                                            
BinaryStringData.fromString("Enigma"),
+                                            BinaryStringData.fromString("Fall")
+                                        }),
+                                DataTypes.ARRAY(DataTypes.STRING())))
+                .isInstanceOf(ArrayData.class)
+                .hasToString("[Derrida, Enigma, Fall]")
+                .extracting(ArrayData.class::cast)
+                .extracting(s -> s.getString(0))
+                .isInstanceOf(StringData.class);
+        assertThat(convertToInternal(null, 
DataTypes.TIMESTAMP_LTZ())).isNull();
+    }
+
+    @Test
+    void testConvertToMap() {
+        MapType targetType =
+                DataTypes.MAP(
+                        DataTypes.STRING(), DataTypes.ROW(DataTypes.INT(), 
DataTypes.STRING()));
+        assertThat(
+                        convertToInternal(
+                                Map.of(
+                                        "Alice", List.of(5, "AILISI"),
+                                        "Bob", List.of(3, "BAOBO"),
+                                        "Cancan", List.of(6, "KANGKANG")),
+                                targetType))
+                .isInstanceOf(MapData.class)
+                .hasToString("{Alice=(5,AILISI), Cancan=(6,KANGKANG), 
Bob=(3,BAOBO)}")
+                .extracting(MapData.class::cast)
+                .extracting(MapData::keyArray, MapData::valueArray)
+                .map(Object::toString)
+                .containsExactly("[Alice, Cancan, Bob]", "[(5,AILISI), 
(6,KANGKANG), (3,BAOBO)]");
+
+        assertThat(
+                        convertToInternal(
+                                new GenericMapData(
+                                        Map.of(
+                                                
BinaryStringData.fromString("Derrida"),
+                                                GenericRecordData.of(
+                                                        7, 
BinaryStringData.fromString("DELIDA")))),
+                                targetType))
+                .isInstanceOf(MapData.class)
+                .hasToString("{Derrida=(7,DELIDA)}")
+                .extracting(MapData.class::cast)
+                .extracting(MapData::keyArray, MapData::valueArray)
+                .map(Object::toString)
+                .containsExactly("[Derrida]", "[(7,DELIDA)]");
+        assertThat(convertToInternal(null, targetType)).isNull();
+    }
+
+    @Test
+    void testConvertToRow() {
+        RowType targetType =
+                DataTypes.ROW(
+                        DataTypes.STRING(), DataTypes.ROW(DataTypes.INT(), 
DataTypes.STRING()));
+        assertThat(convertToInternal(List.of("Alice", List.of(5, "AILISI")), 
targetType))
+                .isInstanceOf(RecordData.class)
+                .hasToString("(Alice,(5,AILISI))")
+                .extracting(RecordData.class::cast)
+                .extracting(o -> o.getString(0))
+                .isInstanceOf(StringData.class)
+                .hasToString("Alice");
+
+        assertThat(convertToInternal(List.of("Bob", List.of(3, "BAOBO")), 
targetType))
+                .isInstanceOf(RecordData.class)
+                .hasToString("(Bob,(3,BAOBO))")
+                .extracting(RecordData.class::cast)
+                .extracting(o -> o.getRow(1, 2))
+                .isInstanceOf(RecordData.class)
+                .hasToString("(3,BAOBO)")
+                .extracting(o -> o.getString(1))
+                .isInstanceOf(StringData.class);
+
+        assertThat(convertToInternal(null, targetType)).isNull();
+    }
+}
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaClassConverterTest.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaClassConverterTest.java
new file mode 100644
index 000000000..fb84bd7a7
--- /dev/null
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaClassConverterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test cases for {@link JavaClassConverter}. */
+class JavaClassConverterTest {
+
+    @Test
+    void testConvertingFullTypes() {
+        assertThat(
+                        Stream.of(
+                                DataTypes.BOOLEAN(),
+                                DataTypes.BYTES(),
+                                DataTypes.BINARY(10),
+                                DataTypes.VARBINARY(10),
+                                DataTypes.CHAR(10),
+                                DataTypes.VARCHAR(10),
+                                DataTypes.STRING(),
+                                DataTypes.INT(),
+                                DataTypes.TINYINT(),
+                                DataTypes.SMALLINT(),
+                                DataTypes.BIGINT(),
+                                DataTypes.DOUBLE(),
+                                DataTypes.FLOAT(),
+                                DataTypes.DECIMAL(6, 3),
+                                DataTypes.DATE(),
+                                DataTypes.TIME(),
+                                DataTypes.TIME(6),
+                                DataTypes.TIMESTAMP(),
+                                DataTypes.TIMESTAMP(6),
+                                DataTypes.TIMESTAMP_LTZ(),
+                                DataTypes.TIMESTAMP_LTZ(6),
+                                DataTypes.TIMESTAMP_TZ(),
+                                DataTypes.TIMESTAMP_TZ(6),
+                                DataTypes.ARRAY(DataTypes.BIGINT()),
+                                DataTypes.MAP(DataTypes.SMALLINT(), 
DataTypes.STRING()),
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", 
DataTypes.STRING(), "desc")),
+                                DataTypes.ROW(DataTypes.SMALLINT(), 
DataTypes.STRING()),
+                                DataTypes.VARIANT()))
+                .map(JavaClassConverter::toJavaClass)
+                .map(Class::getCanonicalName)
+                .containsExactly(
+                        "java.lang.Boolean",
+                        "byte[]",
+                        "byte[]",
+                        "byte[]",
+                        "java.lang.String",
+                        "java.lang.String",
+                        "java.lang.String",
+                        "java.lang.Integer",
+                        "java.lang.Byte",
+                        "java.lang.Short",
+                        "java.lang.Long",
+                        "java.lang.Double",
+                        "java.lang.Float",
+                        "java.math.BigDecimal",
+                        "java.time.LocalDate",
+                        "java.time.LocalTime",
+                        "java.time.LocalTime",
+                        "java.time.LocalDateTime",
+                        "java.time.LocalDateTime",
+                        "java.time.Instant",
+                        "java.time.Instant",
+                        "java.time.ZonedDateTime",
+                        "java.time.ZonedDateTime",
+                        "java.util.List",
+                        "java.util.Map",
+                        "java.util.List",
+                        "java.util.List",
+                        "org.apache.flink.cdc.common.types.variant.Variant");
+    }
+}
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaObjectConverterTest.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaObjectConverterTest.java
new file mode 100644
index 000000000..786ef63e2
--- /dev/null
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/JavaObjectConverterTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.cdc.common.converter.JavaObjectConverter.convertToJava;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.InstanceOfAssertFactories.list;
+import static org.assertj.core.api.InstanceOfAssertFactories.map;
+
+/** Unit test cases for {@link JavaObjectConverter}. */
+class JavaObjectConverterTest {
+
+    @Test
+    void testConvertToBoolean() {
+        assertThat(convertToJava(true, DataTypes.BOOLEAN())).isEqualTo(true);
+        assertThat(convertToJava(false, DataTypes.BOOLEAN())).isEqualTo(false);
+        assertThat(convertToJava(null, DataTypes.BOOLEAN())).isNull();
+    }
+
+    @Test
+    void testConvertToBytes() {
+        assertThat(convertToJava("Alice".getBytes(), DataTypes.BYTES()))
+                .isInstanceOf(byte[].class)
+                .extracting(byte[].class::cast)
+                .extracting(Arrays::toString)
+                .isEqualTo("[65, 108, 105, 99, 101]");
+        assertThat(
+                        convertToJava(
+                                new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 
0xba, (byte) 0xbe},
+                                DataTypes.BYTES()))
+                .isInstanceOf(byte[].class)
+                .extracting(byte[].class::cast)
+                .extracting(Arrays::toString)
+                .isEqualTo("[-54, -2, -70, -66]");
+        assertThat(convertToJava(null, DataTypes.BYTES())).isNull();
+    }
+
+    @Test
+    void testConvertToBinary() {
+        assertThat(convertToJava("Alice".getBytes(), DataTypes.BINARY(5)))
+                .isEqualTo(new byte[] {65, 108, 105, 99, 101});
+        assertThat(
+                        convertToJava(
+                                new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 
0xba, (byte) 0xbe},
+                                DataTypes.BINARY(4)))
+                .isEqualTo(new byte[] {-54, -2, -70, -66});
+        assertThat(convertToJava(null, DataTypes.BINARY(3))).isNull();
+    }
+
+    @Test
+    void testConvertToVarBinary() {
+        assertThat(convertToJava("Alice".getBytes(), DataTypes.VARBINARY(5)))
+                .isEqualTo(new byte[] {65, 108, 105, 99, 101});
+        assertThat(
+                        convertToJava(
+                                new byte[] {(byte) 0xca, (byte) 0xfe, (byte) 
0xba, (byte) 0xbe},
+                                DataTypes.VARBINARY(4)))
+                .isEqualTo(new byte[] {-54, -2, -70, -66});
+        assertThat(convertToJava(null, DataTypes.VARBINARY(3))).isNull();
+    }
+
+    @Test
+    void testConvertToChar() {
+        assertThat(convertToJava("Alice", DataTypes.CHAR(5)))
+                .isInstanceOf(String.class)
+                .hasToString("Alice");
+
+        assertThat(convertToJava(BinaryStringData.fromString("Bob"), 
DataTypes.CHAR(5)))
+                .isInstanceOf(String.class)
+                .hasToString("Bob");
+
+        assertThat(convertToJava(null, DataTypes.CHAR(5))).isNull();
+    }
+
+    @Test
+    void testConvertToVarChar() {
+        assertThat(convertToJava("Alice", DataTypes.VARCHAR(5)))
+                .isInstanceOf(String.class)
+                .hasToString("Alice");
+
+        assertThat(convertToJava(BinaryStringData.fromString("Bob"), 
DataTypes.VARCHAR(5)))
+                .isInstanceOf(String.class)
+                .hasToString("Bob");
+
+        assertThat(convertToJava(null, DataTypes.VARCHAR(5))).isNull();
+    }
+
+    @Test
+    void testConvertToString() {
+        assertThat(convertToJava("Alice", DataTypes.STRING()))
+                .isInstanceOf(String.class)
+                .hasToString("Alice");
+        assertThat(convertToJava(BinaryStringData.fromString("Bob"), 
DataTypes.STRING()))
+                .isInstanceOf(String.class)
+                .hasToString("Bob");
+        assertThat(convertToJava(null, DataTypes.STRING())).isNull();
+    }
+
+    @Test
+    void testConvertToInt() {
+        assertThat(convertToJava(11, DataTypes.INT())).isEqualTo(11);
+        assertThat(convertToJava(-14, DataTypes.INT())).isEqualTo(-14);
+        assertThat(convertToJava(17, DataTypes.INT())).isEqualTo(17);
+        assertThat(convertToJava(null, DataTypes.INT())).isNull();
+    }
+
+    @Test
+    void testConvertToTinyInt() {
+        assertThat(convertToJava((byte) 11, 
DataTypes.TINYINT())).isEqualTo((byte) 11);
+        assertThat(convertToJava((byte) -14, 
DataTypes.TINYINT())).isEqualTo((byte) -14);
+        assertThat(convertToJava((byte) 17, 
DataTypes.TINYINT())).isEqualTo((byte) 17);
+        assertThat(convertToJava(null, DataTypes.TINYINT())).isNull();
+    }
+
+    @Test
+    void testConvertToSmallInt() {
+        assertThat(convertToJava((short) 11, 
DataTypes.SMALLINT())).isEqualTo((short) 11);
+        assertThat(convertToJava((short) -14, 
DataTypes.SMALLINT())).isEqualTo((short) -14);
+        assertThat(convertToJava((short) 17, 
DataTypes.SMALLINT())).isEqualTo((short) 17);
+        assertThat(convertToJava(null, DataTypes.SMALLINT())).isNull();
+    }
+
+    @Test
+    void testConvertToBigInt() {
+        assertThat(convertToJava((long) 11, 
DataTypes.BIGINT())).isEqualTo((long) 11);
+        assertThat(convertToJava((long) -14, 
DataTypes.BIGINT())).isEqualTo((long) -14);
+        assertThat(convertToJava((long) 17, 
DataTypes.BIGINT())).isEqualTo((long) 17);
+        assertThat(convertToJava(null, DataTypes.BIGINT())).isNull();
+    }
+
+    @Test
+    void testConvertToFloat() {
+        assertThat(convertToJava((float) 11, 
DataTypes.FLOAT())).isEqualTo((float) 11);
+        assertThat(convertToJava((float) -14, 
DataTypes.FLOAT())).isEqualTo((float) -14);
+        assertThat(convertToJava((float) 17, 
DataTypes.FLOAT())).isEqualTo((float) 17);
+        assertThat(convertToJava(null, DataTypes.FLOAT())).isNull();
+    }
+
+    @Test
+    void testConvertToDouble() {
+        assertThat(convertToJava((double) 11, 
DataTypes.DOUBLE())).isEqualTo((double) 11);
+        assertThat(convertToJava((double) -14, 
DataTypes.DOUBLE())).isEqualTo((double) -14);
+        assertThat(convertToJava((double) 17, 
DataTypes.DOUBLE())).isEqualTo((double) 17);
+        assertThat(convertToJava(null, DataTypes.DOUBLE())).isNull();
+    }
+
+    @Test
+    void testConvertToDecimal() {
+        assertThat(convertToJava(new BigDecimal("4.2"), DataTypes.DECIMAL(2, 
1)))
+                .isInstanceOf(BigDecimal.class)
+                .hasToString("4.2");
+        assertThat(convertToJava(new BigDecimal("-3.1415926"), 
DataTypes.DECIMAL(20, 10)))
+                .isInstanceOf(BigDecimal.class)
+                .hasToString("-3.1415926");
+
+        assertThat(convertToJava(DecimalData.fromUnscaledLong(42, 2, 1), 
DataTypes.DECIMAL(2, 1)))
+                .isInstanceOf(BigDecimal.class)
+                .hasToString("4.2");
+        assertThat(
+                        convertToJava(
+                                DecimalData.fromUnscaledLong(-31415926, 14, 7),
+                                DataTypes.DECIMAL(14, 7)))
+                .isInstanceOf(BigDecimal.class)
+                .hasToString("-3.1415926");
+
+        assertThat(convertToJava(null, DataTypes.DECIMAL(20, 10))).isNull();
+    }
+
+    @Test
+    void testConvertToDate() {
+        assertThat(convertToJava(LocalDate.of(2017, 12, 31), DataTypes.DATE()))
+                .isInstanceOf(LocalDate.class)
+                .hasToString("2017-12-31");
+        assertThat(convertToJava(DateData.fromEpochDay(14417), 
DataTypes.DATE()))
+                .isInstanceOf(LocalDate.class)
+                .hasToString("2009-06-22");
+        assertThat(convertToJava(null, DataTypes.DATE())).isNull();
+    }
+
+    @Test
+    void testConvertToTime() {
+        assertThat(convertToJava(LocalTime.of(21, 48, 25), DataTypes.TIME(0)))
+                .isInstanceOf(LocalTime.class)
+                .hasToString("21:48:25");
+        assertThat(convertToJava(LocalTime.ofSecondOfDay(14419), 
DataTypes.TIME(0)))
+                .isInstanceOf(LocalTime.class)
+                .hasToString("04:00:19");
+        assertThat(convertToJava(LocalTime.of(21, 48, 25, 123456789), 
DataTypes.TIME(3)))
+                .isInstanceOf(LocalTime.class)
+                .hasToString("21:48:25.123456789");
+        assertThat(convertToJava(LocalTime.ofNanoOfDay(14419123456789L), 
DataTypes.TIME(3)))
+                .isInstanceOf(LocalTime.class)
+                .hasToString("04:00:19.123456789");
+
+        assertThat(
+                        convertToJava(
+                                TimeData.fromLocalTime(LocalTime.of(21, 48, 
25)),
+                                DataTypes.TIME(0)))
+                .isInstanceOf(LocalTime.class)
+                .hasToString("21:48:25");
+        assertThat(convertToJava(TimeData.fromSecondOfDay(14419), 
DataTypes.TIME(0)))
+                .isInstanceOf(LocalTime.class)
+                .hasToString("04:00:19");
+        assertThat(
+                        convertToJava(
+                                TimeData.fromLocalTime(LocalTime.of(21, 48, 
25, 123456789)),
+                                DataTypes.TIME(3)))
+                .isInstanceOf(LocalTime.class)
+                .hasToString("21:48:25.123");
+        assertThat(convertToJava(TimeData.fromNanoOfDay(14419123456789L), 
DataTypes.TIME(3)))
+                .isInstanceOf(LocalTime.class)
+                .hasToString("04:00:19.123");
+        assertThat(convertToJava(null, DataTypes.TIME())).isNull();
+    }
+
+    @Test
+    void testConvertToTimestamp() {
+        assertThat(convertToJava(TimestampData.fromMillis(2147483648491L), 
DataTypes.TIMESTAMP(3)))
+                .isInstanceOf(LocalDateTime.class)
+                .hasToString("2038-01-19T03:14:08.491");
+        assertThat(
+                        convertToJava(
+                                LocalDateTime.of(2019, 12, 25, 21, 48, 25, 
123456789),
+                                DataTypes.TIMESTAMP(9)))
+                .isInstanceOf(LocalDateTime.class)
+                .hasToString("2019-12-25T21:48:25.123456789");
+        assertThat(convertToJava(null, DataTypes.TIMESTAMP())).isNull();
+    }
+
+    @Test
+    void testConvertToZonedTimestamp() {
+        assertThat(
+                        convertToJava(
+                                ZonedTimestampData.of(2143658709L, 0, "UTC"),
+                                DataTypes.TIMESTAMP_TZ(3)))
+                .isInstanceOf(ZonedDateTime.class)
+                .hasToString("1970-01-25T19:27:38.709Z[UTC]");
+        assertThat(
+                        convertToJava(
+                                ZonedDateTime.of(
+                                        2019,
+                                        12,
+                                        25,
+                                        21,
+                                        48,
+                                        25,
+                                        123456789,
+                                        ZoneId.of("UTC+08:00")),
+                                DataTypes.TIMESTAMP_TZ(9)))
+                .isInstanceOf(ZonedDateTime.class)
+                .hasToString("2019-12-25T21:48:25.123456789+08:00[UTC+08:00]");
+        assertThat(convertToJava(null, DataTypes.TIMESTAMP_TZ())).isNull();
+    }
+
+    @Test
+    void testConvertToLocalZonedTimestamp() {
+        assertThat(
+                        convertToJava(
+                                
LocalZonedTimestampData.fromEpochMillis(3141592653589L),
+                                DataTypes.TIMESTAMP_LTZ(3)))
+                .isInstanceOf(Instant.class)
+                .hasToString("2069-07-21T00:37:33.589Z");
+        assertThat(
+                        convertToJava(
+                                Instant.ofEpochSecond(2718281828L, 123456789),
+                                DataTypes.TIMESTAMP_LTZ(9)))
+                .isInstanceOf(Instant.class)
+                .hasToString("2056-02-20T14:17:08.123456789Z");
+        assertThat(convertToJava(null, DataTypes.TIMESTAMP_LTZ())).isNull();
+    }
+
+    @Test
+    void testConvertToArray() {
+        assertThat(
+                        convertToJava(
+                                Arrays.asList("Alice", "Bob", "Charlie"),
+                                DataTypes.ARRAY(DataTypes.STRING())))
+                .isInstanceOf(List.class)
+                .hasToString("[Alice, Bob, Charlie]")
+                .extracting(List.class::cast)
+                .extracting(s -> s.get(0))
+                .isInstanceOf(String.class);
+        assertThat(
+                        convertToJava(
+                                new GenericArrayData(
+                                        new StringData[] {
+                                            
BinaryStringData.fromString("Derrida"),
+                                            
BinaryStringData.fromString("Enigma"),
+                                            BinaryStringData.fromString("Fall")
+                                        }),
+                                DataTypes.ARRAY(DataTypes.STRING())))
+                .isInstanceOf(List.class)
+                .hasToString("[Derrida, Enigma, Fall]")
+                .extracting(List.class::cast)
+                .extracting(s -> s.get(0))
+                .isInstanceOf(String.class);
+        assertThat(convertToJava(null, DataTypes.TIMESTAMP_LTZ())).isNull();
+    }
+
+    @Test
+    void testConvertToMap() {
+        MapType targetType =
+                DataTypes.MAP(
+                        DataTypes.STRING(), DataTypes.ROW(DataTypes.INT(), 
DataTypes.STRING()));
+        Map<String, List<Object>> originalMap =
+                Map.of(
+                        "Alice", List.of(5, "AILISI"),
+                        "Bob", List.of(3, "BAOBO"),
+                        "Cancan", List.of(6, "KANGKANG"));
+        assertThat(convertToJava(originalMap, targetType))
+                .isInstanceOf(Map.class)
+                .asInstanceOf(map(String.class, List.class))
+                .containsExactlyEntriesOf(originalMap);
+
+        assertThat(
+                        convertToJava(
+                                new GenericMapData(
+                                        Map.of(
+                                                
BinaryStringData.fromString("Derrida"),
+                                                GenericRecordData.of(
+                                                        7, 
BinaryStringData.fromString("DELIDA")))),
+                                targetType))
+                .isInstanceOf(Map.class)
+                .asInstanceOf(map(String.class, List.class))
+                .containsExactlyEntriesOf(Map.of("Derrida", List.of(7, 
"DELIDA")));
+        assertThat(convertToJava(null, targetType)).isNull();
+    }
+
+    @Test
+    void testConvertToRow() {
+        RowType targetType =
+                DataTypes.ROW(
+                        DataTypes.STRING(), DataTypes.ROW(DataTypes.INT(), 
DataTypes.STRING()));
+        assertThat(convertToJava(List.of("Alice", List.of(5, "AILISI")), 
targetType))
+                .isInstanceOf(List.class)
+                .asInstanceOf(list(Object.class))
+                .containsExactly("Alice", List.of(5, "AILISI"));
+
+        assertThat(convertToJava(List.of("Bob", List.of(3, "BAOBO")), 
targetType))
+                .isInstanceOf(List.class)
+                .asInstanceOf(list(Object.class))
+                .containsExactly("Bob", List.of(3, "BAOBO"));
+
+        assertThat(convertToJava(null, targetType)).isNull();
+    }
+}
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/VariantConvertingTest.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/VariantConvertingTest.java
new file mode 100644
index 000000000..fbcc66fd1
--- /dev/null
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/converter/VariantConvertingTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.converter;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantBuilder;
+import org.apache.flink.cdc.common.types.variant.Variant;
+import org.apache.flink.cdc.common.types.variant.VariantBuilder;
+import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Converter tests for {@link org.apache.flink.cdc.common.types.VariantType}. 
*/
+class VariantConvertingTest {
+
+    private static final BinaryVariantBuilder BUILDER = new 
BinaryVariantBuilder();
+    private static final VariantBuilder.VariantArrayBuilder ARRAY_BUILDER =
+            new BinaryVariantBuilder.VariantArrayBuilder();
+    private static final BinaryVariantBuilder.VariantObjectBuilder 
OBJECT_BUILDER =
+            new BinaryVariantBuilder.VariantObjectBuilder(true);
+
+    private static final Variant[] TEST_VARIANTS = {
+        BUILDER.of(true),
+        BUILDER.of((byte) 2),
+        BUILDER.of((short) 3),
+        BUILDER.of(5),
+        BUILDER.of((long) 7),
+        BUILDER.of("11"),
+        BUILDER.of((double) 13),
+        BUILDER.of((float) 17),
+        BUILDER.of("19".getBytes()),
+        BUILDER.of(new BigDecimal("23")),
+        BUILDER.of(Instant.ofEpochMilli(29)),
+        BUILDER.of(LocalDate.ofEpochDay(31)),
+        BUILDER.of(LocalDateTime.ofEpochSecond(37, 37, ZoneOffset.UTC)),
+        BUILDER.ofNull(),
+        ARRAY_BUILDER
+                .add(BUILDER.of(true))
+                .add(BUILDER.of((byte) 2))
+                .add(BUILDER.of((short) 3))
+                .add(BUILDER.of(5))
+                .add(BUILDER.of((long) 7))
+                .add(BUILDER.of("11"))
+                .add(BUILDER.of((double) 13))
+                .add(BUILDER.of((float) 17))
+                .add(BUILDER.of("19".getBytes()))
+                .add(BUILDER.of(new BigDecimal("23")))
+                .add(BUILDER.of(Instant.ofEpochMilli(29)))
+                .add(BUILDER.of(LocalDate.ofEpochDay(31)))
+                .add(BUILDER.of(LocalDateTime.ofEpochSecond(37, 37, 
ZoneOffset.UTC)))
+                .add(BUILDER.ofNull())
+                .build(),
+        OBJECT_BUILDER
+                .add("col_bool", BUILDER.of(true))
+                .add("col_tinyint", BUILDER.of((byte) 2))
+                .add("col_shortint", BUILDER.of((short) 3))
+                .add("col_int", BUILDER.of(5))
+                .add("col_bigint", BUILDER.of((long) 7))
+                .add("col_string", BUILDER.of("11"))
+                .add("col_double", BUILDER.of((double) 13))
+                .add("col_float", BUILDER.of((float) 17))
+                .add("col_bytes", BUILDER.of("19".getBytes()))
+                .add("col_decimal", BUILDER.of(new BigDecimal("23")))
+                .add("col_timestamp", BUILDER.of(Instant.ofEpochMilli(29)))
+                .add("col_date", BUILDER.of(LocalDate.ofEpochDay(31)))
+                .add(
+                        "col_datetime",
+                        BUILDER.of(LocalDateTime.ofEpochSecond(37, 37, 
ZoneOffset.UTC)))
+                .add("col_null", BUILDER.ofNull())
+                .build()
+    };
+
+    @Test
+    void testConvertingFromVariant() {
+        assertThat(Stream.of(TEST_VARIANTS))
+                .map(o -> InternalObjectConverter.convertToInternal(o, 
DataTypes.VARIANT()))
+                .containsExactly(TEST_VARIANTS);
+    }
+
+    @Test
+    void testConvertingToVariant() {
+        assertThat(Stream.of(TEST_VARIANTS))
+                .map(o -> JavaObjectConverter.convertToJava(o, 
DataTypes.VARIANT()))
+                .containsExactly(TEST_VARIANTS);
+    }
+
+    @Test
+    void testVariantTypeCoercion() {
+        List<BinaryStringData> expectedStringResult =
+                Stream.of(
+                                "true",
+                                "2",
+                                "3",
+                                "5",
+                                "7",
+                                "\"11\"",
+                                "13.0",
+                                "17.0",
+                                "\"MTk=\"",
+                                "23",
+                                "\"1970-01-01T00:00:00.029+00:00\"",
+                                "\"1970-02-01\"",
+                                "\"1970-01-01T00:00:37\"",
+                                "null",
+                                
"[true,2,3,5,7,\"11\",13.0,17.0,\"MTk=\",23,\"1970-01-01T00:00:00.029+00:00\",\"1970-02-01\",\"1970-01-01T00:00:37\",null]",
+                                
"{\"col_bigint\":7,\"col_bool\":true,\"col_bytes\":\"MTk=\",\"col_date\":\"1970-02-01\",\"col_datetime\":\"1970-01-01T00:00:37\",\"col_decimal\":23,\"col_double\":13.0,\"col_float\":17.0,\"col_int\":5,\"col_null\":null,\"col_shortint\":3,\"col_string\":\"11\",\"col_timestamp\":\"1970-01-01T00:00:00.029+00:00\",\"col_tinyint\":2}")
+                        .map(BinaryStringData::new)
+                        .collect(Collectors.toList());
+
+        assertThat(Stream.of(TEST_VARIANTS))
+                .map(
+                        variant ->
+                                SchemaMergingUtils.coerceObject(
+                                        "UTC", variant, DataTypes.VARIANT(), 
DataTypes.STRING()))
+                .containsExactlyElementsOf(expectedStringResult);
+    }
+}
diff --git 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
index cc653857e..1f53668d4 100644
--- 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
+++ 
b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java
@@ -104,6 +104,7 @@ class SchemaMergingUtilsTest {
     private static final DataType ROW = DataTypes.ROW(INT, STRING);
     private static final DataType ARRAY = DataTypes.ARRAY(STRING);
     private static final DataType MAP = DataTypes.MAP(INT, STRING);
+    private static final DataType VARIANT = DataTypes.VARIANT();
 
     private static final List<DataType> ALL_TYPES =
             Arrays.asList(
@@ -130,7 +131,8 @@ class SchemaMergingUtilsTest {
                     // Complex types
                     ROW,
                     ARRAY,
-                    MAP);
+                    MAP,
+                    VARIANT);
 
     private static final Map<DataType, Object> DUMMY_OBJECTS =
             ImmutableMap.of(
@@ -966,35 +968,35 @@ class SchemaMergingUtilsTest {
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 CHAR,
                 Arrays.asList(
                         STRING, CHAR, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING,
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 VARCHAR,
                 Arrays.asList(
                         STRING, STRING, VARCHAR, STRING, STRING, STRING, 
STRING, STRING, STRING,
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 BINARY,
                 Arrays.asList(
                         STRING, STRING, STRING, BINARY, STRING, STRING, 
STRING, STRING, STRING,
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 VARBINARY,
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, VARBINARY, STRING, 
STRING, STRING, STRING,
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         // 8-bit TINYINT could fit into FLOAT (24 sig bits) or DOUBLE (53 sig 
bits)
         assertTypeMergingVector(
@@ -1002,7 +1004,7 @@ class SchemaMergingUtilsTest {
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, TINYINT, 
SMALLINT, INT, BIGINT,
                         DECIMAL, FLOAT, DOUBLE, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         // 16-bit SMALLINT could fit into FLOAT (24 sig bits) or DOUBLE (53 
sig bits)
         assertTypeMergingVector(
@@ -1010,42 +1012,43 @@ class SchemaMergingUtilsTest {
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, SMALLINT, 
SMALLINT, INT, BIGINT,
                         DECIMAL, FLOAT, DOUBLE, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         // 32-bit INT could fit into DOUBLE (53 sig bits)
         assertTypeMergingVector(
                 INT,
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, INT, INT, INT, 
BIGINT, DECIMAL,
-                        DOUBLE, DOUBLE, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING));
+                        DOUBLE, DOUBLE, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
+                        STRING));
 
         assertTypeMergingVector(
                 BIGINT,
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, BIGINT, 
BIGINT, BIGINT, BIGINT,
                         DECIMAL, DOUBLE, DOUBLE, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 DECIMAL,
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, DECIMAL, 
DECIMAL, DECIMAL, DECIMAL,
                         DECIMAL, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 FLOAT,
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, FLOAT, FLOAT, 
DOUBLE, DOUBLE,
                         STRING, FLOAT, DOUBLE, STRING, STRING, STRING, STRING, 
STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 DOUBLE,
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, DOUBLE, 
DOUBLE, DOUBLE, DOUBLE,
                         STRING, DOUBLE, DOUBLE, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 TIMESTAMP,
@@ -1068,6 +1071,7 @@ class SchemaMergingUtilsTest {
                         STRING,
                         STRING,
                         STRING,
+                        STRING,
                         STRING));
 
         assertTypeMergingVector(
@@ -1091,6 +1095,7 @@ class SchemaMergingUtilsTest {
                         STRING,
                         STRING,
                         STRING,
+                        STRING,
                         STRING));
 
         assertTypeMergingVector(
@@ -1114,6 +1119,7 @@ class SchemaMergingUtilsTest {
                         STRING,
                         STRING,
                         STRING,
+                        STRING,
                         STRING));
 
         assertTypeMergingVector(
@@ -1121,13 +1127,13 @@ class SchemaMergingUtilsTest {
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
                         STRING, STRING, STRING, STRING, STRING, STRING, TIME, 
STRING, STRING,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 ROW,
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, ROW, STRING,
+                        STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, ROW, STRING, STRING,
                         STRING));
 
         assertTypeMergingVector(
@@ -1135,14 +1141,21 @@ class SchemaMergingUtilsTest {
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, ARRAY,
-                        STRING));
+                        STRING, STRING));
 
         assertTypeMergingVector(
                 MAP,
+                Arrays.asList(
+                        STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
+                        STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING, MAP,
+                        STRING));
+
+        assertTypeMergingVector(
+                VARIANT,
                 Arrays.asList(
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
                         STRING, STRING, STRING, STRING, STRING, STRING, 
STRING, STRING, STRING,
-                        MAP));
+                        STRING, VARIANT));
     }
 
     private static void assertTypeMergingVector(DataType incomingType, 
List<DataType> resultTypes) {

Reply via email to