This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new c42e40d73 [FLINK-38139] Fix consecutive online schema change causes 
job failure (#4324)
c42e40d73 is described below

commit c42e40d736c1a4e95974e26ebdbaeb5a2d26f8f1
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Mar 20 18:11:17 2026 +0800

    [FLINK-38139] Fix consecutive online schema change causes job failure 
(#4324)
---
 .../source/MySqlOnLineSchemaMigrationITCase.java   |   2 +-
 .../connectors/mysql/source/MySqlOscITCase.java    | 441 +++++++++++++++++++++
 .../src/test/resources/ddl/customer.sql            |  31 ++
 .../mysql/debezium/reader/BinlogSplitReader.java   |  38 +-
 .../source/utils/OnlineSchemaChangeUtils.java      | 205 ++++++++++
 .../connectors/mysql/source/utils/RecordUtils.java |  76 ----
 .../MySqlOnLineSchemaMigrationSourceITCase.java    |   8 +-
 .../MySqlOnLineSchemaMigrationTableITCase.java     |   8 +-
 8 files changed, 722 insertions(+), 87 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
index 39ce4c7c0..9b1cd77cc 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java
@@ -82,7 +82,7 @@ class MySqlOnLineSchemaMigrationITCase extends 
MySqlSourceTestBase {
     private static final MySqlContainer MYSQL8_CONTAINER =
             createMySqlContainer(MySqlVersion.V8_0, 
"docker/server-gtids/expire-seconds/my.cnf");
 
-    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.5.7";
+    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.7.1";
 
     protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
             createPerconaToolkitContainer();
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java
new file mode 100644
index 000000000..2907a0405
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.RouteDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+
+/** A more complicated IT case for Evolving MySQL schema with gh-ost/pt-osc 
utility. */
+class MySqlOscITCase extends MySqlSourceTestBase {
+    private static final MySqlContainer MYSQL8_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
+
+    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.7.1";
+
+    protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
+            createPerconaToolkitContainer();
+
+    private final UniqueDatabase customerDatabase =
+            new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, 
TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    private static final String GH_OST_DOWNLOAD_LINK =
+            
DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
+                    ? 
"https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
+                    : 
"https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
+
+    private final PrintStream standardOut = System.out;
+    private final ByteArrayOutputStream outCaptor = new 
ByteArrayOutputStream();
+
+    @BeforeEach
+    void takeoverOutput() {
+        System.setOut(new PrintStream(outCaptor));
+    }
+
+    @AfterEach
+    protected void handInStdOut() {
+        System.setOut(standardOut);
+        outCaptor.reset();
+    }
+
+    @BeforeAll
+    static void beforeClass() {
+        LOG.info("Starting MySql8 containers...");
+        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+        Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
+        LOG.info("Container MySql8 is started.");
+    }
+
+    @AfterAll
+    static void afterClass() {
+        LOG.info("Stopping MySql8 containers...");
+        MYSQL8_CONTAINER.stop();
+        PERCONA_TOOLKIT_CONTAINER.stop();
+        LOG.info("Container MySql8 is stopped.");
+    }
+
+    @BeforeEach
+    void before() {
+        customerDatabase.createAndInitialize();
+        TestValuesTableFactory.clearAllData();
+        ValuesDatabase.clear();
+        env.setParallelism(4);
+        env.enableCheckpointing(200);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @AfterEach
+    void after() {
+        customerDatabase.dropDatabase();
+    }
+
+    private static void installGhOstCli(Container<?> container) {
+        try {
+            execInContainer(
+                    container,
+                    "download gh-ost tarball",
+                    "curl",
+                    "-L",
+                    "-o",
+                    "/tmp/gh-ost.tar.gz",
+                    GH_OST_DOWNLOAD_LINK);
+            execInContainer(
+                    container, "unzip binary", "tar", "-xzvf", 
"/tmp/gh-ost.tar.gz", "-C", "/bin");
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static GenericContainer<?> createPerconaToolkitContainer() {
+        GenericContainer<?> perconaToolkit =
+                new GenericContainer<>(PERCONA_TOOLKIT)
+                        // keep container alive
+                        .withCommand("tail", "-f", "/dev/null")
+                        .withNetwork(NETWORK)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return perconaToolkit;
+    }
+
+    private void insertRecordsPhase1(UniqueDatabase database, int startIndex, 
int count)
+            throws Exception {
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (int i = startIndex; i < startIndex + count; i++) {
+                statement.execute(
+                        String.format(
+                                "insert into customers (id, name, address, 
phone_number) values (%s, '%s', '%s', '%s');",
+                                i, "flink_" + i, "Address Line #" + i, 
1000000000L + i));
+            }
+        }
+    }
+
+    private void insertRecordsPhase2(UniqueDatabase database, int startIndex, 
int count)
+            throws Exception {
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (int i = startIndex; i < startIndex + count; i++) {
+                statement.execute(
+                        String.format(
+                                "insert into customers (id, name, address, 
phone_number, ext) values (%s, '%s', '%s', '%s', %s);",
+                                i, "flink_" + i, "Address Line #" + i, 
1000000000L + i, i));
+            }
+        }
+    }
+
+    private void insertRecordsPhase1ForAutoId(UniqueDatabase database, int 
count) throws Exception {
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (int i = 0; i < count; i++) {
+                statement.execute(
+                        String.format(
+                                "insert into customers_auto_id (name, address, 
phone_number) values ('%s', '%s', '%s');",
+                                "flink_" + i, "Address Line #" + i, 
1000000000L + i));
+            }
+        }
+    }
+
+    private void insertRecordsPhase2ForAutoId(UniqueDatabase database, int 
count) throws Exception {
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (int i = 0; i < count; i++) {
+                statement.execute(
+                        String.format(
+                                "insert into customers_auto_id (name, address, 
phone_number, ext) values ('%s', '%s', '%s', %s);",
+                                "flink_ext_" + i, "Address Line Ext #" + i, 
1000000000L + i, i));
+            }
+        }
+    }
+
+    @Test
+    void testGhOstSchemaMigration() throws Exception {
+        String databaseName = customerDatabase.getDatabaseName();
+
+        LOG.info("Step 1: Install gh-ost command line utility");
+        installGhOstCli(MYSQL8_CONTAINER);
+
+        Thread yamlJob = runJob(databaseName, "customers");
+        yamlJob.start();
+
+        LOG.info("Step 2: Start pipeline job");
+        insertRecordsPhase1(customerDatabase, 5000, 1000);
+
+        LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execInContainer(
+                                        MYSQL8_CONTAINER,
+                                        "evolve schema",
+                                        "gh-ost",
+                                        "--user=" + TEST_USER,
+                                        "--password=" + TEST_PASSWORD,
+                                        "--database=" + databaseName,
+                                        "--table=customers",
+                                        "--alter=add column ext int first",
+                                        "--allow-on-master", // because we 
don't have a replica
+                                        "--initially-drop-old-table", // drop 
previously generated
+                                        // temporary tables
+                                        "--execute");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        thread.start();
+        insertRecordsPhase1(customerDatabase, 7000, 3000);
+
+        thread.join();
+        insertRecordsPhase2(customerDatabase, 12000, 1000);
+
+        try {
+            TestCaseUtils.repeatedCheck(
+                    () -> 
outCaptor.toString().split(System.lineSeparator()).length == 5023);
+        } catch (Exception e) {
+            LOG.error("Failed to verify results. Captured stdout: {}", 
outCaptor.toString(), e);
+        } finally {
+            yamlJob.interrupt();
+        }
+    }
+
+    @Test
+    void testPtOscSchemaMigration() throws Exception {
+        String databaseName = customerDatabase.getDatabaseName();
+
+        LOG.info("Step 1: Install gh-ost command line utility");
+        installGhOstCli(MYSQL8_CONTAINER);
+
+        Thread yamlJob = runJob(databaseName, "customers");
+        yamlJob.start();
+
+        LOG.info("Step 2: Start pipeline job");
+        insertRecordsPhase1(customerDatabase, 5000, 1000);
+
+        LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execInContainer(
+                                        PERCONA_TOOLKIT_CONTAINER,
+                                        "evolve schema",
+                                        "pt-online-schema-change",
+                                        "--user=" + TEST_USER,
+                                        "--host=" + 
INTER_CONTAINER_MYSQL_ALIAS,
+                                        "--password=" + TEST_PASSWORD,
+                                        "P=3306,t=customers,D=" + databaseName,
+                                        "--alter",
+                                        "add column ext int first",
+                                        "--charset=utf8",
+                                        "--recursion-method=NONE", // Do not 
look for slave nodes
+                                        "--print",
+                                        "--execute");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        LOG.info("Insertion Phase 1 finishes");
+        thread.start();
+        insertRecordsPhase1(customerDatabase, 7000, 3000);
+        LOG.info("Insertion Phase 2 finishes");
+
+        thread.join();
+        insertRecordsPhase2(customerDatabase, 12000, 1000);
+        LOG.info("Insertion Phase 3 finishes");
+
+        try {
+            TestCaseUtils.repeatedCheck(
+                    () -> 
outCaptor.toString().split(System.lineSeparator()).length == 5023);
+        } catch (Exception e) {
+            LOG.error("Failed to verify results. Captured stdout: {}", 
outCaptor.toString(), e);
+        } finally {
+            yamlJob.interrupt();
+        }
+    }
+
+    @Test
+    void testPtOscSchemaMigrationWithAutoIncrementId() throws Exception {
+        String databaseName = customerDatabase.getDatabaseName();
+
+        LOG.info("Step 1: Start pipeline job for auto increment id table");
+
+        Thread yamlJob = runJob(databaseName, "customers_auto_id");
+        yamlJob.start();
+
+        LOG.info("Step 2: Insert initial records (Phase 1)");
+        insertRecordsPhase1ForAutoId(customerDatabase, 1000);
+
+        LOG.info("Step 3: Evolve schema with pt-osc - ADD COLUMN");
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execInContainer(
+                                        PERCONA_TOOLKIT_CONTAINER,
+                                        "evolve schema",
+                                        "pt-online-schema-change",
+                                        "--user=" + TEST_USER,
+                                        "--host=" + 
INTER_CONTAINER_MYSQL_ALIAS,
+                                        "--password=" + TEST_PASSWORD,
+                                        "P=3306,t=customers_auto_id,D=" + 
databaseName,
+                                        "--alter",
+                                        "add column ext int first",
+                                        "--charset=utf8",
+                                        "--recursion-method=NONE", // Do not 
look for slave nodes
+                                        "--print",
+                                        "--execute");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        LOG.info("Insertion Phase 1 finishes");
+        thread.start();
+        insertRecordsPhase1ForAutoId(customerDatabase, 3000);
+        LOG.info("Insertion Phase 2 finishes");
+
+        thread.join();
+        insertRecordsPhase2ForAutoId(customerDatabase, 1000);
+        LOG.info("Insertion Phase 3 finishes");
+
+        // Initial 21 records + Phase1 1000 + Phase2 3000 + Phase3 1000 = 5021
+        try {
+            TestCaseUtils.repeatedCheck(
+                    () -> 
outCaptor.toString().split(System.lineSeparator()).length == 5021);
+        } catch (Exception e) {
+            LOG.error("Failed to verify results. Captured stdout: {}", 
outCaptor.toString(), e);
+        } finally {
+            yamlJob.interrupt();
+        }
+    }
+
+    private static void execInContainer(Container<?> container, String prompt, 
String... commands)
+            throws IOException, InterruptedException {
+        {
+            LOG.info(
+                    "Starting to {} with the following command: `{}`",
+                    prompt,
+                    String.join(" ", commands));
+            Container.ExecResult execResult = 
container.execInContainer(commands);
+            if (execResult.getExitCode() == 0) {
+                LOG.info("Successfully {}. Stdout: {}", prompt, 
execResult.getStdout());
+            } else {
+                LOG.error(
+                        "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
+                        prompt,
+                        execResult.getExitCode(),
+                        execResult.getStdout(),
+                        execResult.getStderr());
+                throw new IOException("Failed to execute commands: " + 
String.join(" ", commands));
+            }
+        }
+    }
+
+    private Thread runJob(String databaseName, String tableName) {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup MySQL source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, 
MYSQL8_CONTAINER.getHost());
+        sourceConfig.set(MySqlDataSourceOptions.PORT, 
MYSQL8_CONTAINER.getDatabasePort());
+        sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER);
+        sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD);
+        sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
+        sourceConfig.set(MySqlDataSourceOptions.TABLES, databaseName + "." + 
tableName);
+        sourceConfig.set(MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES, 
true);
+
+        SourceDef sourceDef =
+                new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL 
Source", sourceConfig);
+
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.singletonList(
+                                new RouteDef(
+                                        databaseName + "." + tableName,
+                                        "sink_db.sink_tbl",
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        PipelineExecution execution = composer.compose(pipelineDef);
+        return new Thread(
+                () -> {
+                    try {
+                        execution.execute();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
index e4df63f1a..b840f4358 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql
@@ -326,3 +326,34 @@ CREATE TABLE default_value_test (
 INSERT INTO default_value_test
 VALUES (1,'user1','Shanghai',123567),
        (2,'user2','Shanghai',123567);
+
+-- table has auto increment primary key for pt-osc testing
+CREATE TABLE customers_auto_id (
+  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  address VARCHAR(1024),
+  phone_number VARCHAR(512)
+);
+
+INSERT INTO customers_auto_id
+VALUES (default, 'user_1', 'Shanghai', '123567891234'),
+       (default, 'user_2', 'Shanghai', '123567891234'),
+       (default, 'user_3', 'Shanghai', '123567891234'),
+       (default, 'user_4', 'Shanghai', '123567891234'),
+       (default, 'user_5', 'Shanghai', '123567891234'),
+       (default, 'user_6', 'Shanghai', '123567891234'),
+       (default, 'user_7', 'Shanghai', '123567891234'),
+       (default, 'user_8', 'Shanghai', '123567891234'),
+       (default, 'user_9', 'Shanghai', '123567891234'),
+       (default, 'user_10', 'Shanghai', '123567891234'),
+       (default, 'user_11', 'Shanghai', '123567891234'),
+       (default, 'user_12', 'Shanghai', '123567891234'),
+       (default, 'user_13', 'Shanghai', '123567891234'),
+       (default, 'user_14', 'Shanghai', '123567891234'),
+       (default, 'user_15', 'Shanghai', '123567891234'),
+       (default, 'user_16', 'Shanghai', '123567891234'),
+       (default, 'user_17', 'Shanghai', '123567891234'),
+       (default, 'user_18', 'Shanghai', '123567891234'),
+       (default, 'user_19', 'Shanghai', '123567891234'),
+       (default, 'user_20', 'Shanghai', '123567891234'),
+       (default, 'user_21', 'Shanghai', '123567891234');
\ No newline at end of file
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 77682534f..b1e6d1dfc 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
 import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
+import 
org.apache.flink.cdc.connectors.mysql.source.utils.OnlineSchemaChangeUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
 import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
@@ -92,6 +93,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
             new StoppableChangeEventSourceContext();
     private final boolean isParsingOnLineSchemaChanges;
     private final boolean isBackfillSkipped;
+    private final Map<String, List<SourceRecord>> pendingSchemaChangeEvents;
 
     private static final long READER_CLOSE_TIMEOUT = 30L;
 
@@ -114,6 +116,7 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
         this.isParsingOnLineSchemaChanges =
                 
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
         this.isBackfillSkipped = 
statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
+        this.pendingSchemaChangeEvents = new HashMap<>();
     }
 
     public void submitSplit(MySqlSplit mySqlSplit) {
@@ -181,8 +184,35 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
                     Optional<SourceRecord> oscRecord =
                             parseOnLineSchemaChangeEvent(event.getRecord());
                     if (oscRecord.isPresent()) {
-                        sourceRecords.add(oscRecord.get());
-                        continue;
+                        TableId tableId = 
RecordUtils.getTableId(oscRecord.get());
+                        if (tableId != null) {
+                            LOG.info(
+                                    "Received the start event of online schema 
change: {}. Save it for later.",
+                                    oscRecord.get());
+                            pendingSchemaChangeEvents
+                                    .computeIfAbsent(tableId.toString(), k -> 
new ArrayList<>())
+                                    .add(oscRecord.get());
+                            continue;
+                        }
+                    }
+
+                    Optional<String> finishedTables =
+                            
OnlineSchemaChangeUtils.parseOnLineSchemaRenameEvent(event.getRecord());
+                    if (finishedTables.isPresent()) {
+                        TableId tableId = 
RecordUtils.getTableId(event.getRecord());
+                        String finishedTableId = tableId.catalog() + "." + 
finishedTables.get();
+                        LOG.info(
+                                "Received the ending event of table {}. Emit 
corresponding DDL event now.",
+                                finishedTableId);
+
+                        if 
(pendingSchemaChangeEvents.containsKey(finishedTableId)) {
+                            
sourceRecords.addAll(pendingSchemaChangeEvents.remove(finishedTableId));
+                        } else {
+                            LOG.error(
+                                    "Error: met an unexpected osc finish 
event. Current pending events: {}, Record: {}",
+                                    pendingSchemaChangeEvents,
+                                    event);
+                        }
                     }
                 }
                 if (shouldEmit(event.getRecord())) {
@@ -228,11 +258,11 @@ public class BinlogSplitReader implements 
DebeziumReader<SourceRecords, MySqlSpl
     }
 
     private Optional<SourceRecord> parseOnLineSchemaChangeEvent(SourceRecord 
sourceRecord) {
-        if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
+        if (OnlineSchemaChangeUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
             // This is a gh-ost initialized schema change event and should be 
emitted if the
             // peeled tableId matches the predicate.
             TableId originalTableId = RecordUtils.getTableId(sourceRecord);
-            TableId peeledTableId = RecordUtils.peelTableId(originalTableId);
+            TableId peeledTableId = 
OnlineSchemaChangeUtils.peelTableId(originalTableId);
             if (capturedTableFilter.test(peeledTableId)) {
                 return Optional.of(
                         RecordUtils.setTableId(sourceRecord, originalTableId, 
peeledTableId));
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java
new file mode 100644
index 000000000..aef47a4d3
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.HistoryRecord;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+import static 
org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD;
+
+/** Utility class for handling gh-ost/pt-osc online schema change events. */
+public class OnlineSchemaChangeUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OnlineSchemaChangeUtils.class);
+
+    /**
+     * Shared JSON parser for history records. Building an {@link ObjectMapper} is costly and both
+     * public entry points are invoked per record, so one instance is reused instead of creating a
+     * new mapper on every call.
+     */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Pattern matching gh-ost shadow table ({@code _<name>_gho}) and pt-osc new table ({@code
+     * _<name>_new}), which carry the actual ALTER DDL during an online schema change.
+     */
+    private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");
+
+    /**
+     * Pattern matching gh-ost delete table ({@code _<name>_del}) and pt-osc old table ({@code
+     * _<name>_old}), which are the temporary backup tables created during an online schema change.
+     */
+    private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$");
+
+    private OnlineSchemaChangeUtils() {}
+
+    /**
+     * Checks whether the given source record is a gh-ost/pt-osc initiated schema change event by
+     * inspecting the ALTER DDL statement targeting a shadow/new table.
+     *
+     * <p>The full sequence of schema change events generated during one online schema change is
+     * listed below.
+     *
+     * <p>gh-ost:
+     *
+     * <pre>
+     * DROP TABLE IF EXISTS `db`.`_tb1_gho`
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+     * create /* gh-ost *&#47; table `db`.`_tb1_ghc` ...
+     * create /* gh-ost *&#47; table `db`.`_tb1_gho` like `db`.`tb1`
+     * alter /* gh-ost *&#47; table `db`.`_tb1_gho` add column c varchar(255)
+     * alter /* gh-ost *&#47; table `db`.`_tb1_gho` AUTO_INCREMENT=N  (only present when the table has an AUTO_INCREMENT column)
+     * create /* gh-ost *&#47; table `db`.`_tb1_del` ...
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * rename /* gh-ost *&#47; table `db`.`tb1` to `db`.`_tb1_del`
+     * rename /* gh-ost *&#47; table `db`.`_tb1_gho` to `db`.`tb1`
+     * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * </pre>
+     *
+     * <p>pt-osc:
+     *
+     * <pre>
+     * CREATE TABLE `db`.`_test_tb1_new`
+     * ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
+     * CREATE TRIGGER `pt_osc_db_test_tb1_del`...
+     * CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
+     * CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
+     * ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change *&#47;
+     * RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO `db`.`test_tb1`
+     * DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server *&#47;
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
+     * </pre>
+     *
+     * <p>Among all these, only the ALTER statement targeting the {@code _gho}/{@code _new} table is
+     * stored temporarily, and emitted when the subsequent RENAME TABLE event arrives.
+     *
+     * @param record the source record to inspect
+     * @return {@code true} only if the record is a schema change event whose DDL starts with
+     *     {@code alter} and whose source table name matches the gh-ost/pt-osc shadow table
+     *     pattern; {@code false} otherwise, including when the history record cannot be parsed.
+     */
+    public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
+        if (!RecordUtils.isSchemaChangeEvent(record)) {
+            return false;
+        }
+        Struct value = (Struct) record.value();
+        Optional<String> ddl = readLowerCasedDdl(value);
+        if (!ddl.isPresent() || !ddl.get().startsWith("alter")) {
+            return false;
+        }
+        String tableName = value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
+        // A record without a table name cannot refer to a shadow table; avoid an NPE in matcher().
+        return tableName != null && OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
+    }
+
+    /**
+     * Parses a gh-ost/pt-osc RENAME TABLE event and returns the original (user-facing) table name
+     * if the event represents the completion of an online schema change.
+     *
+     * <p>The source table name field is expected to hold exactly two comma-separated names. The
+     * rename direction is not read from the DDL: the shorter name is taken as the rename source
+     * and the longer one as the rename target, and the event only counts as an OSC completion if
+     * the target matches the {@code _<name>_del}/{@code _<name>_old} pattern.
+     *
+     * @param record the source record to inspect
+     * @return the original table name if the record is an OSC completion rename, or {@link
+     *     Optional#empty()} otherwise.
+     */
+    public static Optional<String> parseOnLineSchemaRenameEvent(SourceRecord record) {
+        if (!RecordUtils.isSchemaChangeEvent(record)) {
+            return Optional.empty();
+        }
+        Struct value = (Struct) record.value();
+        Optional<String> ddl = readLowerCasedDdl(value);
+        if (!ddl.isPresent() || !isRenameTableStatement(ddl.get())) {
+            return Optional.empty();
+        }
+        LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl.get());
+
+        String joinedTableNames =
+                value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
+        // A missing table name is treated like any other malformed value instead of throwing.
+        List<String> tableNames =
+                joinedTableNames == null
+                        ? Collections.<String>emptyList()
+                        : Arrays.asList(joinedTableNames.split(","));
+        if (tableNames.size() != 2) {
+            LOG.info("Table name {} is malformed, skip it.", joinedTableNames);
+            return Optional.empty();
+        }
+
+        // Heuristic: an OSC backup table name wraps the original name with a prefix and a suffix,
+        // so the longer of the two names is assumed to be the rename target.
+        Comparator<String> byLength = Comparator.comparingInt(String::length);
+        String renamedFromTableName = Collections.min(tableNames, byLength);
+        String renamedToTableName = Collections.max(tableNames, byLength);
+
+        LOG.info("Determined the shorter TableId {} is the renaming source.", renamedFromTableName);
+        LOG.info("Determined the longer TableId {} is the renaming target.", renamedToTableName);
+
+        if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) {
+            LOG.info(
+                    "Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.",
+                    renamedToTableName,
+                    renamedFromTableName);
+            return Optional.of(renamedFromTableName);
+        }
+
+        LOG.info(
+                "Renamed to TableId {} does not match any RegEx pattern, skip it.",
+                renamedToTableName);
+        return Optional.empty();
+    }
+
+    /**
+     * Peels out a gh-ost/pt-osc mangled {@link TableId} back to the original user-facing one.
+     *
+     * <p>For example, {@code _customers_gho} → {@code customers}, {@code _orders_new} → {@code
+     * orders}.
+     *
+     * @param tableId the possibly mangled table id
+     * @return a table id with the same catalog and schema and the peeled table name, or the given
+     *     {@code tableId} itself if its table name does not match the shadow table pattern.
+     */
+    public static TableId peelTableId(TableId tableId) {
+        Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
+        if (matchingResult.matches()) {
+            return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1));
+        }
+        return tableId;
+    }
+
+    /** Returns whether the lower-cased DDL is a plain or gh-ost annotated RENAME TABLE. */
+    private static boolean isRenameTableStatement(String lowerCasedDdl) {
+        return lowerCasedDdl.startsWith("rename table")
+                || lowerCasedDdl.startsWith("rename /* gh-ost */ table");
+    }
+
+    /**
+     * Extracts the lower-cased DDL statement from the history record embedded in a schema change
+     * event value.
+     *
+     * @return the lower-cased DDL, or {@link Optional#empty()} if the history record has no DDL
+     *     field or is not valid JSON (the parse failure is logged rather than silently dropped).
+     */
+    private static Optional<String> readLowerCasedDdl(Struct value) {
+        try {
+            return Optional.ofNullable(
+                            OBJECT_MAPPER.readTree(value.getString(HISTORY_RECORD_FIELD)))
+                    .map(root -> root.get(HistoryRecord.Fields.DDL_STATEMENTS))
+                    .map(ddlNode -> ddlNode.asText().toLowerCase());
+        } catch (JsonProcessingException e) {
+            LOG.warn("Failed to parse schema change event {}", value, e);
+            return Optional.empty();
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index bf4d4f29f..1f9a041d7 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -25,9 +25,6 @@ import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitI
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
 import org.apache.flink.table.types.logical.RowType;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
 import io.debezium.data.Envelope;
 import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
@@ -51,8 +48,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
@@ -469,75 +464,4 @@ public class RecordUtils {
         }
         return Optional.empty();
     }
-
-    /**
-     * This utility method checks if given source record is a gh-ost/pt-osc 
initiated schema change
-     * event by checking the "alter" ddl.
-     */
-    public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
-        if (!isSchemaChangeEvent(record)) {
-            return false;
-        }
-        Struct value = (Struct) record.value();
-        ObjectMapper mapper = new ObjectMapper();
-        try {
-            // There will be these schema change events generated in total 
during one transaction.
-            //
-            // gh-ost:
-            // DROP TABLE IF EXISTS `db`.`_tb1_gho`
-            // DROP TABLE IF EXISTS `db`.`_tb1_del`
-            // DROP TABLE IF EXISTS `db`.`_tb1_ghc`
-            // create /* gh-ost */ table `db`.`_tb1_ghc` ...
-            // create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1`
-            // alter /* gh-ost */ table `db`.`_tb1_gho` add column c 
varchar(255)
-            // create /* gh-ost */ table `db`.`_tb1_del` ...
-            // DROP TABLE IF EXISTS `db`.`_tb1_del`
-            // rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del`
-            // rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1`
-            // DROP TABLE IF EXISTS `db`.`_tb1_ghc`
-            // DROP TABLE IF EXISTS `db`.`_tb1_del`
-            //
-            // pt-osc:
-            // CREATE TABLE `db`.`_test_tb1_new`
-            // ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
-            // CREATE TRIGGER `pt_osc_db_test_tb1_del`...
-            // CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
-            // CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
-            // ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */
-            // RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, 
`db`.`_test_tb1_new` TO
-            // `db`.`test_tb1`
-            // DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */
-            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
-            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
-            // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
-            //
-            // Among all these, we only need the "ALTER" one that happens on 
the `_gho`/`_new`
-            // table.
-            String ddl =
-                    mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
-                            .get(HistoryRecord.Fields.DDL_STATEMENTS)
-                            .asText()
-                            .toLowerCase();
-            if (ddl.startsWith("alter")) {
-                String tableName =
-                        
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
-                return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
-            }
-
-            return false;
-        } catch (JsonProcessingException e) {
-            return false;
-        }
-    }
-
-    private static final Pattern OSC_TABLE_ID_PATTERN = 
Pattern.compile("^_(.*)_(gho|new)$");
-
-    /** This utility method peels out gh-ost/pt-osc mangled tableId to the 
original one. */
-    public static TableId peelTableId(TableId tableId) {
-        Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
-        if (matchingResult.matches()) {
-            return new TableId(tableId.catalog(), tableId.schema(), 
matchingResult.group(1));
-        }
-        return tableId;
-    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
index ec9da16d1..145bf4c0e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
@@ -77,7 +77,7 @@ class MySqlOnLineSchemaMigrationSourceITCase extends 
MySqlSourceTestBase {
     private static final String TEST_USER = "mysqluser";
     private static final String TEST_PASSWORD = "mysqlpw";
 
-    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.5.7";
+    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.7.1";
 
     protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
             createPerconaToolkitContainer();
@@ -174,7 +174,8 @@ class MySqlOnLineSchemaMigrationSourceITCase extends 
MySqlSourceTestBase {
                         .serverId(getServerId())
                         .deserializer(new JsonDebeziumDeserializationSchema())
                         .serverTimeZone("UTC")
-                        .includeSchemaChanges(true) // output the schema 
changes as well
+                        .includeSchemaChanges(true)
+                        .parseOnLineSchemaChanges(true)
                         .build();
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -334,7 +335,8 @@ class MySqlOnLineSchemaMigrationSourceITCase extends 
MySqlSourceTestBase {
                         .serverId(getServerId())
                         .deserializer(new JsonDebeziumDeserializationSchema())
                         .serverTimeZone("UTC")
-                        .includeSchemaChanges(true) // output the schema 
changes as well
+                        .includeSchemaChanges(true)
+                        .parseOnLineSchemaChanges(true)
                         .build();
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
index 8bc09a480..752b54f24 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
@@ -78,7 +78,7 @@ class MySqlOnLineSchemaMigrationTableITCase extends 
MySqlSourceTestBase {
     private static final String TEST_USER = "mysqluser";
     private static final String TEST_PASSWORD = "mysqlpw";
 
-    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.5.7";
+    private static final String PERCONA_TOOLKIT = 
"perconalab/percona-toolkit:3.7.1";
 
     protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
             createPerconaToolkitContainer();
@@ -177,7 +177,8 @@ class MySqlOnLineSchemaMigrationTableITCase extends 
MySqlSourceTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
                                 + " 'server-time-zone' = 'UTC',"
-                                + " 'server-id' = '%s'"
+                                + " 'server-id' = '%s',"
+                                + " 'scan.parse.online.schema.changes.enabled' 
= 'true'"
                                 + ")",
                         MYSQL8_CONTAINER.getHost(),
                         MYSQL8_CONTAINER.getDatabasePort(),
@@ -346,7 +347,8 @@ class MySqlOnLineSchemaMigrationTableITCase extends 
MySqlSourceTestBase {
                                 + " 'table-name' = '%s',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'%s',"
                                 + " 'server-time-zone' = 'UTC',"
-                                + " 'server-id' = '%s'"
+                                + " 'server-id' = '%s',"
+                                + " 'scan.parse.online.schema.changes.enabled' 
= 'true'"
                                 + ")",
                         MYSQL8_CONTAINER.getHost(),
                         MYSQL8_CONTAINER.getDatabasePort(),

Reply via email to