This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new bb5982b93 [FLINK-39417] Fix GenericRecordData could not be [de]serialized in pipeline (#4374)
bb5982b93 is described below

commit bb5982b93824e6857889a76591ebd24c9f0bdb60
Author: yuxiqian <[email protected]>
AuthorDate: Thu Apr 30 11:11:03 2026 +0800

    [FLINK-39417] Fix GenericRecordData could not be [de]serialized in pipeline (#4374)
---
 .../flink/FlinkPipelineComposerITCase.java         | 234 ++++++++++++
 .../cdc/connectors/values/ValuesDatabase.java      |   6 +-
 .../operators/transform/PreTransformOperator.java  |   5 +-
 .../operators/transform/PreTransformProcessor.java |   2 +-
 .../data/GenericRecordDataSerializer.java          | 416 +++++++++++++++++++++
 .../serializer/data/RecordDataSerializer.java      |  68 +++-
 .../serializer/data/writer/BinaryWriter.java       |  26 +-
 .../serializer/data/RecordDataSerializerTest.java  | 321 +++++++++++++++-
 8 files changed, 1061 insertions(+), 17 deletions(-)

diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 47e8fae57..80606a38a 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -18,8 +18,13 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.DateData;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -72,6 +77,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -1733,6 +1739,234 @@ class FlinkPipelineComposerITCase {
                 .isGreaterThan(0);
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testGenericRecordDataEndToEnd(ValuesDataSink.SinkApi sinkApi) throws 
Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable1 = TableId.tableId("default_namespace", 
"default_schema", "mytable1");
+        Schema table1Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .physicalColumn("age", DataTypes.INT())
+                        .physicalColumn("col_bool", DataTypes.BOOLEAN())
+                        .physicalColumn("col_tinyint", DataTypes.TINYINT())
+                        .physicalColumn("col_smallint", DataTypes.SMALLINT())
+                        .physicalColumn("col_bigint", DataTypes.BIGINT())
+                        .physicalColumn("col_float", DataTypes.FLOAT())
+                        .physicalColumn("col_double", DataTypes.DOUBLE())
+                        .physicalColumn("col_decimal", DataTypes.DECIMAL(10, 
2))
+                        .physicalColumn("col_date", DataTypes.DATE())
+                        .physicalColumn("col_time", DataTypes.TIME())
+                        .physicalColumn("col_timestamp", 
DataTypes.TIMESTAMP(3))
+                        .physicalColumn("col_timestamp_ltz", 
DataTypes.TIMESTAMP_LTZ(3))
+                        .physicalColumn("col_timestamp_tz", 
DataTypes.TIMESTAMP_TZ(3))
+                        .physicalColumn("col_array", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .physicalColumn(
+                                "col_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.INT()))
+                        .physicalColumn(
+                                "col_row",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f0", DataTypes.INT()),
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING())))
+                        .primaryKey("id")
+                        .build();
+
+        GenericArrayData testArray =
+                new GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("x"), 
BinaryStringData.fromString("y")
+                        });
+        GenericMapData testMap = new 
GenericMapData(Map.of(BinaryStringData.fromString("k1"), 100));
+        GenericRecordData testRow = GenericRecordData.of(77, 
BinaryStringData.fromString("inner"));
+        DecimalData testDecimal = DecimalData.fromBigDecimal(new 
BigDecimal("123.45"), 10, 2);
+        TimestampData testTs = TimestampData.fromMillis(1609459200000L);
+        LocalZonedTimestampData testTsLtz = 
LocalZonedTimestampData.fromEpochMillis(1609459200000L);
+        ZonedTimestampData testTsTz = ZonedTimestampData.of(1609459200000L, 0, 
"UTC");
+
+        List<Event> events = new ArrayList<>();
+        events.add(new CreateTableEvent(myTable1, table1Schema));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        GenericRecordData.of(
+                                1,
+                                BinaryStringData.fromString("Alice"),
+                                18,
+                                true,
+                                (byte) 1,
+                                (short) 100,
+                                1000L,
+                                1.0f,
+                                1.0,
+                                testDecimal,
+                                DateData.fromEpochDay(18628),
+                                TimeData.fromMillisOfDay(43200000),
+                                testTs,
+                                testTsLtz,
+                                testTsTz,
+                                testArray,
+                                testMap,
+                                testRow)));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        GenericRecordData.of(
+                                2,
+                                BinaryStringData.fromString("Bob"),
+                                20,
+                                true,
+                                (byte) 42,
+                                (short) 200,
+                                9876543210L,
+                                3.14f,
+                                2.718,
+                                testDecimal,
+                                DateData.fromEpochDay(18628),
+                                TimeData.fromMillisOfDay(43200000),
+                                testTs,
+                                testTsLtz,
+                                testTsTz,
+                                testArray,
+                                testMap,
+                                testRow)));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        GenericRecordData.of(
+                                2,
+                                BinaryStringData.fromString("Bob"),
+                                20,
+                                true,
+                                (byte) 42,
+                                (short) 200,
+                                9876543210L,
+                                3.14f,
+                                2.718,
+                                testDecimal,
+                                DateData.fromEpochDay(18628),
+                                TimeData.fromMillisOfDay(43200000),
+                                testTs,
+                                testTsLtz,
+                                testTsTz,
+                                testArray,
+                                testMap,
+                                testRow),
+                        GenericRecordData.of(
+                                2,
+                                BinaryStringData.fromString("Bob"),
+                                30,
+                                false,
+                                (byte) 43,
+                                (short) 201,
+                                9876543211L,
+                                3.15f,
+                                2.719,
+                                testDecimal,
+                                DateData.fromEpochDay(18629),
+                                TimeData.fromMillisOfDay(43201000),
+                                testTs,
+                                testTsLtz,
+                                testTsTz,
+                                testArray,
+                                testMap,
+                                testRow)));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        GenericRecordData.of(
+                                3, null, null, null, null, null, null, null, 
null, null, null, null,
+                                null, null, null, null, null, null)));
+        events.add(
+                DataChangeEvent.deleteEvent(
+                        myTable1,
+                        GenericRecordData.of(
+                                1,
+                                BinaryStringData.fromString("Alice"),
+                                18,
+                                true,
+                                (byte) 1,
+                                (short) 100,
+                                1000L,
+                                1.0f,
+                                1.0,
+                                testDecimal,
+                                DateData.fromEpochDay(18628),
+                                TimeData.fromMillisOfDay(43200000),
+                                testTs,
+                                testTsLtz,
+                                testTsTz,
+                                testArray,
+                                testMap,
+                                testRow)));
+
+        
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        TransformDef transformDef =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable1",
+                        "*, 'test_tag' as tag",
+                        "id <> 1",
+                        null,
+                        null,
+                        null,
+                        "",
+                        null);
+
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        new 
ArrayList<>(Collections.singletonList(transformDef)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        List<String> results = ValuesDatabase.getResults(myTable1);
+        assertThat(results).hasSize(2);
+        assertThat(results)
+                .anySatisfy(
+                        r ->
+                                assertThat(r)
+                                        .startsWith(
+                                                
"default_namespace.default_schema.mytable1:id=2;name=Bob;age=30;col_bool=false;col_tinyint=43;col_smallint=201;col_bigint=9876543211;col_float=3.15;col_double=2.719;col_decimal=123.45;col_date=2021-01-02;col_time=12:00:01;col_timestamp=2021-01-01T00:00;col_timestamp_ltz=2021-01-01T00:00;col_timestamp_tz=2021-01-01T00:00:00Z;")
+                                        .endsWith("tag=test_tag"))
+                .anySatisfy(
+                        r ->
+                                assertThat(r)
+                                        .isEqualTo(
+                                                
"default_namespace.default_schema.mytable1:id=3;name=;age=;col_bool=;col_tinyint=;col_smallint=;col_bigint=;col_float=;col_double=;col_decimal=;col_date=;col_time=;col_timestamp=;col_timestamp_ltz=;col_timestamp_tz=;col_array=;col_map=;col_row=;tag=test_tag"));
+
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(Arrays.asList(outputEvents))
+                .containsExactly(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col_bool` 
BOOLEAN,`col_tinyint` TINYINT,`col_smallint` SMALLINT,`col_bigint` 
BIGINT,`col_float` FLOAT,`col_double` DOUBLE,`col_decimal` DECIMAL(10, 
2),`col_date` DATE,`col_time` TIME(0),`col_timestamp` 
TIMESTAMP(3),`col_timestamp_ltz` TIMESTAMP_LTZ(3),`col_timestamp_tz` 
TIMESTAMP(3) WITH TIME ZONE,`col_array` ARRAY<STRING>,`col_map` MAP<S [...]
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, true, 42, 200, 9876543210, 3.14, 2.718, 123.45, 2021-01-01, 
12:00, 2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 
100}, {f0: INT -> 77, f1: STRING -> inner}, test_tag], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, true, 42, 200, 9876543210, 3.14, 2.718, 123.45, 2021-01-01, 12:00, 
2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, 
{f0: INT -> 77, f1: STRING -> inner}, test_tag], after=[2, Bob, 30, false, 43, 
201, 9876543211, 3.15, 2.719, 123.45, 2021-01-02, 12:00:01, 2021-01-01T00:00, 
2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, {f0: INT -> 77,  
[...]
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null, test_tag], op=INSERT, meta=()}");
+    }
+
     BinaryRecordData generate(Schema schema, Object... fields) {
         return (new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
                 .generate(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
index 698203e91..7689c637d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
@@ -422,7 +422,11 @@ public class ValuesDatabase {
         private String buildPrimaryKeyStr(RecordData recordData) {
             StringBuilder stringBuilder = new StringBuilder();
             for (Integer primaryKeyIndex : primaryKeyIndexes) {
-                
stringBuilder.append(recordData.getString(primaryKeyIndex).toString()).append(",");
+                RecordData.FieldGetter fieldGetter =
+                        RecordData.createFieldGetter(
+                                columns.get(primaryKeyIndex).getType(), 
primaryKeyIndex);
+                Object value = fieldGetter.getFieldOrNull(recordData);
+                stringBuilder.append(value != null ? value.toString() : 
"null").append(",");
             }
             stringBuilder.deleteCharAt(stringBuilder.length() - 1);
             return stringBuilder.toString();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index cb8748d12..7c6db1735 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.operators.transform;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -391,8 +392,8 @@ public class PreTransformOperator extends AbstractStreamOperatorAdapter<Event>
                             + "This is likely a bug, please consider filing an 
issue.",
                     tableId);
 
-            BinaryRecordData before = (BinaryRecordData) 
dataChangeEvent.before();
-            BinaryRecordData after = (BinaryRecordData) 
dataChangeEvent.after();
+            RecordData before = dataChangeEvent.before();
+            RecordData after = dataChangeEvent.after();
             if (before != null) {
                 BinaryRecordData projectedBefore = 
processor.processFillDataField(before);
                 dataChangeEvent = 
DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
index cd679fcd5..00caa1e6c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
@@ -59,7 +59,7 @@ public class PreTransformProcessor {
         return new CreateTableEvent(createTableEvent.tableId(), schema);
     }
 
-    public BinaryRecordData processFillDataField(BinaryRecordData data) {
+    public BinaryRecordData processFillDataField(RecordData data) {
         List<Object> valueList = new ArrayList<>();
         List<Column> columns = 
tableChangeInfo.getPreTransformedSchema().getColumns();
         Map<String, RecordData.FieldGetter> sourceFieldGettersMap =
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java
new file mode 100644
index 000000000..4f5c406fb
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.variant.BinaryVariant;
+import org.apache.flink.cdc.common.types.variant.Variant;
+import 
org.apache.flink.cdc.runtime.serializer.data.binary.BinaryRecordDataSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link GenericRecordData}. Uses a self-describing format 
where each field is
+ * prefixed with a type tag, so no schema information is needed at 
serialization time.
+ */
+@Internal
+public class GenericRecordDataSerializer {
+
+    // Type tags for self-describing format
+    private static final byte TAG_NULL = 0;
+    private static final byte TAG_BOOLEAN = 1;
+    private static final byte TAG_BYTE = 2;
+    private static final byte TAG_SHORT = 3;
+    private static final byte TAG_INT = 4;
+    private static final byte TAG_LONG = 5;
+    private static final byte TAG_FLOAT = 6;
+    private static final byte TAG_DOUBLE = 7;
+    private static final byte TAG_STRING = 8;
+    private static final byte TAG_BINARY = 9;
+    private static final byte TAG_DECIMAL = 10;
+    private static final byte TAG_TIMESTAMP = 11;
+    private static final byte TAG_ZONED_TIMESTAMP = 12;
+    private static final byte TAG_LOCAL_ZONED_TIMESTAMP = 13;
+    private static final byte TAG_DATE = 14;
+    private static final byte TAG_TIME = 15;
+    private static final byte TAG_GENERIC_RECORD = 16;
+    private static final byte TAG_BINARY_RECORD = 17;
+    private static final byte TAG_ARRAY = 18;
+    private static final byte TAG_MAP = 19;
+    private static final byte TAG_VARIANT = 20;
+
+    private GenericRecordDataSerializer() {}
+
+    /** Serializes a {@link GenericRecordData} to the given output view. */
+    public static void serialize(GenericRecordData record, DataOutputView 
target)
+            throws IOException {
+        int arity = record.getArity();
+        target.writeInt(arity);
+        for (int i = 0; i < arity; i++) {
+            serializeField(record.getField(i), target);
+        }
+    }
+
+    /** Deserializes a {@link GenericRecordData} from the given input view. */
+    public static GenericRecordData deserialize(DataInputView source) throws 
IOException {
+        int arity = source.readInt();
+        GenericRecordData record = new GenericRecordData(arity);
+        for (int i = 0; i < arity; i++) {
+            record.setField(i, deserializeField(source));
+        }
+        return record;
+    }
+
+    /** Creates a deep copy of the given {@link GenericRecordData}. */
+    public static GenericRecordData copy(GenericRecordData from) {
+        return copy(from, new GenericRecordData(from.getArity()));
+    }
+
+    public static GenericRecordData copy(GenericRecordData from, 
GenericRecordData reuse) {
+        int arity = from.getArity();
+        GenericRecordData target =
+                (reuse.getArity() == arity) ? reuse : new 
GenericRecordData(arity);
+        for (int i = 0; i < arity; i++) {
+            target.setField(i, copyField(from.getField(i)));
+        }
+        return target;
+    }
+
+    // ---- Field-level serialization ----
+
+    static void serializeField(Object field, DataOutputView target) throws 
IOException {
+        if (field == null) {
+            target.writeByte(TAG_NULL);
+        } else if (field instanceof Boolean) {
+            target.writeByte(TAG_BOOLEAN);
+            target.writeBoolean((Boolean) field);
+        } else if (field instanceof Byte) {
+            target.writeByte(TAG_BYTE);
+            target.writeByte((Byte) field);
+        } else if (field instanceof Short) {
+            target.writeByte(TAG_SHORT);
+            target.writeShort((Short) field);
+        } else if (field instanceof Integer) {
+            target.writeByte(TAG_INT);
+            target.writeInt((Integer) field);
+        } else if (field instanceof Long) {
+            target.writeByte(TAG_LONG);
+            target.writeLong((Long) field);
+        } else if (field instanceof Float) {
+            target.writeByte(TAG_FLOAT);
+            target.writeFloat((Float) field);
+        } else if (field instanceof Double) {
+            target.writeByte(TAG_DOUBLE);
+            target.writeDouble((Double) field);
+        } else if (field instanceof StringData) {
+            target.writeByte(TAG_STRING);
+            byte[] bytes = ((StringData) field).toBytes();
+            target.writeInt(bytes.length);
+            target.write(bytes);
+        } else if (field instanceof byte[]) {
+            target.writeByte(TAG_BINARY);
+            byte[] bytes = (byte[]) field;
+            target.writeInt(bytes.length);
+            target.write(bytes);
+        } else if (field instanceof DecimalData) {
+            target.writeByte(TAG_DECIMAL);
+            DecimalData decimal = (DecimalData) field;
+            // Use DecimalData's precision/scale (SQL DECIMAL(p,s)) instead of 
BigDecimal's
+            target.writeInt(decimal.precision());
+            target.writeInt(decimal.scale());
+            byte[] unscaled = decimal.toUnscaledBytes();
+            target.writeInt(unscaled.length);
+            target.write(unscaled);
+        } else if (field instanceof TimestampData) {
+            target.writeByte(TAG_TIMESTAMP);
+            TimestampData ts = (TimestampData) field;
+            target.writeLong(ts.getMillisecond());
+            target.writeInt(ts.getNanoOfMillisecond());
+        } else if (field instanceof ZonedTimestampData) {
+            target.writeByte(TAG_ZONED_TIMESTAMP);
+            ZonedTimestampData zts = (ZonedTimestampData) field;
+            target.writeLong(zts.getMillisecond());
+            target.writeInt(zts.getNanoOfMillisecond());
+            byte[] zoneBytes = 
zts.getZoneId().getBytes(StandardCharsets.UTF_8);
+            target.writeInt(zoneBytes.length);
+            target.write(zoneBytes);
+        } else if (field instanceof LocalZonedTimestampData) {
+            target.writeByte(TAG_LOCAL_ZONED_TIMESTAMP);
+            LocalZonedTimestampData lzts = (LocalZonedTimestampData) field;
+            target.writeLong(lzts.getEpochMillisecond());
+            target.writeInt(lzts.getEpochNanoOfMillisecond());
+        } else if (field instanceof DateData) {
+            target.writeByte(TAG_DATE);
+            target.writeInt(((DateData) field).toEpochDay());
+        } else if (field instanceof TimeData) {
+            target.writeByte(TAG_TIME);
+            target.writeInt(((TimeData) field).toMillisOfDay());
+        } else if (field instanceof GenericRecordData) {
+            target.writeByte(TAG_GENERIC_RECORD);
+            serialize((GenericRecordData) field, target);
+        } else if (field instanceof BinaryRecordData) {
+            target.writeByte(TAG_BINARY_RECORD);
+            BinaryRecordDataSerializer.INSTANCE.serialize((BinaryRecordData) 
field, target);
+        } else if (field instanceof ArrayData) {
+            target.writeByte(TAG_ARRAY);
+            serializeArrayData((ArrayData) field, target);
+        } else if (field instanceof MapData) {
+            target.writeByte(TAG_MAP);
+            serializeMapData((MapData) field, target);
+        } else if (field instanceof Variant) {
+            target.writeByte(TAG_VARIANT);
+            serializeVariant((Variant) field, target);
+        } else {
+            throw new IOException(
+                    "Unsupported field type in GenericRecordData: " + 
field.getClass().getName());
+        }
+    }
+
+    static Object deserializeField(DataInputView source) throws IOException {
+        // Each field is stored as a one-byte type tag followed by a tag-specific payload.
+        byte tag = source.readByte();
+        switch (tag) {
+            case TAG_NULL:
+                return null;
+            case TAG_BOOLEAN:
+                return source.readBoolean();
+            case TAG_BYTE:
+                return source.readByte();
+            case TAG_SHORT:
+                return source.readShort();
+            case TAG_INT:
+                return source.readInt();
+            case TAG_LONG:
+                return source.readLong();
+            case TAG_FLOAT:
+                return source.readFloat();
+            case TAG_DOUBLE:
+                return source.readDouble();
+            case TAG_STRING:
+                return BinaryStringData.fromBytes(readLengthPrefixedBytes(source));
+            case TAG_BINARY:
+                return readLengthPrefixedBytes(source);
+            case TAG_DECIMAL:
+                {
+                    // Precision and scale precede the unscaled bytes in the stream.
+                    int precision = source.readInt();
+                    int scale = source.readInt();
+                    byte[] unscaled = readLengthPrefixedBytes(source);
+                    return DecimalData.fromUnscaledBytes(unscaled, precision, scale);
+                }
+            case TAG_TIMESTAMP:
+                return TimestampData.fromMillis(source.readLong(), source.readInt());
+            case TAG_ZONED_TIMESTAMP:
+                {
+                    long millis = source.readLong();
+                    int nanos = source.readInt();
+                    // The zone id is stored as length-prefixed UTF-8 bytes.
+                    byte[] zoneBytes = readLengthPrefixedBytes(source);
+                    return ZonedTimestampData.of(
+                            millis, nanos, new String(zoneBytes, StandardCharsets.UTF_8));
+                }
+            case TAG_LOCAL_ZONED_TIMESTAMP:
+                return LocalZonedTimestampData.fromEpochMillis(
+                        source.readLong(), source.readInt());
+            case TAG_DATE:
+                return DateData.fromEpochDay(source.readInt());
+            case TAG_TIME:
+                return TimeData.fromMillisOfDay(source.readInt());
+            case TAG_GENERIC_RECORD:
+                return deserialize(source);
+            case TAG_BINARY_RECORD:
+                return BinaryRecordDataSerializer.INSTANCE.deserialize(source);
+            case TAG_ARRAY:
+                return deserializeArrayData(source);
+            case TAG_MAP:
+                return deserializeMapData(source);
+            case TAG_VARIANT:
+                return deserializeVariant(source);
+            default:
+                throw new IOException("Unknown field type tag: " + tag);
+        }
+    }
+
+    /**
+     * Reads an {@code int} length followed by exactly that many bytes.
+     *
+     * @param source the input to read from
+     * @return a newly allocated array holding the payload
+     * @throws IOException if the length is negative or the input cannot supply the bytes
+     */
+    private static byte[] readLengthPrefixedBytes(DataInputView source) throws IOException {
+        int len = source.readInt();
+        if (len < 0) {
+            // Report a corrupt stream as an I/O problem instead of letting the array
+            // allocation fail with an unchecked NegativeArraySizeException.
+            throw new IOException("Negative length in serialized field: " + len);
+        }
+        byte[] bytes = new byte[len];
+        source.readFully(bytes);
+        return bytes;
+    }
+
+    // ---- ArrayData serialization ----
+
+    /**
+     * Writes the element count of a {@link GenericArrayData} and then hands every element to
+     * {@code serializeField}.
+     *
+     * @param arrayData the array to write; must be a {@link GenericArrayData}
+     * @param target the output to write to
+     * @throws IOException if {@code arrayData} is not a {@link GenericArrayData} or writing fails
+     */
+    private static void serializeArrayData(ArrayData arrayData, DataOutputView target)
+            throws IOException {
+        if (!(arrayData instanceof GenericArrayData)) {
+            throw new IOException(
+                    "Serialization of non-generic ArrayData is not supported in GenericRecordDataSerializer. "
+                            + "Actual type: "
+                            + arrayData.getClass().getName());
+        }
+        Object[] items = ((GenericArrayData) arrayData).toObjectArray();
+        target.writeInt(items.length);
+        for (int i = 0; i < items.length; i++) {
+            serializeField(items[i], target);
+        }
+    }
+
+    private static ArrayData deserializeArrayData(DataInputView source) throws 
IOException {
+        int size = source.readInt();
+        Object[] elements = new Object[size];
+        for (int i = 0; i < size; i++) {
+            elements[i] = deserializeField(source);
+        }
+        return new GenericArrayData(elements);
+    }
+
+    // ---- MapData serialization ----
+
+    /**
+     * Writes the entry count of a {@link GenericMapData} followed by alternating key and value
+     * fields, each passed to {@code serializeField}.
+     *
+     * @param mapData the map to write; must be a {@link GenericMapData} whose key and value
+     *     arrays are {@link GenericArrayData}
+     * @param target the output to write to
+     * @throws IOException if the map or its arrays are not the generic implementations, or if
+     *     writing fails
+     */
+    private static void serializeMapData(MapData mapData, DataOutputView target)
+            throws IOException {
+        if (!(mapData instanceof GenericMapData)) {
+            throw new IOException(
+                    "Serialization of non-generic MapData is not supported in GenericRecordDataSerializer. "
+                            + "Actual type: "
+                            + mapData.getClass().getName());
+        }
+
+        ArrayData rawKeys = mapData.keyArray();
+        ArrayData rawValues = mapData.valueArray();
+        boolean bothGeneric =
+                rawKeys instanceof GenericArrayData && rawValues instanceof GenericArrayData;
+        if (!bothGeneric) {
+            throw new IOException(
+                    "MapData with non-generic key/value arrays is not supported in GenericRecordDataSerializer.");
+        }
+
+        int entryCount = mapData.size();
+        target.writeInt(entryCount);
+        Object[] keyObjects = ((GenericArrayData) rawKeys).toObjectArray();
+        Object[] valueObjects = ((GenericArrayData) rawValues).toObjectArray();
+        // Entries are written pairwise: key i is immediately followed by value i.
+        for (int entry = 0; entry < entryCount; entry++) {
+            serializeField(keyObjects[entry], target);
+            serializeField(valueObjects[entry], target);
+        }
+    }
+
+    private static MapData deserializeMapData(DataInputView source) throws 
IOException {
+        int size = source.readInt();
+        Map<Object, Object> map = new LinkedHashMap<>(size);
+        for (int i = 0; i < size; i++) {
+            Object key = deserializeField(source);
+            Object value = deserializeField(source);
+            map.put(key, value);
+        }
+        return new GenericMapData(map);
+    }
+
+    // ---- Variant serialization ----
+
+    /**
+     * Writes a {@link BinaryVariant} as two length-prefixed byte arrays: its value, then its
+     * metadata.
+     *
+     * @param variant the variant to write; must be a {@link BinaryVariant}
+     * @param target the output to write to
+     * @throws IOException if {@code variant} is not a {@link BinaryVariant} or writing fails
+     */
+    private static void serializeVariant(Variant variant, DataOutputView target)
+            throws IOException {
+        if (!(variant instanceof BinaryVariant)) {
+            throw new IOException("Unsupported Variant type: " + variant.getClass().getName());
+        }
+        BinaryVariant binary = (BinaryVariant) variant;
+        byte[] valueBytes = binary.getValue();
+        byte[] metadataBytes = binary.getMetadata();
+
+        target.writeInt(valueBytes.length);
+        target.write(valueBytes);
+
+        target.writeInt(metadataBytes.length);
+        target.write(metadataBytes);
+    }
+
+    /**
+     * Reads two length-prefixed byte arrays (value, then metadata) and builds a {@link
+     * BinaryVariant} from them.
+     *
+     * @param source the input to read from
+     * @return the reconstructed variant
+     * @throws IOException if reading fails
+     */
+    private static Variant deserializeVariant(DataInputView source) throws IOException {
+        byte[] valueBytes = new byte[source.readInt()];
+        source.readFully(valueBytes);
+
+        byte[] metadataBytes = new byte[source.readInt()];
+        source.readFully(metadataBytes);
+
+        return new BinaryVariant(valueBytes, metadataBytes);
+    }
+
+    // ---- Field copy ----
+
+    /**
+     * Copies a single field value. Byte arrays, nested records, generic arrays and generic maps
+     * are duplicated (recursively for containers); every other value is returned as-is.
+     *
+     * @param field the value to copy, possibly {@code null}
+     * @return the copy, the same reference for values that are not duplicated, or {@code null}
+     * @throws IllegalArgumentException if a {@link GenericMapData} exposes a key or value array
+     *     that is not a {@link GenericArrayData}
+     */
+    private static Object copyField(Object field) {
+        if (field == null) {
+            return null;
+        }
+        if (field instanceof byte[]) {
+            return ((byte[]) field).clone();
+        }
+        if (field instanceof GenericRecordData) {
+            return copy((GenericRecordData) field);
+        }
+        if (field instanceof BinaryRecordData) {
+            return ((BinaryRecordData) field).copy();
+        }
+        if (field instanceof GenericArrayData) {
+            Object[] source = ((GenericArrayData) field).toObjectArray();
+            Object[] duplicate = new Object[source.length];
+            int index = 0;
+            for (Object element : source) {
+                duplicate[index++] = copyField(element);
+            }
+            return new GenericArrayData(duplicate);
+        }
+        if (field instanceof GenericMapData) {
+            GenericMapData original = (GenericMapData) field;
+            ArrayData rawKeys = original.keyArray();
+            ArrayData rawValues = original.valueArray();
+            if (!(rawKeys instanceof GenericArrayData && rawValues instanceof GenericArrayData)) {
+                throw new IllegalArgumentException(
+                        "Expected GenericArrayData for key and value arrays in GenericMapData, but got: keyArray="
+                                + rawKeys.getClass().getName()
+                                + ", valueArray="
+                                + rawValues.getClass().getName());
+            }
+            Object[] keyObjects = ((GenericArrayData) rawKeys).toObjectArray();
+            Object[] valueObjects = ((GenericArrayData) rawValues).toObjectArray();
+            Map<Object, Object> duplicate = new LinkedHashMap<>(keyObjects.length);
+            for (int entry = 0; entry < keyObjects.length; entry++) {
+                duplicate.put(copyField(keyObjects[entry]), copyField(valueObjects[entry]));
+            }
+            return new GenericMapData(duplicate);
+        }
+        // Everything else is treated as immutable and shared rather than copied: Boolean, Byte,
+        // Short, Integer, Long, Float, Double, StringData, DecimalData, TimestampData,
+        // ZonedTimestampData, LocalZonedTimestampData, DateData, TimeData, Variant.
+        return field;
+    }
+}
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java
index 6348c5c29..42948e3fa 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.serializer.data;
 
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.cdc.common.data.GenericRecordData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
@@ -33,43 +34,90 @@ public class RecordDataSerializer extends 
TypeSerializerSingleton<RecordData> {
 
     private static final long serialVersionUID = 1L;
 
+    /** Type tag for BinaryRecordData serialization. */
+    private static final byte BINARY_RECORD_TYPE = 0;
+
+    /** Type tag for GenericRecordData serialization. */
+    private static final byte GENERIC_RECORD_TYPE = 1;
+
     private final BinaryRecordDataSerializer binarySerializer = 
BinaryRecordDataSerializer.INSTANCE;
 
     public static final RecordDataSerializer INSTANCE = new 
RecordDataSerializer();
 
     @Override
     public RecordData createInstance() {
-        // BinaryRecordData is the only implementation of RecordData
         return new BinaryRecordData(1);
     }
 
     @Override
     public void serialize(RecordData recordData, DataOutputView target) throws 
IOException {
-        // BinaryRecordData is the only implementation of RecordData
-        binarySerializer.serialize((BinaryRecordData) recordData, target);
+        if (recordData instanceof BinaryRecordData) {
+            target.writeByte(BINARY_RECORD_TYPE);
+            binarySerializer.serialize((BinaryRecordData) recordData, target);
+        } else if (recordData instanceof GenericRecordData) {
+            target.writeByte(GENERIC_RECORD_TYPE);
+            GenericRecordDataSerializer.serialize((GenericRecordData) 
recordData, target);
+        } else {
+            throw new IOException(
+                    "Unsupported RecordData type: " + 
recordData.getClass().getName());
+        }
     }
 
     @Override
     public RecordData deserialize(DataInputView source) throws IOException {
-        // BinaryRecordData is the only implementation of RecordData
-        return binarySerializer.deserialize(source);
+        byte type = source.readByte();
+        if (type == BINARY_RECORD_TYPE) {
+            return binarySerializer.deserialize(source);
+        } else if (type == GENERIC_RECORD_TYPE) {
+            return GenericRecordDataSerializer.deserialize(source);
+        } else {
+            throw new IOException("Unknown RecordData type tag: " + type);
+        }
     }
 
     @Override
     public RecordData deserialize(RecordData reuse, DataInputView source) 
throws IOException {
-        return binarySerializer.deserialize((BinaryRecordData) reuse, source);
+        byte type = source.readByte();
+        if (type == BINARY_RECORD_TYPE) {
+            if (reuse instanceof BinaryRecordData) {
+                return binarySerializer.deserialize((BinaryRecordData) reuse, 
source);
+            }
+            return binarySerializer.deserialize(source);
+        } else if (type == GENERIC_RECORD_TYPE) {
+            return GenericRecordDataSerializer.deserialize(source);
+        } else {
+            throw new IOException("Unknown RecordData type tag: " + type);
+        }
     }
 
     @Override
     public RecordData copy(RecordData from) {
-        // BinaryRecordData is the only implementation of RecordData
-        return ((BinaryRecordData) from).copy();
+        if (from instanceof BinaryRecordData) {
+            return ((BinaryRecordData) from).copy();
+        } else if (from instanceof GenericRecordData) {
+            return GenericRecordDataSerializer.copy((GenericRecordData) from);
+        } else {
+            throw new RuntimeException("Unsupported RecordData type: " + 
from.getClass().getName());
+        }
     }
 
     @Override
     public RecordData copy(RecordData from, RecordData reuse) {
-        // BinaryRecordData is the only implementation of RecordData
-        return ((BinaryRecordData) from).copy((BinaryRecordData) reuse);
+        if (from instanceof BinaryRecordData) {
+            BinaryRecordData reuseRecord =
+                    (reuse instanceof BinaryRecordData)
+                            ? (BinaryRecordData) reuse
+                            : new BinaryRecordData(from.getArity());
+            return ((BinaryRecordData) from).copy(reuseRecord);
+        } else if (from instanceof GenericRecordData) {
+            GenericRecordData reuseRecord =
+                    (reuse instanceof GenericRecordData)
+                            ? (GenericRecordData) reuse
+                            : new GenericRecordData(from.getArity());
+            return GenericRecordDataSerializer.copy((GenericRecordData) from, 
reuseRecord);
+        } else {
+            throw new RuntimeException("Unsupported RecordData type: " + 
from.getClass().getName());
+        }
     }
 
     @Override
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
index f6afad756..b698c0845 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
@@ -29,16 +29,22 @@ import org.apache.flink.cdc.common.data.StringData;
 import org.apache.flink.cdc.common.data.TimeData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.types.TimeType;
 import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
 import org.apache.flink.cdc.common.types.variant.Variant;
+import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
 import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
 import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import java.util.List;
 
 /**
  * Writer to write a composite data format, like row, array. 1. Invoke {@link 
#reset()}. 2. Write
@@ -171,7 +177,25 @@ public interface BinaryWriter {
                 writer.writeMap(pos, (MapData) o, (MapDataSerializer) 
serializer);
                 break;
             case ROW:
-                writer.writeRecord(pos, (RecordData) o, 
(TypeSerializer<RecordData>) serializer);
+                RecordData recordData = (RecordData) o;
+                if (!(recordData instanceof BinaryRecordData)) {
+                    RowType rowType = (RowType) type;
+                    List<DataType> childTypes = rowType.getChildren();
+                    int arity = recordData.getArity();
+                    Preconditions.checkArgument(
+                            arity == childTypes.size(),
+                            "RecordData arity (%s) does not match row type 
field count (%s)",
+                            arity,
+                            childTypes.size());
+                    Object[] fields = new Object[arity];
+                    for (int i = 0; i < arity; i++) {
+                        fields[i] =
+                                
RecordData.createFieldGetter(childTypes.get(i), i)
+                                        .getFieldOrNull(recordData);
+                    }
+                    recordData = new 
BinaryRecordDataGenerator(rowType).generate(fields);
+                }
+                writer.writeRecord(pos, recordData, 
(TypeSerializer<RecordData>) serializer);
                 break;
             case BINARY:
             case VARBINARY:
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java
index ed84c4665..15cbde474 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java
@@ -17,14 +17,34 @@
 
 package org.apache.flink.cdc.runtime.serializer.data;
 
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
 import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
-/** A test for the {@link StringDataSerializer}. */
 class RecordDataSerializerTest extends SerializerTestBase<RecordData> {
 
     @Override
@@ -49,7 +69,304 @@ class RecordDataSerializerTest extends 
SerializerTestBase<RecordData> {
         return new RecordData[] {
             generator.generate(new Object[] {1L, 
BinaryStringData.fromString("test1")}),
             generator.generate(new Object[] {2L, 
BinaryStringData.fromString("test2")}),
-            generator.generate(new Object[] {3L, null})
+            generator.generate(new Object[] {3L, null}),
+            GenericRecordData.of(1L, BinaryStringData.fromString("test1")),
+            GenericRecordData.of(2L, BinaryStringData.fromString("test2")),
+            GenericRecordData.of(3L, null)
         };
     }
+
+    /**
+     * Serializes a {@link GenericRecordData} with one field per scalar type plus a trailing
+     * null, reads it back, and checks every field.
+     */
+    @Test
+    void testGenericRecordDataWithVariousTypes() throws Exception {
+        GenericRecordData original =
+                GenericRecordData.of(
+                        true,
+                        (byte) 42,
+                        (short) 1024,
+                        123456,
+                        789L,
+                        3.14f,
+                        2.718281828,
+                        BinaryStringData.fromString("hello"),
+                        new byte[] {1, 2, 3},
+                        DecimalData.fromBigDecimal(new BigDecimal("12345.6789"), 10, 4),
+                        TimestampData.fromMillis(1609459200000L, 123456),
+                        LocalZonedTimestampData.fromEpochMillis(1609459200000L, 654321),
+                        ZonedTimestampData.of(1609459200000L, 789012, "UTC"),
+                        DateData.fromEpochDay(18628),
+                        TimeData.fromMillisOfDay(43200000),
+                        null);
+
+        DataOutputSerializer buffer = new DataOutputSerializer(256);
+        RecordDataSerializer.INSTANCE.serialize(original, buffer);
+        DataInputDeserializer input = new DataInputDeserializer(buffer.getCopyOfBuffer());
+        RecordData restored = RecordDataSerializer.INSTANCE.deserialize(input);
+
+        assertThat(restored).isInstanceOf(GenericRecordData.class);
+        assertThat(restored.getArity()).isEqualTo(original.getArity());
+        assertThat(restored.getBoolean(0)).isTrue();
+        assertThat(restored.getByte(1)).isEqualTo((byte) 42);
+        assertThat(restored.getShort(2)).isEqualTo((short) 1024);
+        assertThat(restored.getInt(3)).isEqualTo(123456);
+        assertThat(restored.getLong(4)).isEqualTo(789L);
+        assertThat(restored.getFloat(5)).isEqualTo(3.14f);
+        assertThat(restored.getDouble(6)).isEqualTo(2.718281828);
+        assertThat(restored.getString(7).toString()).isEqualTo("hello");
+        assertThat(restored.getBinary(8)).isEqualTo(new byte[] {1, 2, 3});
+        assertThat(restored.getDecimal(9, 10, 4).toBigDecimal())
+                .isEqualByComparingTo(new BigDecimal("12345.6789"));
+
+        TimestampData timestamp = restored.getTimestamp(10, 6);
+        assertThat(timestamp.getMillisecond()).isEqualTo(1609459200000L);
+        assertThat(timestamp.getNanoOfMillisecond()).isEqualTo(123456);
+
+        assertThat(restored.getLocalZonedTimestampData(11, 6).getEpochMillisecond())
+                .isEqualTo(1609459200000L);
+
+        ZonedTimestampData zoned = restored.getZonedTimestamp(12, 6);
+        assertThat(zoned.getMillisecond()).isEqualTo(1609459200000L);
+        assertThat(zoned.getNanoOfMillisecond()).isEqualTo(789012);
+        assertThat(zoned.getZoneId()).isEqualTo("UTC");
+
+        assertThat(restored.getDate(13).toEpochDay()).isEqualTo(18628);
+        assertThat(restored.getTime(14).toMillisOfDay()).isEqualTo(43200000);
+        assertThat(restored.isNullAt(15)).isTrue();
+    }
+
+    /**
+     * Serializes a {@link BinaryRecordData} built from a sixteen-column row type (last column
+     * null), reads it back, and checks the fields.
+     */
+    @Test
+    void testBinaryRecordDataWithVariousTypes() throws Exception {
+        RowType schema =
+                RowType.of(
+                        DataTypes.BOOLEAN(),
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.FLOAT(),
+                        DataTypes.DOUBLE(),
+                        DataTypes.STRING(),
+                        DataTypes.BYTES(),
+                        DataTypes.DECIMAL(10, 4),
+                        DataTypes.TIMESTAMP(6),
+                        DataTypes.TIMESTAMP_LTZ(6),
+                        DataTypes.TIMESTAMP_TZ(6),
+                        DataTypes.DATE(),
+                        DataTypes.TIME(),
+                        DataTypes.STRING());
+
+        Object[] values =
+                new Object[] {
+                    true,
+                    (byte) 42,
+                    (short) 1024,
+                    123456,
+                    789L,
+                    3.14f,
+                    2.718281828,
+                    BinaryStringData.fromString("hello"),
+                    new byte[] {1, 2, 3},
+                    DecimalData.fromBigDecimal(new BigDecimal("12345.6789"), 10, 4),
+                    TimestampData.fromMillis(1609459200000L, 123456),
+                    LocalZonedTimestampData.fromEpochMillis(1609459200000L, 654321),
+                    ZonedTimestampData.of(1609459200000L, 789012, "UTC"),
+                    DateData.fromEpochDay(18628),
+                    TimeData.fromMillisOfDay(43200000),
+                    null
+                };
+        BinaryRecordData original = new BinaryRecordDataGenerator(schema).generate(values);
+
+        DataOutputSerializer buffer = new DataOutputSerializer(256);
+        RecordDataSerializer.INSTANCE.serialize(original, buffer);
+        DataInputDeserializer input = new DataInputDeserializer(buffer.getCopyOfBuffer());
+        RecordData restored = RecordDataSerializer.INSTANCE.deserialize(input);
+
+        assertThat(restored).isInstanceOf(BinaryRecordData.class);
+        assertThat(restored.getArity()).isEqualTo(original.getArity());
+        assertThat(restored.getBoolean(0)).isTrue();
+        assertThat(restored.getByte(1)).isEqualTo((byte) 42);
+        assertThat(restored.getShort(2)).isEqualTo((short) 1024);
+        assertThat(restored.getInt(3)).isEqualTo(123456);
+        assertThat(restored.getLong(4)).isEqualTo(789L);
+        assertThat(restored.getFloat(5)).isEqualTo(3.14f);
+        assertThat(restored.getDouble(6)).isEqualTo(2.718281828);
+        assertThat(restored.getString(7).toString()).isEqualTo("hello");
+        assertThat(restored.getBinary(8)).isEqualTo(new byte[] {1, 2, 3});
+        assertThat(restored.getDecimal(9, 10, 4).toBigDecimal())
+                .isEqualByComparingTo(new BigDecimal("12345.6789"));
+
+        TimestampData timestamp = restored.getTimestamp(10, 6);
+        assertThat(timestamp.getMillisecond()).isEqualTo(1609459200000L);
+        assertThat(timestamp.getNanoOfMillisecond()).isEqualTo(123456);
+
+        assertThat(restored.getLocalZonedTimestampData(11, 6).getEpochMillisecond())
+                .isEqualTo(1609459200000L);
+        assertThat(restored.getZonedTimestamp(12, 6).getMillisecond())
+                .isEqualTo(1609459200000L);
+        assertThat(restored.getDate(13).toEpochDay()).isEqualTo(18628);
+        assertThat(restored.getTime(14).toMillisOfDay()).isEqualTo(43200000);
+        assertThat(restored.isNullAt(15)).isTrue();
+    }
+
+    /**
+     * Serializes a {@link GenericRecordData} whose fields are a nested generic record, a nested
+     * binary record, two arrays and a map, then checks the restored contents.
+     */
+    @Test
+    void testGenericRecordDataWithNestedTypes() throws Exception {
+        GenericRecordData innerGeneric =
+                GenericRecordData.of(42, BinaryStringData.fromString("nested"));
+
+        RowType innerSchema = RowType.of(DataTypes.INT(), DataTypes.STRING());
+        BinaryRecordData innerBinary =
+                new BinaryRecordDataGenerator(innerSchema)
+                        .generate(new Object[] {99, BinaryStringData.fromString("binary-nested")});
+
+        GenericArrayData ints = new GenericArrayData(new int[] {1, 2, 3, 4, 5});
+        GenericArrayData strings =
+                new GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("a"), BinaryStringData.fromString("b")
+                        });
+        GenericMapData entries =
+                new GenericMapData(
+                        Map.of(
+                                BinaryStringData.fromString("k1"),
+                                100,
+                                BinaryStringData.fromString("k2"),
+                                200));
+
+        GenericRecordData original =
+                GenericRecordData.of(innerGeneric, innerBinary, ints, strings, entries);
+
+        DataOutputSerializer buffer = new DataOutputSerializer(512);
+        RecordDataSerializer.INSTANCE.serialize(original, buffer);
+        DataInputDeserializer input = new DataInputDeserializer(buffer.getCopyOfBuffer());
+        RecordData restored = RecordDataSerializer.INSTANCE.deserialize(input);
+
+        assertThat(restored).isInstanceOf(GenericRecordData.class);
+        assertThat(restored.getArity()).isEqualTo(5);
+
+        RecordData restoredGeneric = restored.getRow(0, 2);
+        assertThat(restoredGeneric.getInt(0)).isEqualTo(42);
+        assertThat(restoredGeneric.getString(1).toString()).isEqualTo("nested");
+
+        RecordData restoredBinary = restored.getRow(1, 2);
+        assertThat(restoredBinary.getInt(0)).isEqualTo(99);
+        assertThat(restoredBinary.getString(1).toString()).isEqualTo("binary-nested");
+
+        ArrayData restoredInts = restored.getArray(2);
+        assertThat(restoredInts.size()).isEqualTo(5);
+        assertThat(restoredInts.getInt(0)).isEqualTo(1);
+        assertThat(restoredInts.getInt(4)).isEqualTo(5);
+
+        ArrayData restoredStrings = restored.getArray(3);
+        assertThat(restoredStrings.size()).isEqualTo(2);
+        assertThat(restoredStrings.getString(0).toString()).isEqualTo("a");
+        assertThat(restoredStrings.getString(1).toString()).isEqualTo("b");
+
+        MapData restoredEntries = restored.getMap(4);
+        assertThat(restoredEntries.size()).isEqualTo(2);
+    }
+
+    /**
+     * Serializes a {@link BinaryRecordData} with an array, a map and a nested row column, then
+     * checks the restored contents.
+     */
+    @Test
+    void testBinaryRecordDataWithNestedTypes() throws Exception {
+        RowType schema =
+                RowType.of(
+                        DataTypes.ARRAY(DataTypes.STRING()),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+                        DataTypes.ROW(
+                                DataTypes.FIELD("f1", DataTypes.INT()),
+                                DataTypes.FIELD("f2", DataTypes.STRING())));
+
+        GenericArrayData letters =
+                new GenericArrayData(
+                        new Object[] {
+                            BinaryStringData.fromString("x"),
+                            BinaryStringData.fromString("y"),
+                            BinaryStringData.fromString("z")
+                        });
+        GenericMapData entries =
+                new GenericMapData(
+                        Map.of(
+                                BinaryStringData.fromString("p"),
+                                10,
+                                BinaryStringData.fromString("q"),
+                                20));
+        BinaryRecordData inner =
+                new BinaryRecordDataGenerator(RowType.of(DataTypes.INT(), DataTypes.STRING()))
+                        .generate(new Object[] {77, BinaryStringData.fromString("inner")});
+
+        BinaryRecordData original =
+                new BinaryRecordDataGenerator(schema)
+                        .generate(new Object[] {letters, entries, inner});
+
+        DataOutputSerializer buffer = new DataOutputSerializer(512);
+        RecordDataSerializer.INSTANCE.serialize(original, buffer);
+        DataInputDeserializer input = new DataInputDeserializer(buffer.getCopyOfBuffer());
+        RecordData restored = RecordDataSerializer.INSTANCE.deserialize(input);
+
+        assertThat(restored).isInstanceOf(BinaryRecordData.class);
+
+        ArrayData restoredLetters = restored.getArray(0);
+        assertThat(restoredLetters.size()).isEqualTo(3);
+        assertThat(restoredLetters.getString(0).toString()).isEqualTo("x");
+        assertThat(restoredLetters.getString(2).toString()).isEqualTo("z");
+
+        MapData restoredEntries = restored.getMap(1);
+        assertThat(restoredEntries.size()).isEqualTo(2);
+
+        RecordData restoredInner = restored.getRow(2, 2);
+        assertThat(restoredInner.getInt(0)).isEqualTo(77);
+        assertThat(restoredInner.getString(1).toString()).isEqualTo("inner");
+    }
+
+    /**
+     * Copies a {@link GenericRecordData} through the serializer and checks that the copy is a
+     * distinct record with equal field values and its own byte array.
+     */
+    @Test
+    void testGenericRecordDataCopy() {
+        byte[] payload = new byte[] {9, 8, 7};
+        GenericRecordData original =
+                GenericRecordData.of(42, BinaryStringData.fromString("copy-test"), payload);
+
+        RecordData duplicate = RecordDataSerializer.INSTANCE.copy(original);
+
+        assertThat(duplicate).isInstanceOf(GenericRecordData.class).isNotSameAs(original);
+        assertThat(duplicate.getInt(0)).isEqualTo(42);
+        assertThat(duplicate.getString(1).toString()).isEqualTo("copy-test");
+        assertThat(duplicate.getBinary(2)).isEqualTo(new byte[] {9, 8, 7});
+        // The binary field must be cloned, not shared with the source record.
+        assertThat(duplicate.getBinary(2)).isNotSameAs(original.getBinary(2));
+    }
+
+    /**
+     * Round-trips three {@link DecimalData} values with different precision and scale and checks
+     * that precision, scale and numeric value are all preserved.
+     */
+    @Test
+    void testDecimalDataPreservesPrecisionAndScale() throws Exception {
+        GenericRecordData original =
+                GenericRecordData.of(
+                        DecimalData.fromBigDecimal(new BigDecimal("1.23"), 20, 4),
+                        DecimalData.fromBigDecimal(new BigDecimal("42"), 15, 0),
+                        DecimalData.fromBigDecimal(new BigDecimal("0.0010"), 10, 4));
+
+        DataOutputSerializer buffer = new DataOutputSerializer(256);
+        RecordDataSerializer.INSTANCE.serialize(original, buffer);
+        DataInputDeserializer input = new DataInputDeserializer(buffer.getCopyOfBuffer());
+        RecordData restored = RecordDataSerializer.INSTANCE.deserialize(input);
+
+        assertThat(restored).isInstanceOf(GenericRecordData.class);
+
+        DecimalData first = restored.getDecimal(0, 20, 4);
+        assertThat(first.precision()).isEqualTo(20);
+        assertThat(first.scale()).isEqualTo(4);
+        assertThat(first.toBigDecimal()).isEqualByComparingTo(new BigDecimal("1.23"));
+
+        DecimalData second = restored.getDecimal(1, 15, 0);
+        assertThat(second.precision()).isEqualTo(15);
+        assertThat(second.scale()).isEqualTo(0);
+        assertThat(second.toBigDecimal()).isEqualByComparingTo(new BigDecimal("42"));
+
+        DecimalData third = restored.getDecimal(2, 10, 4);
+        assertThat(third.precision()).isEqualTo(10);
+        assertThat(third.scale()).isEqualTo(4);
+        assertThat(third.toBigDecimal()).isEqualByComparingTo(new BigDecimal("0.0010"));
+
+        assertThat(second.isCompact()).isTrue();
+    }
 }

Reply via email to