This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 78d6adf0f [FLINK-39207][mysql] Fix mysql cdc could get stuck in
backfill binlog reading when reuse snapshot split reader (#4311)
78d6adf0f is described below
commit 78d6adf0ff835f3e53c8537ad66104da58171ff5
Author: chengcongchina <[email protected]>
AuthorDate: Fri Mar 20 11:30:07 2026 +0800
[FLINK-39207][mysql] Fix mysql cdc could get stuck in backfill binlog
reading when reuse snapshot split reader (#4311)
---
.../mysql/debezium/reader/SnapshotSplitReader.java | 7 +-
.../reader/StoppableChangeEventSourceContext.java | 4 ++
.../debezium/reader/SnapshotSplitReaderTest.java | 77 ++++++++++++++++++++--
3 files changed, 80 insertions(+), 8 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
index b7b23a18e..e35492370 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java
@@ -153,7 +153,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
executorService.execute(
() -> {
try {
- currentTaskRunning = true;
+ startCurrentTask();
final SnapshotSplitChangeEventSourceContextImpl
sourceContext =
new
SnapshotSplitChangeEventSourceContextImpl();
@@ -432,6 +432,11 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
}
}
+ private void startCurrentTask() {
+ currentTaskRunning = true;
+ changeEventSourceContext.startChangeEventSource();
+ }
+
private void stopCurrentTask() {
currentTaskRunning = false;
changeEventSourceContext.stopChangeEventSource();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java
index bc9c9f981..96c5eb6b3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java
@@ -28,6 +28,10 @@ public class StoppableChangeEventSourceContext
private volatile boolean isRunning = true;
+ public void startChangeEventSource() {
+ isRunning = true;
+ }
+
public void stopChangeEventSource() {
isRunning = false;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
index 157138618..6598ed126 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java
@@ -38,8 +38,8 @@ import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.sql.SQLException;
@@ -64,18 +64,17 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
private static BinaryLogClient binaryLogClient;
private static MySqlConnection mySqlConnection;
- @BeforeAll
- public static void init() {
+ @BeforeEach
+ public void beforeEach() {
customerDatabase.createAndInitialize();
- customer3_0Database.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(customerDatabase, new String[] {"customers"}, 10);
binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
}
- @AfterAll
- public static void afterClass() throws Exception {
+ @AfterEach
+ public void afterEach() throws Exception {
if (mySqlConnection != null) {
mySqlConnection.close();
}
@@ -83,6 +82,7 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
if (binaryLogClient != null) {
binaryLogClient.disconnect();
}
+ customerDatabase.dropDatabase();
}
@Test
@@ -113,6 +113,7 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
@Test
void testReadSingleSnapshotSplitWithDotName() throws Exception {
+ customer3_0Database.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(customer3_0Database, new String[] {"customers3.0"},
4);
BinaryLogClient binaryLogClient =
@@ -146,6 +147,9 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, statefulTaskContext, 1,
dataType);
+ mySqlConnection.close();
+ binaryLogClient.disconnect();
+
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@@ -565,6 +569,65 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
+ @Test
+ void testMultipleSplitsWithBackfill() throws Exception {
+ String tableName = "customers_even_dist";
+ String tableId = customerDatabase.getDatabaseName() + "." + tableName;
+ MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new
String[] {tableName}, 4);
+ StatefulTaskContext statefulTaskContext =
+ new StatefulTaskContext(sourceConfig, binaryLogClient,
mySqlConnection);
+
+ // Hook to make highWatermark > lowWatermark, enforcing backfill phase
+ SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
+ snapshotHooks.setPreHighWatermarkAction(
+ (mySqlConnection, split) -> {
+ if (split.splitId().equals(tableId + ":0")) {
+ mySqlConnection.execute(
+ "UPDATE " + tableId + " SET address =
'Beijing' WHERE id = 103");
+ mySqlConnection.commit();
+ } else if (split.splitId().equals(tableId + ":1")) {
+ mySqlConnection.execute(
+ "UPDATE " + tableId + " SET address =
'Beijing' WHERE id = 106");
+ mySqlConnection.commit();
+ } else if (split.splitId().equals(tableId + ":2")) {
+ mySqlConnection.execute(
+ "UPDATE " + tableId + " SET address =
'Beijing' WHERE id = 109");
+ mySqlConnection.commit();
+ }
+ });
+
+ final DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("address", DataTypes.STRING()),
+ DataTypes.FIELD("phone_number", DataTypes.STRING()));
+ List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);
+
+ String[] expected =
+ new String[] {
+ "+I[101, user_1, Shanghai, 123567891234]",
+ "+I[102, user_2, Shanghai, 123567891234]",
+ "+I[103, user_3, Beijing, 123567891234]",
+ "+I[104, user_4, Shanghai, 123567891234]",
+ "+I[105, user_5, Shanghai, 123567891234]",
+ "+I[106, user_6, Beijing, 123567891234]",
+ "+I[107, user_7, Shanghai, 123567891234]",
+ "+I[108, user_8, Shanghai, 123567891234]",
+ "+I[109, user_9, Beijing, 123567891234]",
+ "+I[110, user_10, Shanghai, 123567891234]"
+ };
+
+ List<String> actual =
+ readTableSnapshotSplits(
+ mySqlSplits,
+ statefulTaskContext,
+ mySqlSplits.size(),
+ dataType,
+ snapshotHooks);
+ assertEqualsInAnyOrder(Arrays.asList(expected), actual);
+ }
+
private List<String> readTableSnapshotSplits(
List<MySqlSplit> mySqlSplits,
StatefulTaskContext statefulTaskContext,