This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new c42e40d73 [FLINK-38139] Fix consecutive online schema change causes
job failure (#4324)
c42e40d73 is described below
commit c42e40d736c1a4e95974e26ebdbaeb5a2d26f8f1
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Mar 20 18:11:17 2026 +0800
[FLINK-38139] Fix consecutive online schema change causes job failure
(#4324)
---
.../source/MySqlOnLineSchemaMigrationITCase.java | 2 +-
.../connectors/mysql/source/MySqlOscITCase.java | 441 +++++++++++++++++++++
.../src/test/resources/ddl/customer.sql | 31 ++
.../mysql/debezium/reader/BinlogSplitReader.java | 38 +-
.../source/utils/OnlineSchemaChangeUtils.java | 205 ++++++++++
.../connectors/mysql/source/utils/RecordUtils.java | 76 ----
.../MySqlOnLineSchemaMigrationSourceITCase.java | 8 +-
.../MySqlOnLineSchemaMigrationTableITCase.java | 8 +-
8 files changed, 722 insertions(+), 87 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
index 39ce4c7c0..9b1cd77cc 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
@@ -82,7 +82,7 @@ class MySqlOnLineSchemaMigrationITCase extends
MySqlSourceTestBase {
private static final MySqlContainer MYSQL8_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0,
"docker/server-gtids/expire-seconds/my.cnf");
- private static final String PERCONA_TOOLKIT =
"perconalab/percona-toolkit:3.5.7";
+ private static final String PERCONA_TOOLKIT =
"perconalab/percona-toolkit:3.7.1";
protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java
new file mode 100644
index 000000000..2907a0405
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.RouteDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+
+/** A more complicated IT case for Evolving MySQL schema with gh-ost/pt-osc
utility. */
+class MySqlOscITCase extends MySqlSourceTestBase {
+ // MySQL 8.0 server under test; started once for the whole class in beforeClass().
+ private static final MySqlContainer MYSQL8_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0);
+
+ // Docker image of the container in which pt-online-schema-change is executed.
+ private static final String PERCONA_TOOLKIT =
"perconalab/percona-toolkit:3.7.1";
+
+ // Helper container built by createPerconaToolkitContainer(); shared by all tests.
+ protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
+ createPerconaToolkitContainer();
+
+ // Per-test database; created in before() and dropped in after().
+ private final UniqueDatabase customerDatabase =
+ new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER,
TEST_PASSWORD);
+
+ // Execution environment configured in before().
+ private final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ // gh-ost v1.1.6 tarball URL, chosen at class initialization from the architecture
+ // reported by the Docker daemon: the amd64 build for "amd64", the arm64 build otherwise.
+ private static final String GH_OST_DOWNLOAD_LINK =
+
DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
+ ?
"https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
+ :
"https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
+
+ // Original System.out, kept so it can be restored after each test.
+ private final PrintStream standardOut = System.out;
+ // Buffer that receives everything written to System.out while a test runs.
+ private final ByteArrayOutputStream outCaptor = new
ByteArrayOutputStream();
+
+    /** Redirects {@code System.out} into {@code outCaptor} so the test can inspect what was printed. */
+    @BeforeEach
+    void takeoverOutput() {
+        PrintStream capturingStream = new PrintStream(outCaptor);
+        System.setOut(capturingStream);
+    }
+
+ /** Restores the original {@code System.out} and then discards everything captured so far. */
+ @AfterEach
+ protected void handInStdOut() {
+ System.setOut(standardOut);
+ outCaptor.reset();
+ }
+
+ /** Starts the MySQL container first and the Percona Toolkit container after it, once per class. */
+ @BeforeAll
+ static void beforeClass() {
+ LOG.info("Starting MySql8 containers...");
+ // Each join() blocks until the corresponding container start has completed.
+ Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+ Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
+ LOG.info("Container MySql8 is started.");
+ }
+
+ /** Stops both containers once all tests of the class have run. */
+ @AfterAll
+ static void afterClass() {
+ LOG.info("Stopping MySql8 containers...");
+ MYSQL8_CONTAINER.stop();
+ PERCONA_TOOLKIT_CONTAINER.stop();
+ LOG.info("Container MySql8 is stopped.");
+ }
+
+ /**
+ * Prepares a fresh database and a clean slate for each test: initializes the customer
+ * database, clears data held by the values test helpers, and configures {@code env}.
+ */
+ @BeforeEach
+ void before() {
+ customerDatabase.createAndInitialize();
+ TestValuesTableFactory.clearAllData();
+ ValuesDatabase.clear();
+ env.setParallelism(4);
+ env.enableCheckpointing(200);
+ // No restarts: a job failure should surface instead of being retried.
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+
+ /** Drops the per-test database created in {@code before()}. */
+ @AfterEach
+ void after() {
+ customerDatabase.dropDatabase();
+ }
+
+    /**
+     * Downloads the gh-ost release tarball inside {@code container} and unpacks it into
+     * {@code /bin}.
+     *
+     * @param container the container in which the download and extraction commands are executed
+     * @throws RuntimeException if either command fails or the calling thread is interrupted
+     */
+    private static void installGhOstCli(Container<?> container) {
+        try {
+            execInContainer(
+                    container,
+                    "download gh-ost tarball",
+                    "curl",
+                    "-L",
+                    "-o",
+                    "/tmp/gh-ost.tar.gz",
+                    GH_OST_DOWNLOAD_LINK);
+            execInContainer(
+                    container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin");
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds (but does not start) the Percona Toolkit container, attached to the shared test
+     * network and logging through {@code LOG}.
+     */
+    private static GenericContainer<?> createPerconaToolkitContainer() {
+        // `tail -f /dev/null` never exits, which keeps the container alive for later exec calls.
+        return new GenericContainer<>(PERCONA_TOOLKIT)
+                .withCommand("tail", "-f", "/dev/null")
+                .withNetwork(NETWORK)
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+    }
+
+ private void insertRecordsPhase1(UniqueDatabase database, int startIndex,
int count)
+ throws Exception {
+ try (Connection connection = database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ for (int i = startIndex; i < startIndex + count; i++) {
+ statement.execute(
+ String.format(
+ "insert into customers (id, name, address,
phone_number) values (%s, '%s', '%s', '%s');",
+ i, "flink_" + i, "Address Line #" + i,
1000000000L + i));
+ }
+ }
+ }
+
+ private void insertRecordsPhase2(UniqueDatabase database, int startIndex,
int count)
+ throws Exception {
+ try (Connection connection = database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ for (int i = startIndex; i < startIndex + count; i++) {
+ statement.execute(
+ String.format(
+ "insert into customers (id, name, address,
phone_number, ext) values (%s, '%s', '%s', '%s', %s);",
+ i, "flink_" + i, "Address Line #" + i,
1000000000L + i, i));
+ }
+ }
+ }
+
+ private void insertRecordsPhase1ForAutoId(UniqueDatabase database, int
count) throws Exception {
+ try (Connection connection = database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ for (int i = 0; i < count; i++) {
+ statement.execute(
+ String.format(
+ "insert into customers_auto_id (name, address,
phone_number) values ('%s', '%s', '%s');",
+ "flink_" + i, "Address Line #" + i,
1000000000L + i));
+ }
+ }
+ }
+
+ private void insertRecordsPhase2ForAutoId(UniqueDatabase database, int
count) throws Exception {
+ try (Connection connection = database.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ for (int i = 0; i < count; i++) {
+ statement.execute(
+ String.format(
+ "insert into customers_auto_id (name, address,
phone_number, ext) values ('%s', '%s', '%s', %s);",
+ "flink_ext_" + i, "Address Line Ext #" + i,
1000000000L + i, i));
+ }
+ }
+ }
+
+    /**
+     * Evolves the {@code customers} table with gh-ost ({@code add column ext int first}) while
+     * rows are inserted before, during and after the migration, then waits until the captured
+     * standard output contains the expected number of lines.
+     */
+    @Test
+    void testGhOstSchemaMigration() throws Exception {
+        String databaseName = customerDatabase.getDatabaseName();
+
+        LOG.info("Step 1: Install gh-ost command line utility");
+        installGhOstCli(MYSQL8_CONTAINER);
+
+        LOG.info("Step 2: Start pipeline job");
+        Thread yamlJob = runJob(databaseName, "customers");
+        yamlJob.start();
+
+        insertRecordsPhase1(customerDatabase, 5000, 1000);
+
+        LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execInContainer(
+                                        MYSQL8_CONTAINER,
+                                        "evolve schema",
+                                        "gh-ost",
+                                        "--user=" + TEST_USER,
+                                        "--password=" + TEST_PASSWORD,
+                                        "--database=" + databaseName,
+                                        "--table=customers",
+                                        "--alter=add column ext int first",
+                                        "--allow-on-master", // because we don't have a replica
+                                        "--initially-drop-old-table", // drop previously generated
+                                        // temporary tables
+                                        "--execute");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        // Run the migration concurrently with the second batch of old-schema inserts.
+        thread.start();
+        insertRecordsPhase1(customerDatabase, 7000, 3000);
+
+        // Once gh-ost has returned, inserts can populate the new `ext` column.
+        thread.join();
+        insertRecordsPhase2(customerDatabase, 12000, 1000);
+
+        try {
+            TestCaseUtils.repeatedCheck(
+                    () -> outCaptor.toString().split(System.lineSeparator()).length == 5023);
+        } catch (Exception e) {
+            LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e);
+            // Rethrow: a failed verification must fail the test instead of passing silently.
+            throw e;
+        } finally {
+            yamlJob.interrupt();
+        }
+    }
+
+    /**
+     * Evolves the {@code customers} table with pt-online-schema-change ({@code add column ext int
+     * first}) while rows are inserted before, during and after the migration, then waits until
+     * the captured standard output contains the expected number of lines.
+     */
+    @Test
+    void testPtOscSchemaMigration() throws Exception {
+        String databaseName = customerDatabase.getDatabaseName();
+
+        // pt-online-schema-change runs from the Percona Toolkit container, so gh-ost is not
+        // installed here.
+        LOG.info("Step 1: Start pipeline job");
+        Thread yamlJob = runJob(databaseName, "customers");
+        yamlJob.start();
+
+        LOG.info("Step 2: Insert initial records (Phase 1)");
+        insertRecordsPhase1(customerDatabase, 5000, 1000);
+
+        LOG.info("Step 3: Evolve schema with pt-osc - ADD COLUMN");
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execInContainer(
+                                        PERCONA_TOOLKIT_CONTAINER,
+                                        "evolve schema",
+                                        "pt-online-schema-change",
+                                        "--user=" + TEST_USER,
+                                        "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                                        "--password=" + TEST_PASSWORD,
+                                        "P=3306,t=customers,D=" + databaseName,
+                                        "--alter",
+                                        "add column ext int first",
+                                        "--charset=utf8",
+                                        "--recursion-method=NONE", // Do not look for slave nodes
+                                        "--print",
+                                        "--execute");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        LOG.info("Insertion Phase 1 finishes");
+        // Run the migration concurrently with the second batch of old-schema inserts.
+        thread.start();
+        insertRecordsPhase1(customerDatabase, 7000, 3000);
+        LOG.info("Insertion Phase 2 finishes");
+
+        // Once pt-osc has returned, inserts can populate the new `ext` column.
+        thread.join();
+        insertRecordsPhase2(customerDatabase, 12000, 1000);
+        LOG.info("Insertion Phase 3 finishes");
+
+        try {
+            TestCaseUtils.repeatedCheck(
+                    () -> outCaptor.toString().split(System.lineSeparator()).length == 5023);
+        } catch (Exception e) {
+            LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e);
+            // Rethrow: a failed verification must fail the test instead of passing silently.
+            throw e;
+        } finally {
+            yamlJob.interrupt();
+        }
+    }
+
+    /**
+     * Evolves the auto-increment table {@code customers_auto_id} with pt-online-schema-change
+     * ({@code add column ext int first}) while rows are inserted before, during and after the
+     * migration, then waits until the captured standard output contains the expected number of
+     * lines.
+     */
+    @Test
+    void testPtOscSchemaMigrationWithAutoIncrementId() throws Exception {
+        String databaseName = customerDatabase.getDatabaseName();
+
+        LOG.info("Step 1: Start pipeline job for auto increment id table");
+
+        Thread yamlJob = runJob(databaseName, "customers_auto_id");
+        yamlJob.start();
+
+        LOG.info("Step 2: Insert initial records (Phase 1)");
+        insertRecordsPhase1ForAutoId(customerDatabase, 1000);
+
+        LOG.info("Step 3: Evolve schema with pt-osc - ADD COLUMN");
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execInContainer(
+                                        PERCONA_TOOLKIT_CONTAINER,
+                                        "evolve schema",
+                                        "pt-online-schema-change",
+                                        "--user=" + TEST_USER,
+                                        "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                                        "--password=" + TEST_PASSWORD,
+                                        "P=3306,t=customers_auto_id,D=" + databaseName,
+                                        "--alter",
+                                        "add column ext int first",
+                                        "--charset=utf8",
+                                        "--recursion-method=NONE", // Do not look for slave nodes
+                                        "--print",
+                                        "--execute");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        LOG.info("Insertion Phase 1 finishes");
+        // Run the migration concurrently with the second batch of old-schema inserts.
+        thread.start();
+        insertRecordsPhase1ForAutoId(customerDatabase, 3000);
+        LOG.info("Insertion Phase 2 finishes");
+
+        // Once pt-osc has returned, inserts can populate the new `ext` column.
+        thread.join();
+        insertRecordsPhase2ForAutoId(customerDatabase, 1000);
+        LOG.info("Insertion Phase 3 finishes");
+
+        // Initial 21 records + Phase1 1000 + Phase2 3000 + Phase3 1000 = 5021
+        // NOTE(review): the sibling tests expect 5023 lines for the same 5000 inserts; confirm
+        // whether schema-change events should be counted here as well.
+        try {
+            TestCaseUtils.repeatedCheck(
+                    () -> outCaptor.toString().split(System.lineSeparator()).length == 5021);
+        } catch (Exception e) {
+            LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e);
+            // Rethrow: a failed verification must fail the test instead of passing silently.
+            throw e;
+        } finally {
+            yamlJob.interrupt();
+        }
+    }
+
+    /**
+     * Runs {@code commands} inside {@code container} and logs the outcome.
+     *
+     * @param container the container to execute the command in
+     * @param prompt short description of the action, used only in log messages
+     * @param commands the command followed by its arguments
+     * @throws IOException if the command exits with a non-zero exit code
+     * @throws InterruptedException declared by the underlying container exec call
+     */
+    private static void execInContainer(Container<?> container, String prompt, String... commands)
+            throws IOException, InterruptedException {
+        String commandLine = String.join(" ", commands);
+        LOG.info("Starting to {} with the following command: `{}`", prompt, commandLine);
+
+        Container.ExecResult result = container.execInContainer(commands);
+        int exitCode = result.getExitCode();
+        if (exitCode != 0) {
+            LOG.error(
+                    "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
+                    prompt,
+                    exitCode,
+                    result.getStdout(),
+                    result.getStderr());
+            throw new IOException("Failed to execute commands: " + commandLine);
+        }
+        LOG.info("Successfully {}. Stdout: {}", prompt, result.getStdout());
+    }
+
+    /**
+     * Composes a pipeline that reads {@code databaseName.tableName} from the MySQL container with
+     * online-schema-change parsing enabled, routes it to {@code sink_db.sink_tbl} on a values
+     * sink, and evolves the sink schema on schema changes.
+     *
+     * @return a thread, not yet started, that executes the composed pipeline and rethrows any
+     *     failure as a {@link RuntimeException}
+     */
+    private Thread runJob(String databaseName, String tableName) {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+        String sourceTablePath = databaseName + "." + tableName;
+
+        // MySQL source
+        Configuration mysqlConfig = new Configuration();
+        mysqlConfig.set(MySqlDataSourceOptions.HOSTNAME, MYSQL8_CONTAINER.getHost());
+        mysqlConfig.set(MySqlDataSourceOptions.PORT, MYSQL8_CONTAINER.getDatabasePort());
+        mysqlConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER);
+        mysqlConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD);
+        mysqlConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
+        mysqlConfig.set(MySqlDataSourceOptions.TABLES, sourceTablePath);
+        mysqlConfig.set(MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES, true);
+        SourceDef source =
+                new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL Source", mysqlConfig);
+
+        // Values sink
+        Configuration valuesConfig = new Configuration();
+        valuesConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sink = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", valuesConfig);
+
+        // Pipeline-level options
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+
+        RouteDef route = new RouteDef(sourceTablePath, "sink_db.sink_tbl", null, null);
+        PipelineDef pipeline =
+                new PipelineDef(
+                        source,
+                        sink,
+                        Collections.singletonList(route),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        PipelineExecution execution = composer.compose(pipeline);
+        Runnable runPipeline =
+                () -> {
+                    try {
+                        execution.execute();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+        return new Thread(runPipeline);
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
index e4df63f1a..b840f4358 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
@@ -326,3 +326,34 @@ CREATE TABLE default_value_test (
INSERT INTO default_value_test
VALUES (1,'user1','Shanghai',123567),
(2,'user2','Shanghai',123567);
+
+-- table has auto increment primary key for pt-osc testing
+-- (the OSC integration test later prepends an `ext` INT column to it via pt-online-schema-change)
+CREATE TABLE customers_auto_id (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ address VARCHAR(1024),
+ phone_number VARCHAR(512)
+);
+
+INSERT INTO customers_auto_id
+VALUES (default, 'user_1', 'Shanghai', '123567891234'),
+ (default, 'user_2', 'Shanghai', '123567891234'),
+ (default, 'user_3', 'Shanghai', '123567891234'),
+ (default, 'user_4', 'Shanghai', '123567891234'),
+ (default, 'user_5', 'Shanghai', '123567891234'),
+ (default, 'user_6', 'Shanghai', '123567891234'),
+ (default, 'user_7', 'Shanghai', '123567891234'),
+ (default, 'user_8', 'Shanghai', '123567891234'),
+ (default, 'user_9', 'Shanghai', '123567891234'),
+ (default, 'user_10', 'Shanghai', '123567891234'),
+ (default, 'user_11', 'Shanghai', '123567891234'),
+ (default, 'user_12', 'Shanghai', '123567891234'),
+ (default, 'user_13', 'Shanghai', '123567891234'),
+ (default, 'user_14', 'Shanghai', '123567891234'),
+ (default, 'user_15', 'Shanghai', '123567891234'),
+ (default, 'user_16', 'Shanghai', '123567891234'),
+ (default, 'user_17', 'Shanghai', '123567891234'),
+ (default, 'user_18', 'Shanghai', '123567891234'),
+ (default, 'user_19', 'Shanghai', '123567891234'),
+ (default, 'user_20', 'Shanghai', '123567891234'),
+ (default, 'user_21', 'Shanghai', '123567891234');
\ No newline at end of file
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 77682534f..b1e6d1dfc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -27,6 +27,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
+import
org.apache.flink.cdc.connectors.mysql.source.utils.OnlineSchemaChangeUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
@@ -92,6 +93,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
new StoppableChangeEventSourceContext();
private final boolean isParsingOnLineSchemaChanges;
private final boolean isBackfillSkipped;
+ private final Map<String, List<SourceRecord>> pendingSchemaChangeEvents;
private static final long READER_CLOSE_TIMEOUT = 30L;
@@ -114,6 +116,7 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
this.isParsingOnLineSchemaChanges =
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
this.isBackfillSkipped =
statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
+ this.pendingSchemaChangeEvents = new HashMap<>();
}
public void submitSplit(MySqlSplit mySqlSplit) {
@@ -181,8 +184,35 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
Optional<SourceRecord> oscRecord =
parseOnLineSchemaChangeEvent(event.getRecord());
if (oscRecord.isPresent()) {
- sourceRecords.add(oscRecord.get());
- continue;
+ TableId tableId =
RecordUtils.getTableId(oscRecord.get());
+ if (tableId != null) {
+ LOG.info(
+ "Received the start event of online schema
change: {}. Save it for later.",
+ oscRecord.get());
+ pendingSchemaChangeEvents
+ .computeIfAbsent(tableId.toString(), k ->
new ArrayList<>())
+ .add(oscRecord.get());
+ continue;
+ }
+ }
+
+ Optional<String> finishedTables =
+
OnlineSchemaChangeUtils.parseOnLineSchemaRenameEvent(event.getRecord());
+ if (finishedTables.isPresent()) {
+ TableId tableId =
RecordUtils.getTableId(event.getRecord());
+ String finishedTableId = tableId.catalog() + "." +
finishedTables.get();
+ LOG.info(
+ "Received the ending event of table {}. Emit
corresponding DDL event now.",
+ finishedTableId);
+
+ if
(pendingSchemaChangeEvents.containsKey(finishedTableId)) {
+
sourceRecords.addAll(pendingSchemaChangeEvents.remove(finishedTableId));
+ } else {
+ LOG.error(
+ "Error: met an unexpected osc finish
event. Current pending events: {}, Record: {}",
+ pendingSchemaChangeEvents,
+ event);
+ }
}
}
if (shouldEmit(event.getRecord())) {
@@ -228,11 +258,11 @@ public class BinlogSplitReader implements
DebeziumReader<SourceRecords, MySqlSpl
}
private Optional<SourceRecord> parseOnLineSchemaChangeEvent(SourceRecord
sourceRecord) {
- if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
+ if (OnlineSchemaChangeUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
// This is a gh-ost initialized schema change event and should be
emitted if the
// peeled tableId matches the predicate.
TableId originalTableId = RecordUtils.getTableId(sourceRecord);
- TableId peeledTableId = RecordUtils.peelTableId(originalTableId);
+ TableId peeledTableId =
OnlineSchemaChangeUtils.peelTableId(originalTableId);
if (capturedTableFilter.test(peeledTableId)) {
return Optional.of(
RecordUtils.setTableId(sourceRecord, originalTableId,
peeledTableId));
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java
new file mode 100644
index 000000000..aef47a4d3
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.HistoryRecord;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+import static
org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD;
+
+/** Utility class for handling gh-ost/pt-osc online schema change events. */
+public class OnlineSchemaChangeUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OnlineSchemaChangeUtils.class);
+
+ // Static helpers only; the private constructor prevents instantiation.
+ private OnlineSchemaChangeUtils() {}
+
+ /**
+ * Pattern matching gh-ost shadow table ({@code _<name>_gho}) and pt-osc
new table ({@code
+ * _<name>_new}), which carry the actual ALTER DDL during an online schema
change.
+ *
+ * <p>Capture group 1 is the {@code <name>} part between the leading underscore and the suffix.
+ */
+ private static final Pattern OSC_TABLE_ID_PATTERN =
Pattern.compile("^_(.*)_(gho|new)$");
+
+ /**
+ * Pattern matching gh-ost delete table ({@code _<name>_del}) and pt-osc
old table ({@code
+ * _<name>_old}), which are the temporary backup tables created during an
online schema change.
+ *
+ * <p>Capture group 1 is the {@code <name>} part between the leading underscore and the suffix.
+ */
+ private static final Pattern OSC_TEMP_TABLE_ID_PATTERN =
Pattern.compile("^_(.*)_(del|old)$");
+
+ /**
+ * Checks whether the given source record is a gh-ost/pt-osc initiated
schema change event by
+ * inspecting the ALTER DDL statement targeting a shadow/new table.
+ *
+ * <p>There will be these schema change events generated in total during
one transaction.
+ *
+ * <p>gh-ost:
+ *
+ * <pre>
+ * DROP TABLE IF EXISTS `db`.`_tb1_gho`
+ * DROP TABLE IF EXISTS `db`.`_tb1_del`
+ * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+ * create /* gh-ost */ table `db`.`_tb1_ghc` ...
+ * create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1`
+ * alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255)
+ * alter /* gh-ost */ table `db`.`_tb1_gho` AUTO_INCREMENT=N (only
present when the table has an AUTO_INCREMENT column)
+ * create /* gh-ost */ table `db`.`_tb1_del` ...
+ * DROP TABLE IF EXISTS `db`.`_tb1_del`
+ * rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del`
+ * rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1`
+ * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+ * DROP TABLE IF EXISTS `db`.`_tb1_del`
+ * </pre>
+ *
+ * <p>pt-osc:
+ *
+ * <pre>
+ * CREATE TABLE `db`.`_test_tb1_new`
+ * ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
+ * CREATE TRIGGER `pt_osc_db_test_tb1_del`...
+ * CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
+ * CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
+ * ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */
+ * RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`,
`db`.`_test_tb1_new` TO `db`.`test_tb1`
+ * DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */
+ * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
+ * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
+ * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
+ * </pre>
+ *
+ * <p>Among all these, only the ALTER statement targeting the {@code
_gho}/{@code _new} table is
+ * stored temporarily, and emitted when the subsequent RENAME TABLE event
arrives.
+ */
+ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
+ if (!RecordUtils.isSchemaChangeEvent(record)) {
+ return false;
+ }
+ Struct value = (Struct) record.value();
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ String ddl =
+ mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
+ .get(HistoryRecord.Fields.DDL_STATEMENTS)
+ .asText()
+ .toLowerCase();
+ if (ddl.startsWith("alter")) {
+ String tableName =
+
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
+ return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
+ }
+ return false;
+ } catch (JsonProcessingException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Parses a gh-ost/pt-osc RENAME TABLE event and returns the original
(user-facing) table name
+ * if the event represents the completion of an online schema change.
+ *
+ * @return the original table name if the record is an OSC completion
rename, or {@link
+ * Optional#empty()} otherwise.
+ */
+ public static Optional<String> parseOnLineSchemaRenameEvent(SourceRecord
record) {
+ if (!RecordUtils.isSchemaChangeEvent(record)) {
+ return Optional.empty();
+ }
+ Struct value = (Struct) record.value();
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ String ddl =
+ mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
+ .get(HistoryRecord.Fields.DDL_STATEMENTS)
+ .asText()
+ .toLowerCase();
+ if (ddl.startsWith("rename table") || ddl.startsWith("rename /*
gh-ost */ table")) {
+ LOG.info("Checking if DDL might be an OSC renaming event...
{}", ddl);
+ List<String> tableNames =
+ Arrays.asList(
+ value.getStruct(Envelope.FieldName.SOURCE)
+ .getString(TABLE_NAME_KEY)
+ .split(","));
+ if (tableNames.size() != 2) {
+ LOG.info(
+ "Table name {} is malformed, skip it.",
+
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY));
+ return Optional.empty();
+ }
+
+ String renamedFromTableName =
+ Collections.min(tableNames,
Comparator.comparingInt(String::length));
+ String renamedToTableName =
+ Collections.max(tableNames,
Comparator.comparingInt(String::length));
+
+ LOG.info(
+ "Determined the shorter TableId {} is the renaming
source.",
+ renamedFromTableName);
+ LOG.info(
+ "Determined the longer TableId {} is the renaming
target.",
+ renamedToTableName);
+
+ if
(OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) {
+ LOG.info(
+ "Renamed to TableId name {} matches OSC temporary
TableId pattern, yield {}.",
+ renamedToTableName,
+ renamedFromTableName);
+ return Optional.of(renamedFromTableName);
+ }
+
+ LOG.info(
+ "Renamed to TableId {} does not match any RegEx
pattern, skip it.",
+ renamedToTableName);
+ }
+ return Optional.empty();
+ } catch (JsonProcessingException e) {
+ LOG.warn("Failed to parse schema change event {}", value, e);
+ return Optional.empty();
+ }
+ }
+
+    /**
+     * Strips the gh-ost/pt-osc decoration from a {@link TableId}, yielding the original
+     * user-facing one. Ids whose table name does not match the shadow-table pattern are returned
+     * unchanged.
+     *
+     * <p>For example, {@code _customers_gho} becomes {@code customers} and {@code _orders_new}
+     * becomes {@code orders}.
+     */
+    public static TableId peelTableId(TableId tableId) {
+        Matcher oscMatcher = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
+        if (!oscMatcher.matches()) {
+            return tableId;
+        }
+        String originalTableName = oscMatcher.group(1);
+        return new TableId(tableId.catalog(), tableId.schema(), originalTableName);
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index bf4d4f29f..1f9a041d7 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -25,9 +25,6 @@ import
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitI
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.table.types.logical.RowType;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
import io.debezium.data.Envelope;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
@@ -51,8 +48,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
@@ -469,75 +464,4 @@ public class RecordUtils {
}
return Optional.empty();
}
-
- /**
- * This utility method checks if given source record is a gh-ost/pt-osc
initiated schema change
- * event by checking the "alter" ddl.
- */
- public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
- if (!isSchemaChangeEvent(record)) {
- return false;
- }
- Struct value = (Struct) record.value();
- ObjectMapper mapper = new ObjectMapper();
- try {
- // There will be these schema change events generated in total
during one transaction.
- //
- // gh-ost:
- // DROP TABLE IF EXISTS `db`.`_tb1_gho`
- // DROP TABLE IF EXISTS `db`.`_tb1_del`
- // DROP TABLE IF EXISTS `db`.`_tb1_ghc`
- // create /* gh-ost */ table `db`.`_tb1_ghc` ...
- // create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1`
- // alter /* gh-ost */ table `db`.`_tb1_gho` add column c
varchar(255)
- // create /* gh-ost */ table `db`.`_tb1_del` ...
- // DROP TABLE IF EXISTS `db`.`_tb1_del`
- // rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del`
- // rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1`
- // DROP TABLE IF EXISTS `db`.`_tb1_ghc`
- // DROP TABLE IF EXISTS `db`.`_tb1_del`
- //
- // pt-osc:
- // CREATE TABLE `db`.`_test_tb1_new`
- // ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
- // CREATE TRIGGER `pt_osc_db_test_tb1_del`...
- // CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
- // CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
- // ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */
- // RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`,
`db`.`_test_tb1_new` TO
- // `db`.`test_tb1`
- // DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */
- // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
- // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
- // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
- //
- // Among all these, we only need the "ALTER" one that happens on
the `_gho`/`_new`
- // table.
- String ddl =
- mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
- .get(HistoryRecord.Fields.DDL_STATEMENTS)
- .asText()
- .toLowerCase();
- if (ddl.startsWith("alter")) {
- String tableName =
-
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
- return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
- }
-
- return false;
- } catch (JsonProcessingException e) {
- return false;
- }
- }
-
- private static final Pattern OSC_TABLE_ID_PATTERN =
Pattern.compile("^_(.*)_(gho|new)$");
-
- /** This utility method peels out gh-ost/pt-osc mangled tableId to the
original one. */
- public static TableId peelTableId(TableId tableId) {
- Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
- if (matchingResult.matches()) {
- return new TableId(tableId.catalog(), tableId.schema(),
matchingResult.group(1));
- }
- return tableId;
- }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
index ec9da16d1..145bf4c0e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
@@ -77,7 +77,7 @@ class MySqlOnLineSchemaMigrationSourceITCase extends
MySqlSourceTestBase {
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";
- private static final String PERCONA_TOOLKIT =
"perconalab/percona-toolkit:3.5.7";
+ private static final String PERCONA_TOOLKIT =
"perconalab/percona-toolkit:3.7.1";
protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
@@ -174,7 +174,8 @@ class MySqlOnLineSchemaMigrationSourceITCase extends
MySqlSourceTestBase {
.serverId(getServerId())
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
- .includeSchemaChanges(true) // output the schema
changes as well
+ .includeSchemaChanges(true)
+ .parseOnLineSchemaChanges(true)
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -334,7 +335,8 @@ class MySqlOnLineSchemaMigrationSourceITCase extends
MySqlSourceTestBase {
.serverId(getServerId())
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
- .includeSchemaChanges(true) // output the schema
changes as well
+ .includeSchemaChanges(true)
+ .parseOnLineSchemaChanges(true)
.build();
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
index 8bc09a480..752b54f24 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
@@ -78,7 +78,7 @@ class MySqlOnLineSchemaMigrationTableITCase extends
MySqlSourceTestBase {
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";
- private static final String PERCONA_TOOLKIT =
"perconalab/percona-toolkit:3.5.7";
+ private static final String PERCONA_TOOLKIT =
"perconalab/percona-toolkit:3.7.1";
protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
@@ -177,7 +177,8 @@ class MySqlOnLineSchemaMigrationTableITCase extends
MySqlSourceTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ " 'server-time-zone' = 'UTC',"
- + " 'server-id' = '%s'"
+ + " 'server-id' = '%s',"
+ + " 'scan.parse.online.schema.changes.enabled'
= 'true'"
+ ")",
MYSQL8_CONTAINER.getHost(),
MYSQL8_CONTAINER.getDatabasePort(),
@@ -346,7 +347,8 @@ class MySqlOnLineSchemaMigrationTableITCase extends
MySqlSourceTestBase {
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' =
'%s',"
+ " 'server-time-zone' = 'UTC',"
- + " 'server-id' = '%s'"
+ + " 'server-id' = '%s',"
+ + " 'scan.parse.online.schema.changes.enabled'
= 'true'"
+ ")",
MYSQL8_CONTAINER.getHost(),
MYSQL8_CONTAINER.getDatabasePort(),