This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 78d6adf0f [FLINK-39207][mysql] Fix mysql cdc could get stuck in 
backfill binlog reading when reuse snapshot split reader (#4311)
78d6adf0f is described below

commit 78d6adf0ff835f3e53c8537ad66104da58171ff5
Author: chengcongchina <[email protected]>
AuthorDate: Fri Mar 20 11:30:07 2026 +0800

    [FLINK-39207][mysql] Fix mysql cdc could get stuck in backfill binlog 
reading when reuse snapshot split reader (#4311)
---
 .../mysql/debezium/reader/SnapshotSplitReader.java |  7 +-
 .../reader/StoppableChangeEventSourceContext.java  |  4 ++
 .../debezium/reader/SnapshotSplitReaderTest.java   | 77 ++++++++++++++++++++--
 3 files changed, 80 insertions(+), 8 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
index b7b23a18e..e35492370 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
@@ -153,7 +153,7 @@ public class SnapshotSplitReader implements 
DebeziumReader<SourceRecords, MySqlS
         executorService.execute(
                 () -> {
                     try {
-                        currentTaskRunning = true;
+                        startCurrentTask();
                         final SnapshotSplitChangeEventSourceContextImpl 
sourceContext =
                                 new 
SnapshotSplitChangeEventSourceContextImpl();
 
@@ -432,6 +432,11 @@ public class SnapshotSplitReader implements 
DebeziumReader<SourceRecords, MySqlS
         }
     }
 
+    private void startCurrentTask() {
+        currentTaskRunning = true;
+        changeEventSourceContext.startChangeEventSource();
+    }
+
     private void stopCurrentTask() {
         currentTaskRunning = false;
         changeEventSourceContext.stopChangeEventSource();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java
index bc9c9f981..96c5eb6b3 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java
@@ -28,6 +28,10 @@ public class StoppableChangeEventSourceContext
 
     private volatile boolean isRunning = true;
 
+    public void startChangeEventSource() {
+        isRunning = true;
+    }
+
     public void stopChangeEventSource() {
         isRunning = false;
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
index 157138618..6598ed126 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
@@ -38,8 +38,8 @@ import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.sql.SQLException;
@@ -64,18 +64,17 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
     private static BinaryLogClient binaryLogClient;
     private static MySqlConnection mySqlConnection;
 
-    @BeforeAll
-    public static void init() {
+    @BeforeEach
+    public void beforeEach() {
         customerDatabase.createAndInitialize();
-        customer3_0Database.createAndInitialize();
         MySqlSourceConfig sourceConfig =
                 getConfig(customerDatabase, new String[] {"customers"}, 10);
         binaryLogClient = 
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
         mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
     }
 
-    @AfterAll
-    public static void afterClass() throws Exception {
+    @AfterEach
+    public void afterEach() throws Exception {
         if (mySqlConnection != null) {
             mySqlConnection.close();
         }
@@ -83,6 +82,7 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
         if (binaryLogClient != null) {
             binaryLogClient.disconnect();
         }
+        customerDatabase.dropDatabase();
     }
 
     @Test
@@ -113,6 +113,7 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
 
     @Test
     void testReadSingleSnapshotSplitWithDotName() throws Exception {
+        customer3_0Database.createAndInitialize();
         MySqlSourceConfig sourceConfig =
                 getConfig(customer3_0Database, new String[] {"customers3.0"}, 
4);
         BinaryLogClient binaryLogClient =
@@ -146,6 +147,9 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
                 };
         List<String> actual =
                 readTableSnapshotSplits(mySqlSplits, statefulTaskContext, 1, 
dataType);
+        mySqlConnection.close();
+        binaryLogClient.disconnect();
+
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
@@ -565,6 +569,65 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testMultipleSplitsWithBackfill() throws Exception {
+        String tableName = "customers_even_dist";
+        String tableId = customerDatabase.getDatabaseName() + "." + tableName;
+        MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new 
String[] {tableName}, 4);
+        StatefulTaskContext statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, 
mySqlConnection);
+
+        // Hook to make highWatermark > lowWatermark, enforcing backfill phase
+        SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
+        snapshotHooks.setPreHighWatermarkAction(
+                (mySqlConnection, split) -> {
+                    if (split.splitId().equals(tableId + ":0")) {
+                        mySqlConnection.execute(
+                                "UPDATE " + tableId + " SET address = 
'Beijing' WHERE id = 103");
+                        mySqlConnection.commit();
+                    } else if (split.splitId().equals(tableId + ":1")) {
+                        mySqlConnection.execute(
+                                "UPDATE " + tableId + " SET address = 
'Beijing' WHERE id = 106");
+                        mySqlConnection.commit();
+                    } else if (split.splitId().equals(tableId + ":2")) {
+                        mySqlConnection.execute(
+                                "UPDATE " + tableId + " SET address = 
'Beijing' WHERE id = 109");
+                        mySqlConnection.commit();
+                    }
+                });
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+        List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);
+
+        String[] expected =
+                new String[] {
+                    "+I[101, user_1, Shanghai, 123567891234]",
+                    "+I[102, user_2, Shanghai, 123567891234]",
+                    "+I[103, user_3, Beijing, 123567891234]",
+                    "+I[104, user_4, Shanghai, 123567891234]",
+                    "+I[105, user_5, Shanghai, 123567891234]",
+                    "+I[106, user_6, Beijing, 123567891234]",
+                    "+I[107, user_7, Shanghai, 123567891234]",
+                    "+I[108, user_8, Shanghai, 123567891234]",
+                    "+I[109, user_9, Beijing, 123567891234]",
+                    "+I[110, user_10, Shanghai, 123567891234]"
+                };
+
+        List<String> actual =
+                readTableSnapshotSplits(
+                        mySqlSplits,
+                        statefulTaskContext,
+                        mySqlSplits.size(),
+                        dataType,
+                        snapshotHooks);
+        assertEqualsInAnyOrder(Arrays.asList(expected), actual);
+    }
+
     private List<String> readTableSnapshotSplits(
             List<MySqlSplit> mySqlSplits,
             StatefulTaskContext statefulTaskContext,

Reply via email to