This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 9cc3451dd [FLINK-35251][cdc][runtime] Fix bug of serializing derivation mapping in SchemaDerivation 9cc3451dd is described below commit 9cc3451ddf6546317c4c3af1efbc68c2d745ef7d Author: Qingsheng Ren <renqs...@gmail.com> AuthorDate: Sun Apr 28 08:30:48 2024 +0800 [FLINK-35251][cdc][runtime] Fix bug of serializing derivation mapping in SchemaDerivation This closes #3267. --- .../schema/coordinator/SchemaDerivation.java | 2 +- .../schema/coordinator/SchemaDerivationTest.java | 28 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java index baf1ad66c..026dda626 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java @@ -123,7 +123,7 @@ public class SchemaDerivation { TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE; // Serialize derivation mapping in SchemaDerivation Map<TableId, Set<TableId>> derivationMapping = schemaDerivation.getDerivationMapping(); - out.write(derivationMapping.size()); + out.writeInt(derivationMapping.size()); for (Map.Entry<TableId, Set<TableId>> entry : derivationMapping.entrySet()) { // Routed table ID TableId routedTableId = entry.getKey(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java index 670286cb6..19fd5a4a1 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java @@ -36,11 +36,17 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -362,4 +368,26 @@ class SchemaDerivationTest { .isInstanceOf(IllegalStateException.class) .hasMessage("Incompatible types: \"INT\" and \"STRING\""); } + + @Test + void testSerde() throws Exception { + Map<TableId, Set<TableId>> derivationMapping = new HashMap<>(); + Set<TableId> originalTableIds = new HashSet<>(); + originalTableIds.add(TABLE_1); + originalTableIds.add(TABLE_2); + derivationMapping.put(MERGED_TABLE, originalTableIds); + SchemaDerivation schemaDerivation = + new SchemaDerivation(new SchemaManager(), ROUTES, derivationMapping); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos)) { + SchemaDerivation.serializeDerivationMapping(schemaDerivation, out); + byte[] serialized = baos.toByteArray(); + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + DataInputStream in = new DataInputStream(bais)) { + Map<TableId, Set<TableId>> deserialized = + SchemaDerivation.deserializerDerivationMapping(in); + assertThat(deserialized).isEqualTo(derivationMapping); + } + } + } }