This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 768d528d9 [minor][test] Add Flink CDC 3.1.1 version to migration test 
version list
768d528d9 is described below

commit 768d528d9c96ebdcb8b7385dda34c43a5fb75794
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Thu Jul 25 16:27:07 2024 +0800

    [minor][test] Add Flink CDC 3.1.1 version to migration test version list
    
    This closes #3426.
---
 .../flink-cdc-migration-testcases/pom.xml          |   6 ++
 .../cdc/migration/tests/MigrationTestBase.java     |   4 +
 .../flink-cdc-release-3.1.1/pom.xml                |  87 ++++++++++++++++
 .../cdc/migration/tests/MigrationMockBase.java     |  27 +++++
 .../tests/SchemaManagerMigrationMock.java          |  68 ++++++++++++
 .../tests/SchemaRegistryMigrationMock.java         | 116 +++++++++++++++++++++
 .../tests/TableChangeInfoMigrationMock.java        |  60 +++++++++++
 flink-cdc-migration-tests/pom.xml                  |   1 +
 8 files changed, 369 insertions(+)

diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml 
b/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
index 4be8f9c12..12c468a4b 100644
--- a/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
@@ -52,6 +52,12 @@ limitations under the License.
             <version>${revision}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-release-3.1.1</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cdc-release-snapshot</artifactId>
diff --git 
a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
 
b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
index 2172f011e..e1020442b 100644
--- 
a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
+++ 
b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
@@ -35,6 +35,7 @@ public class MigrationTestBase {
         v3_0_0,
         v3_0_1,
         v3_1_0,
+        v3_1_1,
         SNAPSHOT;
 
         public String getShadedClassPrefix() {
@@ -45,6 +46,8 @@ public class MigrationTestBase {
                     return "com.ververica.cdc.v3_0_1";
                 case v3_1_0:
                     return "org.apache.flink.cdc.v3_1_0";
+                case v3_1_1:
+                    return "org.apache.flink.cdc.v3_1_1";
                 case SNAPSHOT:
                     return "org.apache.flink.cdc.snapshot";
                 default:
@@ -58,6 +61,7 @@ public class MigrationTestBase {
                     FlinkCdcVersion.v3_0_0,
                     FlinkCdcVersion.v3_0_1,
                     FlinkCdcVersion.v3_1_0,
+                    FlinkCdcVersion.v3_1_1,
                     FlinkCdcVersion.SNAPSHOT);
 
     public static List<FlinkCdcVersion> getAllVersions() {
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml
new file mode 100644
index 000000000..751b0f9d8
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-migration-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-release-3.1.1</artifactId>
+    <name>flink-cdc-release-3.1.1</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>3.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>3.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-runtime</artifactId>
+            <version>3.1.1</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <id>shade-flink-cdc</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.flink.cdc</pattern>
+                                    
<shadedPattern>org.apache.flink.cdc.v3_1_1</shadedPattern>
+                                    
<excludes>META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA</excludes>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git 
a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
new file mode 100644
index 000000000..3f52615db
--- /dev/null
+++ 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+/** Base classes for migration test cases. */
+public interface MigrationMockBase {
+    int getSerializerVersion();
+
+    byte[] serializeObject() throws Exception;
+
+    boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
+}
diff --git 
a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
new file mode 100644
index 000000000..c4f0788dd
--- /dev/null
+++ 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaManagerMigrationMock implements MigrationMockBase {
+    private static final TableId DUMMY_TABLE_ID =
+            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+    private static final Schema DUMMY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("age", DataTypes.DOUBLE())
+                    .primaryKey("id", "name")
+                    .build();
+
+    private static final String SCHEMA_MANAGER =
+            "runtime.operators.schema.coordinator.SchemaManager";
+
+    public SchemaManager generateDummyObject() {
+        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+        schemaVersions.put(1, DUMMY_SCHEMA);
+        schemaVersions.put(2, DUMMY_SCHEMA);
+        schemaVersions.put(3, DUMMY_SCHEMA);
+        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, 
schemaVersions));
+    }
+
+    @Override
+    public int getSerializerVersion() {
+        return SchemaManager.SERIALIZER.getVersion();
+    }
+
+    @Override
+    public byte[] serializeObject() throws Exception {
+        return SchemaManager.SERIALIZER.serialize(generateDummyObject());
+    }
+
+    @Override
+    public boolean deserializeAndCheckObject(int version, byte[] serialized) 
throws Exception {
+        Object expected = generateDummyObject();
+        Object actual = SchemaManager.SERIALIZER.deserialize(version, 
serialized);
+        return expected.equals(actual);
+    }
+}
diff --git 
a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
new file mode 100644
index 000000000..93269abec
--- /dev/null
+++ 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.types.DataTypes;
+import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaRegistryMigrationMock implements MigrationMockBase {
+    private static final TableId DUMMY_TABLE_ID =
+            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+    private static final Schema DUMMY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("age", DataTypes.DOUBLE())
+                    .primaryKey("id", "name")
+                    .build();
+
+    public SchemaManager generateDummySchemaManager() {
+        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+        schemaVersions.put(1, DUMMY_SCHEMA);
+        schemaVersions.put(2, DUMMY_SCHEMA);
+        schemaVersions.put(3, DUMMY_SCHEMA);
+        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, 
schemaVersions));
+    }
+
+    public SchemaRegistry generateSchemaRegistry() {
+        return new SchemaRegistry("Dummy Name", null, e -> {}, new 
ArrayList<>());
+    }
+
+    private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) 
throws Exception {
+        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+        field.setAccessible(true);
+        return (SchemaManager) field.get(schemaRegistry);
+    }
+
+    private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager 
schemaManager)
+            throws Exception {
+        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+        field.setAccessible(true);
+        field.set(schemaRegistry, schemaManager);
+    }
+
+    private SchemaDerivation getSchemaDerivation(SchemaRegistry 
schemaRegistry) throws Exception {
+        Field field = 
SchemaRegistry.class.getDeclaredField("schemaDerivation");
+        field.setAccessible(true);
+        return (SchemaDerivation) field.get(schemaRegistry);
+    }
+
+    private List<Tuple2<Selectors, TableId>> getSchemaRoutes(SchemaRegistry 
schemaRegistry)
+            throws Exception {
+        SchemaDerivation schemaDerivation = 
getSchemaDerivation(schemaRegistry);
+        Field field = SchemaDerivation.class.getDeclaredField("routes");
+        field.setAccessible(true);
+        return (List<Tuple2<Selectors, TableId>>) field.get(schemaDerivation);
+    }
+
+    @Override
+    public int getSerializerVersion() {
+        return -1;
+    }
+
+    @Override
+    public byte[] serializeObject() throws Exception {
+        CompletableFuture<byte[]> future = new CompletableFuture<>();
+        SchemaRegistry registry = generateSchemaRegistry();
+        setSchemaManager(registry, generateDummySchemaManager());
+
+        registry.checkpointCoordinator(0, future);
+
+        while (!future.isDone()) {
+            Thread.sleep(1000);
+        }
+        return future.get();
+    }
+
+    @Override
+    public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception 
{
+        SchemaRegistry expected = generateSchemaRegistry();
+        setSchemaManager(expected, generateDummySchemaManager());
+        SchemaRegistry actual = generateSchemaRegistry();
+        actual.resetToCheckpoint(0, b);
+        return getSchemaManager(expected).equals(getSchemaManager(actual))
+                && getSchemaRoutes(expected).equals(getSchemaRoutes(actual));
+    }
+}
diff --git 
a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
new file mode 100644
index 000000000..6a14a2be2
--- /dev/null
+++ 
b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class TableChangeInfoMigrationMock implements MigrationMockBase {
+    private static final TableId DUMMY_TABLE_ID =
+            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+    private static final Schema DUMMY_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("age", DataTypes.DOUBLE())
+                    .primaryKey("id", "name")
+                    .build();
+
+    public TableChangeInfo generateDummyObject() {
+        return TableChangeInfo.of(DUMMY_TABLE_ID, DUMMY_SCHEMA, DUMMY_SCHEMA);
+    }
+
+    @Override
+    public int getSerializerVersion() {
+        return TableChangeInfo.SERIALIZER.getVersion();
+    }
+
+    @Override
+    public byte[] serializeObject() throws Exception {
+        return TableChangeInfo.SERIALIZER.serialize(generateDummyObject());
+    }
+
+    @Override
+    public boolean deserializeAndCheckObject(int version, byte[] bytes) throws 
Exception {
+        TableChangeInfo expected = generateDummyObject();
+        TableChangeInfo actual = 
TableChangeInfo.SERIALIZER.deserialize(version, bytes);
+
+        return expected.getTableId().equals(actual.getTableId())
+                && 
expected.getOriginalSchema().equals(actual.getOriginalSchema())
+                && 
expected.getTransformedSchema().equals(actual.getTransformedSchema());
+    }
+}
diff --git a/flink-cdc-migration-tests/pom.xml 
b/flink-cdc-migration-tests/pom.xml
index da36ef502..269c5c0db 100644
--- a/flink-cdc-migration-tests/pom.xml
+++ b/flink-cdc-migration-tests/pom.xml
@@ -33,6 +33,7 @@ limitations under the License.
         <module>flink-cdc-release-3.0.0</module>
         <module>flink-cdc-release-3.0.1</module>
         <module>flink-cdc-release-3.1.0</module>
+        <module>flink-cdc-release-3.1.1</module>
         <module>flink-cdc-release-snapshot</module>
         <module>flink-cdc-migration-testcases</module>
     </modules>

Reply via email to