This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new bb5982b93 [FLINK-39417] Fix GenericRecordData could not be
[de]serialized in pipeline (#4374)
bb5982b93 is described below
commit bb5982b93824e6857889a76591ebd24c9f0bdb60
Author: yuxiqian <[email protected]>
AuthorDate: Thu Apr 30 11:11:03 2026 +0800
[FLINK-39417] Fix GenericRecordData could not be [de]serialized in pipeline
(#4374)
---
.../flink/FlinkPipelineComposerITCase.java | 234 ++++++++++++
.../cdc/connectors/values/ValuesDatabase.java | 6 +-
.../operators/transform/PreTransformOperator.java | 5 +-
.../operators/transform/PreTransformProcessor.java | 2 +-
.../data/GenericRecordDataSerializer.java | 416 +++++++++++++++++++++
.../serializer/data/RecordDataSerializer.java | 68 +++-
.../serializer/data/writer/BinaryWriter.java | 26 +-
.../serializer/data/RecordDataSerializerTest.java | 321 +++++++++++++++-
8 files changed, 1061 insertions(+), 17 deletions(-)
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 47e8fae57..80606a38a 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -18,8 +18,13 @@
package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.DateData;
import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
@@ -72,6 +77,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -1733,6 +1739,234 @@ class FlinkPipelineComposerITCase {
.isGreaterThan(0);
}
+ /**
+ * End-to-end check that rows carried as {@link GenericRecordData} (instead of
+ * {@link BinaryRecordData}) flow through a full pipeline: values source, a
+ * projection/filter transform, and the values sink. The schema covers every column
+ * type used below, including nested ARRAY, MAP and ROW values. Runs once per
+ * {@link ValuesDataSink.SinkApi} constant.
+ */
+ @ParameterizedTest
+ @EnumSource
+ void testGenericRecordDataEndToEnd(ValuesDataSink.SinkApi sinkApi) throws
Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId myTable1 = TableId.tableId("default_namespace",
"default_schema", "mytable1");
+ Schema table1Schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .physicalColumn("col_bool", DataTypes.BOOLEAN())
+ .physicalColumn("col_tinyint", DataTypes.TINYINT())
+ .physicalColumn("col_smallint", DataTypes.SMALLINT())
+ .physicalColumn("col_bigint", DataTypes.BIGINT())
+ .physicalColumn("col_float", DataTypes.FLOAT())
+ .physicalColumn("col_double", DataTypes.DOUBLE())
+ .physicalColumn("col_decimal", DataTypes.DECIMAL(10,
2))
+ .physicalColumn("col_date", DataTypes.DATE())
+ .physicalColumn("col_time", DataTypes.TIME())
+ .physicalColumn("col_timestamp",
DataTypes.TIMESTAMP(3))
+ .physicalColumn("col_timestamp_ltz",
DataTypes.TIMESTAMP_LTZ(3))
+ .physicalColumn("col_timestamp_tz",
DataTypes.TIMESTAMP_TZ(3))
+ .physicalColumn("col_array",
DataTypes.ARRAY(DataTypes.STRING()))
+ .physicalColumn(
+ "col_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT()))
+ .physicalColumn(
+ "col_row",
+ DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1",
DataTypes.STRING())))
+ .primaryKey("id")
+ .build();
+
+ // Composite and temporal values shared by all fully-populated rows below.
+ GenericArrayData testArray =
+ new GenericArrayData(
+ new Object[] {
+ BinaryStringData.fromString("x"),
BinaryStringData.fromString("y")
+ });
+ GenericMapData testMap = new
GenericMapData(Map.of(BinaryStringData.fromString("k1"), 100));
+ GenericRecordData testRow = GenericRecordData.of(77,
BinaryStringData.fromString("inner"));
+ DecimalData testDecimal = DecimalData.fromBigDecimal(new
BigDecimal("123.45"), 10, 2);
+ TimestampData testTs = TimestampData.fromMillis(1609459200000L);
+ LocalZonedTimestampData testTsLtz =
LocalZonedTimestampData.fromEpochMillis(1609459200000L);
+ ZonedTimestampData testTsTz = ZonedTimestampData.of(1609459200000L, 0,
"UTC");
+
+ List<Event> events = new ArrayList<>();
+ events.add(new CreateTableEvent(myTable1, table1Schema));
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ GenericRecordData.of(
+ 1,
+ BinaryStringData.fromString("Alice"),
+ 18,
+ true,
+ (byte) 1,
+ (short) 100,
+ 1000L,
+ 1.0f,
+ 1.0,
+ testDecimal,
+ DateData.fromEpochDay(18628),
+ TimeData.fromMillisOfDay(43200000),
+ testTs,
+ testTsLtz,
+ testTsTz,
+ testArray,
+ testMap,
+ testRow)));
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ GenericRecordData.of(
+ 2,
+ BinaryStringData.fromString("Bob"),
+ 20,
+ true,
+ (byte) 42,
+ (short) 200,
+ 9876543210L,
+ 3.14f,
+ 2.718,
+ testDecimal,
+ DateData.fromEpochDay(18628),
+ TimeData.fromMillisOfDay(43200000),
+ testTs,
+ testTsLtz,
+ testTsTz,
+ testArray,
+ testMap,
+ testRow)));
+ events.add(
+ DataChangeEvent.updateEvent(
+ myTable1,
+ GenericRecordData.of(
+ 2,
+ BinaryStringData.fromString("Bob"),
+ 20,
+ true,
+ (byte) 42,
+ (short) 200,
+ 9876543210L,
+ 3.14f,
+ 2.718,
+ testDecimal,
+ DateData.fromEpochDay(18628),
+ TimeData.fromMillisOfDay(43200000),
+ testTs,
+ testTsLtz,
+ testTsTz,
+ testArray,
+ testMap,
+ testRow),
+ GenericRecordData.of(
+ 2,
+ BinaryStringData.fromString("Bob"),
+ 30,
+ false,
+ (byte) 43,
+ (short) 201,
+ 9876543211L,
+ 3.15f,
+ 2.719,
+ testDecimal,
+ DateData.fromEpochDay(18629),
+ TimeData.fromMillisOfDay(43201000),
+ testTs,
+ testTsLtz,
+ testTsTz,
+ testArray,
+ testMap,
+ testRow)));
+ // Row 3 sets only the primary key, so every other column value is null.
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ GenericRecordData.of(
+ 3, null, null, null, null, null, null, null,
null, null, null, null,
+ null, null, null, null, null, null)));
+ events.add(
+ DataChangeEvent.deleteEvent(
+ myTable1,
+ GenericRecordData.of(
+ 1,
+ BinaryStringData.fromString("Alice"),
+ 18,
+ true,
+ (byte) 1,
+ (short) 100,
+ 1000L,
+ 1.0f,
+ 1.0,
+ testDecimal,
+ DateData.fromEpochDay(18628),
+ TimeData.fromMillisOfDay(43200000),
+ testTs,
+ testTsLtz,
+ testTsTz,
+ testArray,
+ testMap,
+ testRow)));
+
+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Appends a constant 'tag' column and drops every event whose id is 1.
+ TransformDef transformDef =
+ new TransformDef(
+ "default_namespace.default_schema.mytable1",
+ "*, 'test_tag' as tag",
+ "id <> 1",
+ null,
+ null,
+ null,
+ "",
+ null);
+
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ new
ArrayList<>(Collections.singletonList(transformDef)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Row 1 is filtered out, leaving the updated row 2 and the all-null row 3.
+ List<String> results = ValuesDatabase.getResults(myTable1);
+ assertThat(results).hasSize(2);
+ assertThat(results)
+ .anySatisfy(
+ r ->
+ assertThat(r)
+ .startsWith(
+
"default_namespace.default_schema.mytable1:id=2;name=Bob;age=30;col_bool=false;col_tinyint=43;col_smallint=201;col_bigint=9876543211;col_float=3.15;col_double=2.719;col_decimal=123.45;col_date=2021-01-02;col_time=12:00:01;col_timestamp=2021-01-01T00:00;col_timestamp_ltz=2021-01-01T00:00;col_timestamp_tz=2021-01-01T00:00:00Z;")
+ .endsWith("tag=test_tag"))
+ .anySatisfy(
+ r ->
+ assertThat(r)
+ .isEqualTo(
+
"default_namespace.default_schema.mytable1:id=3;name=;age=;col_bool=;col_tinyint=;col_smallint=;col_bigint=;col_float=;col_double=;col_decimal=;col_date=;col_time=;col_timestamp=;col_timestamp_ltz=;col_timestamp_tz=;col_array=;col_map=;col_row=;tag=test_tag"));
+
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+ assertThat(Arrays.asList(outputEvents))
+ .containsExactly(
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col_bool`
BOOLEAN,`col_tinyint` TINYINT,`col_smallint` SMALLINT,`col_bigint`
BIGINT,`col_float` FLOAT,`col_double` DOUBLE,`col_decimal` DECIMAL(10,
2),`col_date` DATE,`col_time` TIME(0),`col_timestamp`
TIMESTAMP(3),`col_timestamp_ltz` TIMESTAMP_LTZ(3),`col_timestamp_tz`
TIMESTAMP(3) WITH TIME ZONE,`col_array` ARRAY<STRING>,`col_map` MAP<S [...]
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 20, true, 42, 200, 9876543210, 3.14, 2.718, 123.45, 2021-01-01,
12:00, 2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 ->
100}, {f0: INT -> 77, f1: STRING -> inner}, test_tag], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 20, true, 42, 200, 9876543210, 3.14, 2.718, 123.45, 2021-01-01, 12:00,
2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100},
{f0: INT -> 77, f1: STRING -> inner}, test_tag], after=[2, Bob, 30, false, 43,
201, 9876543211, 3.15, 2.719, 123.45, 2021-01-02, 12:00:01, 2021-01-01T00:00,
2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, {f0: INT -> 77,
[...]
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[3, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, test_tag], op=INSERT, meta=()}");
+ }
+
BinaryRecordData generate(Schema schema, Object... fields) {
return (new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
index 698203e91..7689c637d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java
@@ -422,7 +422,11 @@ public class ValuesDatabase {
private String buildPrimaryKeyStr(RecordData recordData) {
StringBuilder stringBuilder = new StringBuilder();
for (Integer primaryKeyIndex : primaryKeyIndexes) {
-
stringBuilder.append(recordData.getString(primaryKeyIndex).toString()).append(",");
+ RecordData.FieldGetter fieldGetter =
+ RecordData.createFieldGetter(
+ columns.get(primaryKeyIndex).getType(),
primaryKeyIndex);
+ Object value = fieldGetter.getFieldOrNull(recordData);
+ stringBuilder.append(value != null ? value.toString() :
"null").append(",");
}
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
return stringBuilder.toString();
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index cb8748d12..7c6db1735 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.operators.transform;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -391,8 +392,8 @@ public class PreTransformOperator extends
AbstractStreamOperatorAdapter<Event>
+ "This is likely a bug, please consider filing an
issue.",
tableId);
- BinaryRecordData before = (BinaryRecordData)
dataChangeEvent.before();
- BinaryRecordData after = (BinaryRecordData)
dataChangeEvent.after();
+ RecordData before = dataChangeEvent.before();
+ RecordData after = dataChangeEvent.after();
if (before != null) {
BinaryRecordData projectedBefore =
processor.processFillDataField(before);
dataChangeEvent =
DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
index cd679fcd5..00caa1e6c 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
@@ -59,7 +59,7 @@ public class PreTransformProcessor {
return new CreateTableEvent(createTableEvent.tableId(), schema);
}
- public BinaryRecordData processFillDataField(BinaryRecordData data) {
+ public BinaryRecordData processFillDataField(RecordData data) {
List<Object> valueList = new ArrayList<>();
List<Column> columns =
tableChangeInfo.getPreTransformedSchema().getColumns();
Map<String, RecordData.FieldGetter> sourceFieldGettersMap =
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java
new file mode 100644
index 000000000..4f5c406fb
--- /dev/null
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.variant.BinaryVariant;
+import org.apache.flink.cdc.common.types.variant.Variant;
+import
org.apache.flink.cdc.runtime.serializer.data.binary.BinaryRecordDataSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link GenericRecordData}. Uses a self-describing format where each field is
+ * prefixed with a one-byte type tag, so no schema information is needed at serialization time.
+ *
+ * <p>A record is written as an {@code int} arity followed by that many tagged fields. Every
+ * variable-length payload (string, binary, decimal unscaled bytes, zone id, variant value and
+ * metadata) is written as an {@code int} length followed by the raw bytes. Arrays and maps must
+ * be {@link GenericArrayData} / {@link GenericMapData} and variants must be {@link
+ * BinaryVariant}; other implementations are rejected with an {@link IOException}.
+ *
+ * <p>The tag values below are part of the wire format and must never be renumbered.
+ */
+@Internal
+public final class GenericRecordDataSerializer {
+
+    // Type tags for the self-describing format.
+    private static final byte TAG_NULL = 0;
+    private static final byte TAG_BOOLEAN = 1;
+    private static final byte TAG_BYTE = 2;
+    private static final byte TAG_SHORT = 3;
+    private static final byte TAG_INT = 4;
+    private static final byte TAG_LONG = 5;
+    private static final byte TAG_FLOAT = 6;
+    private static final byte TAG_DOUBLE = 7;
+    private static final byte TAG_STRING = 8;
+    private static final byte TAG_BINARY = 9;
+    private static final byte TAG_DECIMAL = 10;
+    private static final byte TAG_TIMESTAMP = 11;
+    private static final byte TAG_ZONED_TIMESTAMP = 12;
+    private static final byte TAG_LOCAL_ZONED_TIMESTAMP = 13;
+    private static final byte TAG_DATE = 14;
+    private static final byte TAG_TIME = 15;
+    private static final byte TAG_GENERIC_RECORD = 16;
+    private static final byte TAG_BINARY_RECORD = 17;
+    private static final byte TAG_ARRAY = 18;
+    private static final byte TAG_MAP = 19;
+    private static final byte TAG_VARIANT = 20;
+
+    private GenericRecordDataSerializer() {}
+
+    /**
+     * Serializes a {@link GenericRecordData} to the given output view.
+     *
+     * @param record the record to write
+     * @param target the view to write to
+     * @throws IOException if writing fails or a field holds an unsupported type
+     */
+    public static void serialize(GenericRecordData record, DataOutputView target)
+            throws IOException {
+        int arity = record.getArity();
+        target.writeInt(arity);
+        for (int i = 0; i < arity; i++) {
+            serializeField(record.getField(i), target);
+        }
+    }
+
+    /**
+     * Deserializes a {@link GenericRecordData} previously written by {@link #serialize}.
+     *
+     * @param source the view to read from
+     * @return a newly allocated record
+     * @throws IOException if reading fails or the stream is malformed (negative arity, size or
+     *     length, or an unknown type tag)
+     */
+    public static GenericRecordData deserialize(DataInputView source) throws IOException {
+        int arity = readLength(source, "record arity");
+        GenericRecordData record = new GenericRecordData(arity);
+        for (int i = 0; i < arity; i++) {
+            record.setField(i, deserializeField(source));
+        }
+        return record;
+    }
+
+    /** Creates a deep copy of the given {@link GenericRecordData}. */
+    public static GenericRecordData copy(GenericRecordData from) {
+        return copy(from, new GenericRecordData(from.getArity()));
+    }
+
+    /**
+     * Deep-copies {@code from}. The copy is written into {@code reuse} when it has the same arity,
+     * otherwise into a newly allocated record.
+     *
+     * @param from the record to copy
+     * @param reuse a record to fill if compatible; may be {@code null}
+     * @return {@code reuse} if it was filled, otherwise a new record
+     */
+    public static GenericRecordData copy(GenericRecordData from, GenericRecordData reuse) {
+        int arity = from.getArity();
+        // A reuse instance can only hold the copy if it has exactly the same number of fields.
+        GenericRecordData target =
+                (reuse != null && reuse.getArity() == arity) ? reuse : new GenericRecordData(arity);
+        for (int i = 0; i < arity; i++) {
+            target.setField(i, copyField(from.getField(i)));
+        }
+        return target;
+    }
+
+    // ---- Field-level serialization ----
+
+    /**
+     * Writes a single field as its type tag followed by the tag-specific payload.
+     *
+     * @throws IOException if writing fails or {@code field} is of an unsupported type
+     */
+    static void serializeField(Object field, DataOutputView target) throws IOException {
+        if (field == null) {
+            target.writeByte(TAG_NULL);
+        } else if (field instanceof Boolean) {
+            target.writeByte(TAG_BOOLEAN);
+            target.writeBoolean((Boolean) field);
+        } else if (field instanceof Byte) {
+            target.writeByte(TAG_BYTE);
+            target.writeByte((Byte) field);
+        } else if (field instanceof Short) {
+            target.writeByte(TAG_SHORT);
+            target.writeShort((Short) field);
+        } else if (field instanceof Integer) {
+            target.writeByte(TAG_INT);
+            target.writeInt((Integer) field);
+        } else if (field instanceof Long) {
+            target.writeByte(TAG_LONG);
+            target.writeLong((Long) field);
+        } else if (field instanceof Float) {
+            target.writeByte(TAG_FLOAT);
+            target.writeFloat((Float) field);
+        } else if (field instanceof Double) {
+            target.writeByte(TAG_DOUBLE);
+            target.writeDouble((Double) field);
+        } else if (field instanceof StringData) {
+            target.writeByte(TAG_STRING);
+            writeBytes(((StringData) field).toBytes(), target);
+        } else if (field instanceof byte[]) {
+            target.writeByte(TAG_BINARY);
+            writeBytes((byte[]) field, target);
+        } else if (field instanceof DecimalData) {
+            target.writeByte(TAG_DECIMAL);
+            DecimalData decimal = (DecimalData) field;
+            // Use DecimalData's SQL DECIMAL(p, s) precision/scale rather than BigDecimal's, so the
+            // value is restored with its declared type.
+            target.writeInt(decimal.precision());
+            target.writeInt(decimal.scale());
+            writeBytes(decimal.toUnscaledBytes(), target);
+        } else if (field instanceof TimestampData) {
+            target.writeByte(TAG_TIMESTAMP);
+            TimestampData ts = (TimestampData) field;
+            target.writeLong(ts.getMillisecond());
+            target.writeInt(ts.getNanoOfMillisecond());
+        } else if (field instanceof ZonedTimestampData) {
+            target.writeByte(TAG_ZONED_TIMESTAMP);
+            ZonedTimestampData zts = (ZonedTimestampData) field;
+            target.writeLong(zts.getMillisecond());
+            target.writeInt(zts.getNanoOfMillisecond());
+            writeBytes(zts.getZoneId().getBytes(StandardCharsets.UTF_8), target);
+        } else if (field instanceof LocalZonedTimestampData) {
+            target.writeByte(TAG_LOCAL_ZONED_TIMESTAMP);
+            LocalZonedTimestampData lzts = (LocalZonedTimestampData) field;
+            target.writeLong(lzts.getEpochMillisecond());
+            target.writeInt(lzts.getEpochNanoOfMillisecond());
+        } else if (field instanceof DateData) {
+            target.writeByte(TAG_DATE);
+            target.writeInt(((DateData) field).toEpochDay());
+        } else if (field instanceof TimeData) {
+            target.writeByte(TAG_TIME);
+            target.writeInt(((TimeData) field).toMillisOfDay());
+        } else if (field instanceof GenericRecordData) {
+            target.writeByte(TAG_GENERIC_RECORD);
+            serialize((GenericRecordData) field, target);
+        } else if (field instanceof BinaryRecordData) {
+            target.writeByte(TAG_BINARY_RECORD);
+            BinaryRecordDataSerializer.INSTANCE.serialize((BinaryRecordData) field, target);
+        } else if (field instanceof ArrayData) {
+            target.writeByte(TAG_ARRAY);
+            serializeArrayData((ArrayData) field, target);
+        } else if (field instanceof MapData) {
+            target.writeByte(TAG_MAP);
+            serializeMapData((MapData) field, target);
+        } else if (field instanceof Variant) {
+            target.writeByte(TAG_VARIANT);
+            serializeVariant((Variant) field, target);
+        } else {
+            throw new IOException(
+                    "Unsupported field type in GenericRecordData: " + field.getClass().getName());
+        }
+    }
+
+    /**
+     * Reads a single field written by {@link #serializeField}.
+     *
+     * @throws IOException if reading fails or the stream is malformed
+     */
+    static Object deserializeField(DataInputView source) throws IOException {
+        byte tag = source.readByte();
+        switch (tag) {
+            case TAG_NULL:
+                return null;
+            case TAG_BOOLEAN:
+                return source.readBoolean();
+            case TAG_BYTE:
+                return source.readByte();
+            case TAG_SHORT:
+                return source.readShort();
+            case TAG_INT:
+                return source.readInt();
+            case TAG_LONG:
+                return source.readLong();
+            case TAG_FLOAT:
+                return source.readFloat();
+            case TAG_DOUBLE:
+                return source.readDouble();
+            case TAG_STRING:
+                return BinaryStringData.fromBytes(readBytes(source, "string length"));
+            case TAG_BINARY:
+                return readBytes(source, "binary length");
+            case TAG_DECIMAL:
+                {
+                    int precision = source.readInt();
+                    int scale = source.readInt();
+                    byte[] unscaled = readBytes(source, "decimal length");
+                    return DecimalData.fromUnscaledBytes(unscaled, precision, scale);
+                }
+            case TAG_TIMESTAMP:
+                {
+                    long millis = source.readLong();
+                    int nanos = source.readInt();
+                    return TimestampData.fromMillis(millis, nanos);
+                }
+            case TAG_ZONED_TIMESTAMP:
+                {
+                    long millis = source.readLong();
+                    int nanos = source.readInt();
+                    String zoneId =
+                            new String(
+                                    readBytes(source, "zone id length"), StandardCharsets.UTF_8);
+                    return ZonedTimestampData.of(millis, nanos, zoneId);
+                }
+            case TAG_LOCAL_ZONED_TIMESTAMP:
+                {
+                    long epochMillis = source.readLong();
+                    int nanos = source.readInt();
+                    return LocalZonedTimestampData.fromEpochMillis(epochMillis, nanos);
+                }
+            case TAG_DATE:
+                return DateData.fromEpochDay(source.readInt());
+            case TAG_TIME:
+                return TimeData.fromMillisOfDay(source.readInt());
+            case TAG_GENERIC_RECORD:
+                return deserialize(source);
+            case TAG_BINARY_RECORD:
+                return BinaryRecordDataSerializer.INSTANCE.deserialize(source);
+            case TAG_ARRAY:
+                return deserializeArrayData(source);
+            case TAG_MAP:
+                return deserializeMapData(source);
+            case TAG_VARIANT:
+                return deserializeVariant(source);
+            default:
+                throw new IOException("Unknown field type tag: " + tag);
+        }
+    }
+
+    // ---- Length-prefixed byte helpers ----
+
+    /** Writes {@code bytes} as an {@code int} length prefix followed by the raw bytes. */
+    private static void writeBytes(byte[] bytes, DataOutputView target) throws IOException {
+        target.writeInt(bytes.length);
+        target.write(bytes);
+    }
+
+    /** Reads a length-prefixed byte array as written by {@link #writeBytes}. */
+    private static byte[] readBytes(DataInputView source, String what) throws IOException {
+        byte[] bytes = new byte[readLength(source, what)];
+        source.readFully(bytes);
+        return bytes;
+    }
+
+    /**
+     * Reads an {@code int} length or element count and rejects negative values. A negative value
+     * can only come from a corrupted or misaligned stream; without this check it would surface as
+     * an unchecked {@link NegativeArraySizeException} instead of an {@link IOException}.
+     */
+    private static int readLength(DataInputView source, String what) throws IOException {
+        int length = source.readInt();
+        if (length < 0) {
+            throw new IOException(
+                    "Invalid " + what + " in serialized GenericRecordData: " + length);
+        }
+        return length;
+    }
+
+    // ---- ArrayData serialization ----
+
+    /** Writes a {@link GenericArrayData} as its element count followed by tagged elements. */
+    private static void serializeArrayData(ArrayData arrayData, DataOutputView target)
+            throws IOException {
+        if (!(arrayData instanceof GenericArrayData)) {
+            throw new IOException(
+                    "Serialization of non-generic ArrayData is not supported in GenericRecordDataSerializer. "
+                            + "Actual type: "
+                            + arrayData.getClass().getName());
+        }
+        Object[] elements = ((GenericArrayData) arrayData).toObjectArray();
+        target.writeInt(elements.length);
+        for (Object element : elements) {
+            serializeField(element, target);
+        }
+    }
+
+    /** Reads an array written by {@link #serializeArrayData} into a {@link GenericArrayData}. */
+    private static ArrayData deserializeArrayData(DataInputView source) throws IOException {
+        int size = readLength(source, "array size");
+        Object[] elements = new Object[size];
+        for (int i = 0; i < size; i++) {
+            elements[i] = deserializeField(source);
+        }
+        return new GenericArrayData(elements);
+    }
+
+    // ---- MapData serialization ----
+
+    /** Writes a {@link GenericMapData} as its entry count followed by tagged key/value pairs. */
+    private static void serializeMapData(MapData mapData, DataOutputView target)
+            throws IOException {
+        if (!(mapData instanceof GenericMapData)) {
+            throw new IOException(
+                    "Serialization of non-generic MapData is not supported in GenericRecordDataSerializer. "
+                            + "Actual type: "
+                            + mapData.getClass().getName());
+        }
+        ArrayData keyArray = mapData.keyArray();
+        ArrayData valueArray = mapData.valueArray();
+        if (!(keyArray instanceof GenericArrayData)
+                || !(valueArray instanceof GenericArrayData)) {
+            throw new IOException(
+                    "MapData with non-generic key/value arrays is not supported in GenericRecordDataSerializer.");
+        }
+        int size = mapData.size();
+        target.writeInt(size);
+        Object[] keys = ((GenericArrayData) keyArray).toObjectArray();
+        Object[] values = ((GenericArrayData) valueArray).toObjectArray();
+        for (int i = 0; i < size; i++) {
+            serializeField(keys[i], target);
+            serializeField(values[i], target);
+        }
+    }
+
+    /** Reads a map written by {@link #serializeMapData}, preserving entry order. */
+    private static MapData deserializeMapData(DataInputView source) throws IOException {
+        int size = readLength(source, "map size");
+        Map<Object, Object> map = new LinkedHashMap<>(size);
+        for (int i = 0; i < size; i++) {
+            Object key = deserializeField(source);
+            Object value = deserializeField(source);
+            map.put(key, value);
+        }
+        return new GenericMapData(map);
+    }
+
+    // ---- Variant serialization ----
+
+    /** Writes a {@link BinaryVariant} as its length-prefixed value bytes then metadata bytes. */
+    private static void serializeVariant(Variant variant, DataOutputView target)
+            throws IOException {
+        if (!(variant instanceof BinaryVariant)) {
+            throw new IOException("Unsupported Variant type: " + variant.getClass().getName());
+        }
+        BinaryVariant bv = (BinaryVariant) variant;
+        writeBytes(bv.getValue(), target);
+        writeBytes(bv.getMetadata(), target);
+    }
+
+    /** Reads a variant written by {@link #serializeVariant}. */
+    private static Variant deserializeVariant(DataInputView source) throws IOException {
+        byte[] value = readBytes(source, "variant value length");
+        byte[] metadata = readBytes(source, "variant metadata length");
+        return new BinaryVariant(value, metadata);
+    }
+
+    // ---- Field copy ----
+
+    /**
+     * Copies a single field. Byte arrays and the generic container types are copied recursively;
+     * all other values are returned as-is.
+     *
+     * @throws IllegalArgumentException if a {@link GenericMapData} has non-generic key/value arrays
+     */
+    private static Object copyField(Object field) {
+        if (field == null) {
+            return null;
+        }
+        // Most CDC internal data types are immutable, so returning them shallowly is safe.
+        if (field instanceof byte[]) {
+            return ((byte[]) field).clone();
+        } else if (field instanceof GenericRecordData) {
+            return copy((GenericRecordData) field);
+        } else if (field instanceof BinaryRecordData) {
+            return ((BinaryRecordData) field).copy();
+        } else if (field instanceof GenericArrayData) {
+            Object[] elements = ((GenericArrayData) field).toObjectArray();
+            Object[] copied = new Object[elements.length];
+            for (int i = 0; i < elements.length; i++) {
+                copied[i] = copyField(elements[i]);
+            }
+            return new GenericArrayData(copied);
+        } else if (field instanceof GenericMapData) {
+            GenericMapData mapData = (GenericMapData) field;
+            ArrayData keyArray = mapData.keyArray();
+            ArrayData valueArray = mapData.valueArray();
+            if (!(keyArray instanceof GenericArrayData)
+                    || !(valueArray instanceof GenericArrayData)) {
+                throw new IllegalArgumentException(
+                        "Expected GenericArrayData for key and value arrays in GenericMapData, but got: keyArray="
+                                + keyArray.getClass().getName()
+                                + ", valueArray="
+                                + valueArray.getClass().getName());
+            }
+            Object[] keys = ((GenericArrayData) keyArray).toObjectArray();
+            Object[] values = ((GenericArrayData) valueArray).toObjectArray();
+            Map<Object, Object> newMap = new LinkedHashMap<>(keys.length);
+            for (int i = 0; i < keys.length; i++) {
+                newMap.put(copyField(keys[i]), copyField(values[i]));
+            }
+            return new GenericMapData(newMap);
+        }
+        // Immutable types: Boolean, Byte, Short, Integer, Long, Float, Double,
+        // StringData, DecimalData, TimestampData, ZonedTimestampData,
+        // LocalZonedTimestampData, DateData, TimeData, Variant
+        return field;
+    }
+}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java
index 6348c5c29..42948e3fa 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.serializer.data;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.cdc.common.data.GenericRecordData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
@@ -33,43 +34,90 @@ public class RecordDataSerializer extends
TypeSerializerSingleton<RecordData> {
private static final long serialVersionUID = 1L;
+ // Every payload written by serialize() now starts with one of the one-byte tags below.
+ // NOTE(review): the removed code wrote BinaryRecordData bytes with no leading tag, so
+ // bytes produced by the previous version cannot be read by this one. If this serializer
+ // backs checkpointed/savepointed state, confirm whether the serializer snapshot needs a
+ // version bump or a migration path.
+ /** Type tag for BinaryRecordData serialization. */
+ private static final byte BINARY_RECORD_TYPE = 0;
+
+ /** Type tag for GenericRecordData serialization. */
+ private static final byte GENERIC_RECORD_TYPE = 1;
+
private final BinaryRecordDataSerializer binarySerializer =
BinaryRecordDataSerializer.INSTANCE;
public static final RecordDataSerializer INSTANCE = new
RecordDataSerializer();
@Override
public RecordData createInstance() {
- // BinaryRecordData is the only implementation of RecordData
+ // Placeholder instance. GenericRecordData is also supported now, but
+ // deserialize(reuse, source) only ever reuses BinaryRecordData instances.
return new BinaryRecordData(1);
}
@Override
public void serialize(RecordData recordData, DataOutputView target) throws
IOException {
- // BinaryRecordData is the only implementation of RecordData
- binarySerializer.serialize((BinaryRecordData) recordData, target);
+ // Prefix the payload with a one-byte type tag so deserialize() knows which
+ // concrete RecordData format follows.
+ if (recordData instanceof BinaryRecordData) {
+ target.writeByte(BINARY_RECORD_TYPE);
+ binarySerializer.serialize((BinaryRecordData) recordData, target);
+ } else if (recordData instanceof GenericRecordData) {
+ target.writeByte(GENERIC_RECORD_TYPE);
+ GenericRecordDataSerializer.serialize((GenericRecordData)
recordData, target);
+ } else {
+ // No wire format exists for any other RecordData implementation.
+ throw new IOException(
+ "Unsupported RecordData type: " +
recordData.getClass().getName());
+ }
}
+ /**
+ * Reads the leading type tag written by serialize and decodes the remaining bytes with the
+ * matching delegate, returning a BinaryRecordData or a GenericRecordData accordingly.
+ *
+ * @throws IOException if the tag is not a known type tag, or when propagated from the
+ *     delegate
+ */
@Override
public RecordData deserialize(DataInputView source) throws IOException {
- // BinaryRecordData is the only implementation of RecordData
- return binarySerializer.deserialize(source);
+ byte type = source.readByte();
+ if (type == BINARY_RECORD_TYPE) {
+ return binarySerializer.deserialize(source);
+ } else if (type == GENERIC_RECORD_TYPE) {
+ return GenericRecordDataSerializer.deserialize(source);
+ } else {
+ throw new IOException("Unknown RecordData type tag: " + type);
+ }
}
+ /**
+ * Tag-aware variant of deserialize that may reuse an existing instance. {@code reuse} is
+ * passed to the binary delegate only when the tag is BINARY_RECORD_TYPE and {@code reuse}
+ * is itself a BinaryRecordData. In every other case a fresh record is returned and
+ * {@code reuse} is ignored.
+ *
+ * @throws IOException if the tag is not a known type tag, or when propagated from the
+ *     delegate
+ */
@Override
public RecordData deserialize(RecordData reuse, DataInputView source)
throws IOException {
- return binarySerializer.deserialize((BinaryRecordData) reuse, source);
+ byte type = source.readByte();
+ if (type == BINARY_RECORD_TYPE) {
+ if (reuse instanceof BinaryRecordData) {
+ return binarySerializer.deserialize((BinaryRecordData) reuse,
source);
+ }
+ return binarySerializer.deserialize(source);
+ } else if (type == GENERIC_RECORD_TYPE) {
+ // reuse is not used for generic records; a new instance is returned.
+ return GenericRecordDataSerializer.deserialize(source);
+ } else {
+ throw new IOException("Unknown RecordData type tag: " + type);
+ }
}
+ /**
+ * Copies a record. Binary records are copied via BinaryRecordData#copy() and generic
+ * records via GenericRecordDataSerializer#copy(GenericRecordData).
+ *
+ * @throws RuntimeException for any other RecordData implementation (unchecked, because
+ *     this method declares no checked exceptions)
+ */
@Override
public RecordData copy(RecordData from) {
- // BinaryRecordData is the only implementation of RecordData
- return ((BinaryRecordData) from).copy();
+ if (from instanceof BinaryRecordData) {
+ return ((BinaryRecordData) from).copy();
+ } else if (from instanceof GenericRecordData) {
+ return GenericRecordDataSerializer.copy((GenericRecordData) from);
+ } else {
+ throw new RuntimeException("Unsupported RecordData type: " +
from.getClass().getName());
+ }
}
+ /**
+ * Copies {@code from}, using {@code reuse} as the target only when it has the same record
+ * class as {@code from} (BinaryRecordData or GenericRecordData). Otherwise, including when
+ * {@code reuse} is null, a fresh target with {@code from.getArity()} fields is allocated.
+ *
+ * <p>NOTE(review): for generic records, what happens when {@code reuse} has a different
+ * arity than {@code from} is decided by GenericRecordDataSerializer#copy(from, reuse), which
+ * is not visible here - confirm.
+ *
+ * @throws RuntimeException for any other RecordData implementation
+ */
@Override
public RecordData copy(RecordData from, RecordData reuse) {
- // BinaryRecordData is the only implementation of RecordData
- return ((BinaryRecordData) from).copy((BinaryRecordData) reuse);
+ if (from instanceof BinaryRecordData) {
+ BinaryRecordData reuseRecord =
+ (reuse instanceof BinaryRecordData)
+ ? (BinaryRecordData) reuse
+ : new BinaryRecordData(from.getArity());
+ return ((BinaryRecordData) from).copy(reuseRecord);
+ } else if (from instanceof GenericRecordData) {
+ GenericRecordData reuseRecord =
+ (reuse instanceof GenericRecordData)
+ ? (GenericRecordData) reuse
+ : new GenericRecordData(from.getArity());
+ return GenericRecordDataSerializer.copy((GenericRecordData) from,
reuseRecord);
+ } else {
+ throw new RuntimeException("Unsupported RecordData type: " +
from.getClass().getName());
+ }
}
@Override
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
index f6afad756..b698c0845 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
@@ -29,16 +29,22 @@ import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimeData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.types.variant.Variant;
+import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import java.util.List;
/**
* Writer to write a composite data format, like row, array. 1. Invoke {@link
#reset()}. 2. Write
@@ -171,7 +177,25 @@ public interface BinaryWriter {
writer.writeMap(pos, (MapData) o, (MapDataSerializer)
serializer);
break;
case ROW:
- writer.writeRecord(pos, (RecordData) o,
(TypeSerializer<RecordData>) serializer);
+ RecordData recordData = (RecordData) o;
+ if (!(recordData instanceof BinaryRecordData)) {
+ RowType rowType = (RowType) type;
+ List<DataType> childTypes = rowType.getChildren();
+ int arity = recordData.getArity();
+ Preconditions.checkArgument(
+ arity == childTypes.size(),
+ "RecordData arity (%s) does not match row type
field count (%s)",
+ arity,
+ childTypes.size());
+ Object[] fields = new Object[arity];
+ for (int i = 0; i < arity; i++) {
+ fields[i] =
+
RecordData.createFieldGetter(childTypes.get(i), i)
+ .getFieldOrNull(recordData);
+ }
+ recordData = new
BinaryRecordDataGenerator(rowType).generate(fields);
+ }
+ writer.writeRecord(pos, recordData,
(TypeSerializer<RecordData>) serializer);
break;
case BINARY:
case VARBINARY:
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java
index ed84c4665..15cbde474 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java
@@ -17,14 +17,34 @@
package org.apache.flink.cdc.runtime.serializer.data;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DateData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.GenericRecordData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimeData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
-/** A test for the {@link StringDataSerializer}. */
+/** A test for the {@link RecordDataSerializer}. */
class RecordDataSerializerTest extends SerializerTestBase<RecordData> {
@Override
@@ -49,7 +69,304 @@ class RecordDataSerializerTest extends
SerializerTestBase<RecordData> {
return new RecordData[] {
generator.generate(new Object[] {1L,
BinaryStringData.fromString("test1")}),
generator.generate(new Object[] {2L,
BinaryStringData.fromString("test2")}),
- generator.generate(new Object[] {3L, null})
+ generator.generate(new Object[] {3L, null}),
+ GenericRecordData.of(1L, BinaryStringData.fromString("test1")),
+ GenericRecordData.of(2L, BinaryStringData.fromString("test2")),
+ GenericRecordData.of(3L, null)
};
}
+
+ /**
+ * Round-trips a GenericRecordData holding one value per scalar type (plus a trailing null)
+ * through serialize/deserialize. Checks that the result is still a GenericRecordData with
+ * equal field values.
+ *
+ * <p>NOTE(review): the nano-of-millisecond part of the TIMESTAMP_LTZ value (654321) is
+ * written but not asserted.
+ */
+ @Test
+ void testGenericRecordDataWithVariousTypes() throws Exception {
+ RecordDataSerializer serializer = RecordDataSerializer.INSTANCE;
+
+ GenericRecordData record =
+ GenericRecordData.of(
+ true,
+ (byte) 42,
+ (short) 1024,
+ 123456,
+ 789L,
+ 3.14f,
+ 2.718281828,
+ BinaryStringData.fromString("hello"),
+ new byte[] {1, 2, 3},
+ DecimalData.fromBigDecimal(new
BigDecimal("12345.6789"), 10, 4),
+ TimestampData.fromMillis(1609459200000L, 123456),
+
LocalZonedTimestampData.fromEpochMillis(1609459200000L, 654321),
+ ZonedTimestampData.of(1609459200000L, 789012, "UTC"),
+ DateData.fromEpochDay(18628),
+ TimeData.fromMillisOfDay(43200000),
+ null);
+
+ DataOutputSerializer out = new DataOutputSerializer(256);
+ serializer.serialize(record, out);
+ DataInputDeserializer in = new
DataInputDeserializer(out.getCopyOfBuffer());
+ RecordData deserialized = serializer.deserialize(in);
+
+ assertThat(deserialized).isInstanceOf(GenericRecordData.class);
+ assertThat(deserialized.getArity()).isEqualTo(record.getArity());
+ assertThat(deserialized.getBoolean(0)).isTrue();
+ assertThat(deserialized.getByte(1)).isEqualTo((byte) 42);
+ assertThat(deserialized.getShort(2)).isEqualTo((short) 1024);
+ assertThat(deserialized.getInt(3)).isEqualTo(123456);
+ assertThat(deserialized.getLong(4)).isEqualTo(789L);
+ assertThat(deserialized.getFloat(5)).isEqualTo(3.14f);
+ assertThat(deserialized.getDouble(6)).isEqualTo(2.718281828);
+ assertThat(deserialized.getString(7).toString()).isEqualTo("hello");
+ assertThat(deserialized.getBinary(8)).isEqualTo(new byte[] {1, 2, 3});
+ assertThat(deserialized.getDecimal(9, 10, 4).toBigDecimal())
+ .isEqualByComparingTo(new BigDecimal("12345.6789"));
+ assertThat(deserialized.getTimestamp(10,
6).getMillisecond()).isEqualTo(1609459200000L);
+ assertThat(deserialized.getTimestamp(10,
6).getNanoOfMillisecond()).isEqualTo(123456);
+ assertThat(deserialized.getLocalZonedTimestampData(11,
6).getEpochMillisecond())
+ .isEqualTo(1609459200000L);
+ assertThat(deserialized.getZonedTimestamp(12, 6).getMillisecond())
+ .isEqualTo(1609459200000L);
+ assertThat(deserialized.getZonedTimestamp(12,
6).getNanoOfMillisecond()).isEqualTo(789012);
+ assertThat(deserialized.getZonedTimestamp(12,
6).getZoneId()).isEqualTo("UTC");
+ assertThat(deserialized.getDate(13).toEpochDay()).isEqualTo(18628);
+
assertThat(deserialized.getTime(14).toMillisOfDay()).isEqualTo(43200000);
+ assertThat(deserialized.isNullAt(15)).isTrue();
+ }
+
+ /**
+ * Round-trips a BinaryRecordData, built by BinaryRecordDataGenerator from one value per
+ * scalar type plus a trailing null STRING field. Checks that the tagged format still yields
+ * a BinaryRecordData with equal field values.
+ *
+ * <p>NOTE(review): for the TIMESTAMP_TZ field only getMillisecond() is asserted. Its
+ * nano-of-millisecond (789012) and zone id are not checked, and neither is the
+ * nano-of-millisecond of the TIMESTAMP_LTZ field (654321).
+ */
+ @Test
+ void testBinaryRecordDataWithVariousTypes() throws Exception {
+ RecordDataSerializer serializer = RecordDataSerializer.INSTANCE;
+
+ RowType rowType =
+ RowType.of(
+ DataTypes.BOOLEAN(),
+ DataTypes.TINYINT(),
+ DataTypes.SMALLINT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.FLOAT(),
+ DataTypes.DOUBLE(),
+ DataTypes.STRING(),
+ DataTypes.BYTES(),
+ DataTypes.DECIMAL(10, 4),
+ DataTypes.TIMESTAMP(6),
+ DataTypes.TIMESTAMP_LTZ(6),
+ DataTypes.TIMESTAMP_TZ(6),
+ DataTypes.DATE(),
+ DataTypes.TIME(),
+ DataTypes.STRING());
+
+ BinaryRecordData record =
+ new BinaryRecordDataGenerator(rowType)
+ .generate(
+ new Object[] {
+ true,
+ (byte) 42,
+ (short) 1024,
+ 123456,
+ 789L,
+ 3.14f,
+ 2.718281828,
+ BinaryStringData.fromString("hello"),
+ new byte[] {1, 2, 3},
+ DecimalData.fromBigDecimal(new
BigDecimal("12345.6789"), 10, 4),
+ TimestampData.fromMillis(1609459200000L,
123456),
+
LocalZonedTimestampData.fromEpochMillis(1609459200000L, 654321),
+ ZonedTimestampData.of(1609459200000L,
789012, "UTC"),
+ DateData.fromEpochDay(18628),
+ TimeData.fromMillisOfDay(43200000),
+ null
+ });
+
+ DataOutputSerializer out = new DataOutputSerializer(256);
+ serializer.serialize(record, out);
+ DataInputDeserializer in = new
DataInputDeserializer(out.getCopyOfBuffer());
+ RecordData deserialized = serializer.deserialize(in);
+
+ assertThat(deserialized).isInstanceOf(BinaryRecordData.class);
+ assertThat(deserialized.getArity()).isEqualTo(record.getArity());
+ assertThat(deserialized.getBoolean(0)).isTrue();
+ assertThat(deserialized.getByte(1)).isEqualTo((byte) 42);
+ assertThat(deserialized.getShort(2)).isEqualTo((short) 1024);
+ assertThat(deserialized.getInt(3)).isEqualTo(123456);
+ assertThat(deserialized.getLong(4)).isEqualTo(789L);
+ assertThat(deserialized.getFloat(5)).isEqualTo(3.14f);
+ assertThat(deserialized.getDouble(6)).isEqualTo(2.718281828);
+ assertThat(deserialized.getString(7).toString()).isEqualTo("hello");
+ assertThat(deserialized.getBinary(8)).isEqualTo(new byte[] {1, 2, 3});
+ assertThat(deserialized.getDecimal(9, 10, 4).toBigDecimal())
+ .isEqualByComparingTo(new BigDecimal("12345.6789"));
+ assertThat(deserialized.getTimestamp(10,
6).getMillisecond()).isEqualTo(1609459200000L);
+ assertThat(deserialized.getTimestamp(10,
6).getNanoOfMillisecond()).isEqualTo(123456);
+ assertThat(deserialized.getLocalZonedTimestampData(11,
6).getEpochMillisecond())
+ .isEqualTo(1609459200000L);
+ assertThat(deserialized.getZonedTimestamp(12, 6).getMillisecond())
+ .isEqualTo(1609459200000L);
+ assertThat(deserialized.getDate(13).toEpochDay()).isEqualTo(18628);
+
assertThat(deserialized.getTime(14).toMillisOfDay()).isEqualTo(43200000);
+ assertThat(deserialized.isNullAt(15)).isTrue();
+ }
+
+ /**
+ * Round-trips a GenericRecordData whose fields are a nested GenericRecordData, a nested
+ * BinaryRecordData, a primitive int array, a string array and a map. Checks that the nested
+ * values survive serialization.
+ *
+ * <p>NOTE(review): only the map's size() is asserted; its keys and values are not verified.
+ */
+ @Test
+ void testGenericRecordDataWithNestedTypes() throws Exception {
+ RecordDataSerializer serializer = RecordDataSerializer.INSTANCE;
+
+ GenericRecordData nestedGeneric =
+ GenericRecordData.of(42,
BinaryStringData.fromString("nested"));
+
+ BinaryRecordData nestedBinary =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.INT(),
DataTypes.STRING()))
+ .generate(new Object[] {99,
BinaryStringData.fromString("binary-nested")});
+
+ GenericArrayData intArray = new GenericArrayData(new int[] {1, 2, 3,
4, 5});
+ GenericArrayData stringArray =
+ new GenericArrayData(
+ new Object[] {
+ BinaryStringData.fromString("a"),
BinaryStringData.fromString("b")
+ });
+ GenericMapData map =
+ new GenericMapData(
+ Map.of(
+ BinaryStringData.fromString("k1"),
+ 100,
+ BinaryStringData.fromString("k2"),
+ 200));
+
+ GenericRecordData record =
+ GenericRecordData.of(nestedGeneric, nestedBinary, intArray,
stringArray, map);
+
+ DataOutputSerializer out = new DataOutputSerializer(512);
+ serializer.serialize(record, out);
+ DataInputDeserializer in = new
DataInputDeserializer(out.getCopyOfBuffer());
+ RecordData deserialized = serializer.deserialize(in);
+
+ assertThat(deserialized).isInstanceOf(GenericRecordData.class);
+ assertThat(deserialized.getArity()).isEqualTo(5);
+
+ RecordData dNestedGeneric = deserialized.getRow(0, 2);
+ assertThat(dNestedGeneric.getInt(0)).isEqualTo(42);
+ assertThat(dNestedGeneric.getString(1).toString()).isEqualTo("nested");
+
+ RecordData dNestedBinary = deserialized.getRow(1, 2);
+ assertThat(dNestedBinary.getInt(0)).isEqualTo(99);
+
assertThat(dNestedBinary.getString(1).toString()).isEqualTo("binary-nested");
+
+ ArrayData dIntArray = deserialized.getArray(2);
+ assertThat(dIntArray.size()).isEqualTo(5);
+ assertThat(dIntArray.getInt(0)).isEqualTo(1);
+ assertThat(dIntArray.getInt(4)).isEqualTo(5);
+
+ ArrayData dStringArray = deserialized.getArray(3);
+ assertThat(dStringArray.size()).isEqualTo(2);
+ assertThat(dStringArray.getString(0).toString()).isEqualTo("a");
+ assertThat(dStringArray.getString(1).toString()).isEqualTo("b");
+
+ MapData dMap = deserialized.getMap(4);
+ assertThat(dMap.size()).isEqualTo(2);
+ }
+
+ /**
+ * Round-trips a BinaryRecordData with ARRAY&lt;STRING&gt;, MAP&lt;STRING, INT&gt; and a nested
+ * ROW(INT, STRING) field. Checks that the result stays a BinaryRecordData and that the array
+ * and nested-row values survive.
+ *
+ * <p>NOTE(review): only the map's size() is asserted; its keys and values are not verified.
+ */
+ @Test
+ void testBinaryRecordDataWithNestedTypes() throws Exception {
+ RecordDataSerializer serializer = RecordDataSerializer.INSTANCE;
+
+ RowType rowType =
+ RowType.of(
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()),
+ DataTypes.ROW(
+ DataTypes.FIELD("f1", DataTypes.INT()),
+ DataTypes.FIELD("f2", DataTypes.STRING())));
+
+ BinaryRecordData record =
+ new BinaryRecordDataGenerator(rowType)
+ .generate(
+ new Object[] {
+ new GenericArrayData(
+ new Object[] {
+
BinaryStringData.fromString("x"),
+
BinaryStringData.fromString("y"),
+
BinaryStringData.fromString("z")
+ }),
+ new GenericMapData(
+ Map.of(
+
BinaryStringData.fromString("p"),
+ 10,
+
BinaryStringData.fromString("q"),
+ 20)),
+ new BinaryRecordDataGenerator(
+
RowType.of(DataTypes.INT(), DataTypes.STRING()))
+ .generate(
+ new Object[] {
+ 77,
BinaryStringData.fromString("inner")
+ })
+ });
+
+ DataOutputSerializer out = new DataOutputSerializer(512);
+ serializer.serialize(record, out);
+ DataInputDeserializer in = new
DataInputDeserializer(out.getCopyOfBuffer());
+ RecordData deserialized = serializer.deserialize(in);
+
+ assertThat(deserialized).isInstanceOf(BinaryRecordData.class);
+
+ ArrayData array = deserialized.getArray(0);
+ assertThat(array.size()).isEqualTo(3);
+ assertThat(array.getString(0).toString()).isEqualTo("x");
+ assertThat(array.getString(2).toString()).isEqualTo("z");
+
+ MapData map = deserialized.getMap(1);
+ assertThat(map.size()).isEqualTo(2);
+
+ RecordData nested = deserialized.getRow(2, 2);
+ assertThat(nested.getInt(0)).isEqualTo(77);
+ assertThat(nested.getString(1).toString()).isEqualTo("inner");
+ }
+
+ /**
+ * Checks that serializer.copy(GenericRecordData) returns a different GenericRecordData
+ * instance with equal values. Also checks that the byte[] field in the copy is not the same
+ * array instance as in the source.
+ */
+ @Test
+ void testGenericRecordDataCopy() {
+ RecordDataSerializer serializer = RecordDataSerializer.INSTANCE;
+
+ GenericRecordData record =
+ GenericRecordData.of(
+ 42, BinaryStringData.fromString("copy-test"), new
byte[] {9, 8, 7});
+
+ RecordData copied = serializer.copy(record);
+
+
assertThat(copied).isInstanceOf(GenericRecordData.class).isNotSameAs(record);
+ assertThat(copied.getInt(0)).isEqualTo(42);
+ assertThat(copied.getString(1).toString()).isEqualTo("copy-test");
+ assertThat(copied.getBinary(2)).isEqualTo(new byte[] {9, 8, 7});
+ assertThat(copied.getBinary(2)).isNotSameAs(record.getBinary(2));
+ }
+
+ /**
+ * Checks that DecimalData precision and scale survive a GenericRecordData round trip. This
+ * includes values created with a larger scale than their literal (1.23 and 0.0010 at scale 4)
+ * and a scale-0 value. The final assertion expects the precision-15 decimal to be in compact
+ * form after deserialization.
+ */
+ @Test
+ void testDecimalDataPreservesPrecisionAndScale() throws Exception {
+ RecordDataSerializer serializer = RecordDataSerializer.INSTANCE;
+
+ DecimalData decimal1 = DecimalData.fromBigDecimal(new
BigDecimal("1.23"), 20, 4);
+ DecimalData decimal2 = DecimalData.fromBigDecimal(new
BigDecimal("42"), 15, 0);
+ DecimalData decimal3 = DecimalData.fromBigDecimal(new
BigDecimal("0.0010"), 10, 4);
+
+ GenericRecordData record = GenericRecordData.of(decimal1, decimal2,
decimal3);
+
+ DataOutputSerializer out = new DataOutputSerializer(256);
+ serializer.serialize(record, out);
+ DataInputDeserializer in = new
DataInputDeserializer(out.getCopyOfBuffer());
+ RecordData deserialized = serializer.deserialize(in);
+
+ assertThat(deserialized).isInstanceOf(GenericRecordData.class);
+
+ DecimalData dDecimal1 = deserialized.getDecimal(0, 20, 4);
+ assertThat(dDecimal1.precision()).isEqualTo(20);
+ assertThat(dDecimal1.scale()).isEqualTo(4);
+ assertThat(dDecimal1.toBigDecimal()).isEqualByComparingTo(new
BigDecimal("1.23"));
+
+ DecimalData dDecimal2 = deserialized.getDecimal(1, 15, 0);
+ assertThat(dDecimal2.precision()).isEqualTo(15);
+ assertThat(dDecimal2.scale()).isEqualTo(0);
+ assertThat(dDecimal2.toBigDecimal()).isEqualByComparingTo(new
BigDecimal("42"));
+
+ DecimalData dDecimal3 = deserialized.getDecimal(2, 10, 4);
+ assertThat(dDecimal3.precision()).isEqualTo(10);
+ assertThat(dDecimal3.scale()).isEqualTo(4);
+ assertThat(dDecimal3.toBigDecimal()).isEqualByComparingTo(new
BigDecimal("0.0010"));
+
+ assertThat(dDecimal2.isCompact()).isTrue();
+ }
}