This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cc3451dd [FLINK-35251][cdc][runtime] Fix bug of serializing 
derivation mapping in SchemaDerivation
9cc3451dd is described below

commit 9cc3451ddf6546317c4c3af1efbc68c2d745ef7d
Author: Qingsheng Ren <renqs...@gmail.com>
AuthorDate: Sun Apr 28 08:30:48 2024 +0800

    [FLINK-35251][cdc][runtime] Fix bug of serializing derivation mapping in 
SchemaDerivation
    
    This closes  #3267.
---
 .../schema/coordinator/SchemaDerivation.java       |  2 +-
 .../schema/coordinator/SchemaDerivationTest.java   | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
index baf1ad66c..026dda626 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
@@ -123,7 +123,7 @@ public class SchemaDerivation {
         TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
         // Serialize derivation mapping in SchemaDerivation
         Map<TableId, Set<TableId>> derivationMapping = 
schemaDerivation.getDerivationMapping();
-        out.write(derivationMapping.size());
+        out.writeInt(derivationMapping.size());
         for (Map.Entry<TableId, Set<TableId>> entry : 
derivationMapping.entrySet()) {
             // Routed table ID
             TableId routedTableId = entry.getKey();
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
index 670286cb6..19fd5a4a1 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
@@ -36,11 +36,17 @@ import 
org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static 
org.apache.flink.cdc.common.testutils.assertions.EventAssertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -362,4 +368,26 @@ class SchemaDerivationTest {
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessage("Incompatible types: \"INT\" and \"STRING\"");
     }
+
+    @Test
+    void testSerde() throws Exception {
+        Map<TableId, Set<TableId>> derivationMapping = new HashMap<>();
+        Set<TableId> originalTableIds = new HashSet<>();
+        originalTableIds.add(TABLE_1);
+        originalTableIds.add(TABLE_2);
+        derivationMapping.put(MERGED_TABLE, originalTableIds);
+        SchemaDerivation schemaDerivation =
+                new SchemaDerivation(new SchemaManager(), ROUTES, 
derivationMapping);
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
+            byte[] serialized = baos.toByteArray();
+            try (ByteArrayInputStream bais = new 
ByteArrayInputStream(serialized);
+                    DataInputStream in = new DataInputStream(bais)) {
+                Map<TableId, Set<TableId>> deserialized =
+                        SchemaDerivation.deserializerDerivationMapping(in);
+                assertThat(deserialized).isEqualTo(derivationMapping);
+            }
+        }
+    }
 }

Reply via email to