yuxiqian commented on code in PR #4324: URL: https://github.com/apache/flink-cdc/pull/4324#discussion_r2958555473
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java: ########## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.RouteDef; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.definition.SourceDef; +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.connectors.values.ValuesDatabase; +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import 
java.io.PrintStream; +import java.sql.Connection; +import java.sql.Statement; +import java.util.Collections; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; + +/** A more complicated IT case for Evolving MySQL schema with gh-ost/pt-osc utility. */ +class MySqlOscITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; Review Comment: The latest docker image tag of percona-toolkit is 3.7.1. Can we test it against latest version? ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java: ########## @@ -530,8 +591,77 @@ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { } } + public static Optional<String> parseOnLineSchemaRenameEvent(SourceRecord record) { + if (!isSchemaChangeEvent(record)) { + return Optional.empty(); + } + Struct value = (Struct) record.value(); + ObjectMapper mapper = new ObjectMapper(); + + try { + String ddl = + mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) + .get(HistoryRecord.Fields.DDL_STATEMENTS) + .asText() + .toLowerCase(); + if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) { + LOG.info("Checking if DDL might be an OSC renaming event... 
{}", ddl); + List<String> tableNames = + Arrays.asList( + value.getStruct(Envelope.FieldName.SOURCE) + .getString(TABLE_NAME_KEY) + .split(",")); + if (tableNames.size() != 2) { + LOG.info( + "Table name {} is malformed, skip it.", + value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY)); + return Optional.empty(); + } + + String renamedFromTableName = + Collections.min(tableNames, Comparator.comparingInt(String::length)); + String renamedToTableName = + Collections.max(tableNames, Comparator.comparingInt(String::length)); + + LOG.info( + "Determined the shorter TableId {} is the renaming source.", + renamedFromTableName); + LOG.info( + "Determined the longer TableId {} is the renaming target.", + renamedToTableName); + + if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { + LOG.info( + "Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.", + renamedToTableName, + renamedFromTableName); + return Optional.of(renamedFromTableName); + } + + if (RDS_OGT_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { + LOG.info( + "Renamed to TableId name {} matches RDS temporary TableId pattern, yield {}.", + renamedToTableName, + renamedFromTableName); + return Optional.of(renamedFromTableName); + } + + LOG.info( + "Renamed to TableId {} does not match any RegEx pattern, skip it.", + renamedToTableName); + } + return Optional.empty(); + } catch (JsonProcessingException e) { + return Optional.empty(); Review Comment: Should we add some error logs here? 
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java: ########## @@ -530,8 +591,77 @@ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { } } + public static Optional<String> parseOnLineSchemaRenameEvent(SourceRecord record) { + if (!isSchemaChangeEvent(record)) { + return Optional.empty(); + } + Struct value = (Struct) record.value(); + ObjectMapper mapper = new ObjectMapper(); + + try { + String ddl = + mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) + .get(HistoryRecord.Fields.DDL_STATEMENTS) + .asText() + .toLowerCase(); + if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) { + LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl); + List<String> tableNames = + Arrays.asList( + value.getStruct(Envelope.FieldName.SOURCE) + .getString(TABLE_NAME_KEY) + .split(",")); + if (tableNames.size() != 2) { + LOG.info( + "Table name {} is malformed, skip it.", + value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY)); + return Optional.empty(); + } + + String renamedFromTableName = + Collections.min(tableNames, Comparator.comparingInt(String::length)); + String renamedToTableName = + Collections.max(tableNames, Comparator.comparingInt(String::length)); + + LOG.info( + "Determined the shorter TableId {} is the renaming source.", + renamedFromTableName); + LOG.info( + "Determined the longer TableId {} is the renaming target.", + renamedToTableName); + + if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { + LOG.info( + "Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.", + renamedToTableName, + renamedFromTableName); + return Optional.of(renamedFromTableName); + } + + if (RDS_OGT_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { + LOG.info( + "Renamed to TableId name {} matches RDS temporary TableId pattern, yield {}.", + 
renamedToTableName, + renamedFromTableName); + return Optional.of(renamedFromTableName); + } + + LOG.info( + "Renamed to TableId {} does not match any RegEx pattern, skip it.", + renamedToTableName); + } + return Optional.empty(); + } catch (JsonProcessingException e) { + return Optional.empty(); + } + } + private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$"); + private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$"); + private static final Pattern RDS_OGT_TEMP_TABLE_ID_PATTERN = + Pattern.compile("^tp_\\d*_del_(.*)$"); Review Comment: I suggest not supporting Alibaba Cloud RDS for now, as it cannot be tested reliably. ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java: ########## @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.RouteDef; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.definition.SourceDef; +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.connectors.values.ValuesDatabase; +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import 
java.io.PrintStream; +import java.sql.Connection; +import java.sql.Statement; +import java.util.Collections; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; + +/** A more complicated IT case for Evolving MySQL schema with gh-ost/pt-osc utility. */ +class MySqlOscITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + + protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER = + createPerconaToolkitContainer(); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final String GH_OST_RESOURCE_NAME = + DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") + ? "ghost-cli/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "ghost-cli/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; Review Comment: Shall we download the latest gh-ost CLI ad hoc, instead of committing files into the repository? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
