This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 768d528d9 [minor][test] Add Flink CDC 3.1.1 version to migration test version list 768d528d9 is described below commit 768d528d9c96ebdcb8b7385dda34c43a5fb75794 Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com> AuthorDate: Thu Jul 25 16:27:07 2024 +0800 [minor][test] Add Flink CDC 3.1.1 version to migration test version list This closes #3426. --- .../flink-cdc-migration-testcases/pom.xml | 6 ++ .../cdc/migration/tests/MigrationTestBase.java | 4 + .../flink-cdc-release-3.1.1/pom.xml | 87 ++++++++++++++++ .../cdc/migration/tests/MigrationMockBase.java | 27 +++++ .../tests/SchemaManagerMigrationMock.java | 68 ++++++++++++ .../tests/SchemaRegistryMigrationMock.java | 116 +++++++++++++++++++++ .../tests/TableChangeInfoMigrationMock.java | 60 +++++++++++ flink-cdc-migration-tests/pom.xml | 1 + 8 files changed, 369 insertions(+) diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml b/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml index 4be8f9c12..12c468a4b 100644 --- a/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml +++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml @@ -52,6 +52,12 @@ limitations under the License. <version>${revision}</version> <scope>compile</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-release-3.1.1</artifactId> + <version>${revision}</version> + <scope>compile</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cdc-release-snapshot</artifactId> diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java index 2172f011e..e1020442b 100644 --- a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java +++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java @@ -35,6 +35,7 @@ public class MigrationTestBase { v3_0_0, v3_0_1, v3_1_0, + v3_1_1, SNAPSHOT; public String getShadedClassPrefix() { @@ -45,6 +46,8 @@ public class MigrationTestBase { return "com.ververica.cdc.v3_0_1"; case v3_1_0: return "org.apache.flink.cdc.v3_1_0"; + case v3_1_1: + return "org.apache.flink.cdc.v3_1_1"; case SNAPSHOT: return "org.apache.flink.cdc.snapshot"; default: @@ -58,6 +61,7 @@ public class MigrationTestBase { FlinkCdcVersion.v3_0_0, FlinkCdcVersion.v3_0_1, FlinkCdcVersion.v3_1_0, + FlinkCdcVersion.v3_1_1, FlinkCdcVersion.SNAPSHOT); public static List<FlinkCdcVersion> getAllVersions() { diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml new file mode 100644 index 000000000..751b0f9d8 --- /dev/null +++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml @@ -0,0 +1,87 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-migration-tests</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>flink-cdc-release-3.1.1</artifactId> + <name>flink-cdc-release-3.1.1</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-base</artifactId> + <version>3.1.1</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-common</artifactId> + <version>3.1.1</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cdc-runtime</artifactId> + <version>3.1.1</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.2.4</version> + <executions> + <execution> + <id>shade-flink-cdc</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>org.apache.flink.cdc</pattern> + <shadedPattern>org.apache.flink.cdc.v3_1_1</shadedPattern> + <excludes>META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA</excludes> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java new file mode 100644 index 000000000..3f52615db --- /dev/null +++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.migration.tests; + +/** Base classes for migration test cases. */ +public interface MigrationMockBase { + int getSerializerVersion(); + + byte[] serializeObject() throws Exception; + + boolean deserializeAndCheckObject(int v, byte[] b) throws Exception; +} diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java new file mode 100644 index 000000000..c4f0788dd --- /dev/null +++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.migration.tests; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager; + +import java.util.Collections; +import java.util.SortedMap; +import java.util.TreeMap; + +/** Dummy classes for migration test. Called via reflection. */ +public class SchemaManagerMigrationMock implements MigrationMockBase { + private static final TableId DUMMY_TABLE_ID = + TableId.tableId("dummyNamespace", "dummySchema", "dummyTable"); + private static final Schema DUMMY_SCHEMA = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.DOUBLE()) + .primaryKey("id", "name") + .build(); + + private static final String SCHEMA_MANAGER = + "runtime.operators.schema.coordinator.SchemaManager"; + + public SchemaManager generateDummyObject() { + SortedMap<Integer, Schema> schemaVersions = new TreeMap<>(); + schemaVersions.put(1, DUMMY_SCHEMA); + schemaVersions.put(2, DUMMY_SCHEMA); + schemaVersions.put(3, DUMMY_SCHEMA); + return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions)); + } + + @Override + public int getSerializerVersion() { + return SchemaManager.SERIALIZER.getVersion(); + } + + @Override + public byte[] serializeObject() throws Exception { + return SchemaManager.SERIALIZER.serialize(generateDummyObject()); + } + + @Override + public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception { + Object expected = generateDummyObject(); + Object actual = SchemaManager.SERIALIZER.deserialize(version, serialized); + return expected.equals(actual); + } +} diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java new file mode 100644 index 000000000..93269abec --- /dev/null +++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.migration.tests; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager; +import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; + +/** Dummy classes for migration test. Called via reflection. */ +public class SchemaRegistryMigrationMock implements MigrationMockBase { + private static final TableId DUMMY_TABLE_ID = + TableId.tableId("dummyNamespace", "dummySchema", "dummyTable"); + private static final Schema DUMMY_SCHEMA = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.DOUBLE()) + .primaryKey("id", "name") + .build(); + + public SchemaManager generateDummySchemaManager() { + SortedMap<Integer, Schema> schemaVersions = new TreeMap<>(); + schemaVersions.put(1, DUMMY_SCHEMA); + schemaVersions.put(2, DUMMY_SCHEMA); + schemaVersions.put(3, DUMMY_SCHEMA); + return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions)); + } + + public SchemaRegistry generateSchemaRegistry() { + return new SchemaRegistry("Dummy Name", null, e -> {}, new ArrayList<>()); + } + + private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception { + Field field = SchemaRegistry.class.getDeclaredField("schemaManager"); + field.setAccessible(true); + return (SchemaManager) field.get(schemaRegistry); + } + + private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager) + throws Exception { + Field field = SchemaRegistry.class.getDeclaredField("schemaManager"); + field.setAccessible(true); + field.set(schemaRegistry, schemaManager); + } + + private SchemaDerivation getSchemaDerivation(SchemaRegistry schemaRegistry) throws Exception { + Field field = SchemaRegistry.class.getDeclaredField("schemaDerivation"); + field.setAccessible(true); + return (SchemaDerivation) field.get(schemaRegistry); + } + + private List<Tuple2<Selectors, TableId>> getSchemaRoutes(SchemaRegistry schemaRegistry) + throws Exception { + SchemaDerivation schemaDerivation = getSchemaDerivation(schemaRegistry); + Field field = SchemaDerivation.class.getDeclaredField("routes"); + field.setAccessible(true); + return (List<Tuple2<Selectors, TableId>>) field.get(schemaDerivation); + } + + @Override + public int getSerializerVersion() { + return -1; + } + + @Override + public byte[] serializeObject() throws Exception { + CompletableFuture<byte[]> future = new CompletableFuture<>(); + SchemaRegistry registry = generateSchemaRegistry(); + setSchemaManager(registry, generateDummySchemaManager()); + + registry.checkpointCoordinator(0, future); + + while (!future.isDone()) { + Thread.sleep(1000); + } + return future.get(); + } + + @Override + public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception { + SchemaRegistry expected = generateSchemaRegistry(); + setSchemaManager(expected, generateDummySchemaManager()); + SchemaRegistry actual = generateSchemaRegistry(); + actual.resetToCheckpoint(0, b); + return getSchemaManager(expected).equals(getSchemaManager(actual)) + && getSchemaRoutes(expected).equals(getSchemaRoutes(actual)); + } +} diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java new file mode 100644 index 000000000..6a14a2be2 --- /dev/null +++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.migration.tests; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo; + +/** Dummy classes for migration test. Called via reflection. */ +public class TableChangeInfoMigrationMock implements MigrationMockBase { + private static final TableId DUMMY_TABLE_ID = + TableId.tableId("dummyNamespace", "dummySchema", "dummyTable"); + private static final Schema DUMMY_SCHEMA = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.DOUBLE()) + .primaryKey("id", "name") + .build(); + + public TableChangeInfo generateDummyObject() { + return TableChangeInfo.of(DUMMY_TABLE_ID, DUMMY_SCHEMA, DUMMY_SCHEMA); + } + + @Override + public int getSerializerVersion() { + return TableChangeInfo.SERIALIZER.getVersion(); + } + + @Override + public byte[] serializeObject() throws Exception { + return TableChangeInfo.SERIALIZER.serialize(generateDummyObject()); + } + + @Override + public boolean deserializeAndCheckObject(int version, byte[] bytes) throws Exception { + TableChangeInfo expected = generateDummyObject(); + TableChangeInfo actual = TableChangeInfo.SERIALIZER.deserialize(version, bytes); + + return expected.getTableId().equals(actual.getTableId()) + && expected.getOriginalSchema().equals(actual.getOriginalSchema()) + && expected.getTransformedSchema().equals(actual.getTransformedSchema()); + } +} diff --git a/flink-cdc-migration-tests/pom.xml b/flink-cdc-migration-tests/pom.xml index da36ef502..269c5c0db 100644 --- a/flink-cdc-migration-tests/pom.xml +++ b/flink-cdc-migration-tests/pom.xml @@ -33,6 +33,7 @@ limitations under the License. <module>flink-cdc-release-3.0.0</module> <module>flink-cdc-release-3.0.1</module> <module>flink-cdc-release-3.1.0</module> + <module>flink-cdc-release-3.1.1</module> <module>flink-cdc-release-snapshot</module> <module>flink-cdc-migration-testcases</module> </modules>