This is an automated email from the ASF dual-hosted git repository.
yuxiqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a97a5bcb4 [FLINK-39621] Fix incremental-snapshot based sources may get
stuck in binlog-backfill stage (#4388)
a97a5bcb4 is described below
commit a97a5bcb4f780c79ddf628e77e050b8b3e0fc8db
Author: yuxiqian <[email protected]>
AuthorDate: Mon Jun 1 19:29:22 2026 +0800
[FLINK-39621] Fix incremental-snapshot based sources may get stuck in binlog-backfill stage (#4388)
* [FLINK-39621] Fix incremental-snapshot based sources may get stuck in binlog-backfill stage
* Use AtomicInteger for thread-safe reference counting.
---
.../external/IncrementalSourceStreamFetcher.java | 16 ++++--
.../source/fetch/PostgresScanFetchTaskTest.java | 51 +++++++++++++++++++
.../reader/fetch/SqlServerScanFetchTaskTest.java | 59 ++++++++++++++++++++++
3 files changed, 121 insertions(+), 5 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 9ddb41576..937fefe67 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
@@ -57,9 +58,9 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
private final FetchTask.Context taskContext;
private final ExecutorService executorService;
private final Set<TableId> pureStreamPhaseTables;
+ private final AtomicInteger numberOfRunningTasks;
private volatile ChangeEventQueue<DataChangeEvent> queue;
- private volatile boolean currentTaskRunning;
private volatile Throwable readException;
private FetchTask<SourceSplitBase> streamFetchTask;
@@ -77,10 +78,10 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" +
subTaskId).build();
this.executorService =
Executors.newSingleThreadExecutor(threadFactory);
- this.currentTaskRunning = true;
this.pureStreamPhaseTables = new HashSet<>();
this.isBackfillSkipped =
taskContext.getSourceConfig().isSkipSnapshotBackfill();
this.supportsSplitKeyOptimization =
taskContext.supportsSplitKeyOptimization();
+ this.numberOfRunningTasks = new AtomicInteger(0);
}
@Override
@@ -90,6 +91,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
configureFilter();
taskContext.configure(currentStreamSplit);
this.queue = taskContext.getQueue();
+ startReadTask();
executorService.submit(
() -> {
try {
@@ -107,7 +109,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
@Override
public boolean isFinished() {
- return currentStreamSplit == null || !currentTaskRunning;
+ return currentStreamSplit == null || numberOfRunningTasks.get() == 0;
}
@Nullable
@@ -116,7 +118,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
// what happens if currentTaskRunning
- if (currentTaskRunning) {
+ if (numberOfRunningTasks.get() > 0) {
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
if (isEndWatermarkEvent(event.getRecord())) {
@@ -282,8 +284,12 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
this.pureStreamPhaseTables.clear();
}
+ public void startReadTask() {
+ this.numberOfRunningTasks.incrementAndGet();
+ }
+
public void stopReadTask() throws Exception {
- this.currentTaskRunning = false;
+ this.numberOfRunningTasks.decrementAndGet();
if (taskContext != null) {
taskContext.close();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index a40f70296..8406b98bb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -243,6 +243,57 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
+ @Test
+ void testMultipleSplitsWithBackfill() throws Exception {
+ customDatabase.createAndInitialize();
+
+ TestTableId tableId = new TestTableId(schemaName, tableName);
+ PostgresSourceConfigFactory sourceConfigFactory =
+ getMockPostgresSourceConfigFactory(
+ customDatabase, schemaName, tableName, null, 4, false);
+ PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+ PostgresDialect postgresDialect = new
PostgresDialect(sourceConfigFactory.create(0));
+
+ SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
+ snapshotHooks.setPreHighWatermarkAction(
+ (postgresSourceConfig, split) -> {
+ try (PostgresConnection conn =
postgresDialect.openJdbcConnection()) {
+ conn.execute(
+ "UPDATE "
+ + tableId.toSql()
+ + " SET address = 'Beijing' WHERE
\"Id\" = 103");
+ conn.commit();
+ }
+ });
+
+ final DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("Id", DataTypes.BIGINT()),
+ DataTypes.FIELD("Name", DataTypes.STRING()),
+ DataTypes.FIELD("address", DataTypes.STRING()),
+ DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+ PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
+ new PostgresSourceFetchTaskContext(sourceConfig,
postgresDialect);
+ List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig,
postgresDialect);
+
+ List<String> actual =
+ readTableSnapshotSplits(
+ reOrderSnapshotSplits(snapshotSplits),
+ postgresSourceFetchTaskContext,
+ snapshotSplits.size(),
+ dataType,
+ snapshotHooks);
+
+ // Verify the ScanFetcher can successfully process all splits without
getting stuck
+ // (the FLINK-39207 bug would cause the reader to appear finished/stuck
+ // when reusing a stopped ScanFetcher for the next split).
+ // The preHighWatermark hook forces backfill phase for each split by
making
+ // highWatermark > lowWatermark.
+ assertThat(actual).hasSize(21);
+ assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
+ }
+
@Test
void testSnapshotFetchSize() throws Exception {
customDatabase.createAndInitialize();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
index 09008a3d0..f541cfe43 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
@@ -202,6 +202,65 @@ class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
+ @Test
+ void testMultipleSplitsWithBackfill() throws Exception {
+ String databaseName = "customer";
+ String tableName = "dbo.customers";
+
+ initializeSqlServerTable(databaseName);
+
+ SqlServerSourceConfigFactory sourceConfigFactory =
+ getConfigFactory(databaseName, new String[] {tableName}, 4);
+ SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
+ SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
+
+ String tableId = databaseName + "." + tableName;
+ SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
+ hooks.setPreHighWatermarkAction(
+ (config, split) -> {
+ executeSql(
+ (SqlServerSourceConfig) config,
+ new String[] {
+ "UPDATE " + tableId + " SET address =
'Beijing' WHERE id = 103"
+ });
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
+ new SqlServerSourceFetchTaskContext(
+ sourceConfig,
+ sqlServerDialect,
+
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
+
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
+
+ final DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("address", DataTypes.STRING()),
+ DataTypes.FIELD("phone_number", DataTypes.STRING()));
+ List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig,
sqlServerDialect);
+
+ List<String> actual =
+ readTableSnapshotSplits(
+ reOrderSnapshotSplits(snapshotSplits),
+ sqlServerSourceFetchTaskContext,
+ snapshotSplits.size(),
+ dataType,
+ hooks);
+
+ // Verify the ScanFetcher can successfully process all splits without
getting stuck
+ // (the FLINK-39207 bug would cause the reader to appear finished/stuck
+ // when reusing a stopped ScanFetcher for the next split).
+ // The preHighWatermark hook forces backfill phase for each split by
making
+ // highWatermark > lowWatermark.
+ Assertions.assertThat(actual).hasSize(21);
+ Assertions.assertThat(actual).contains("+I[103, user_3, Beijing,
123567891234]");
+ }
+
@Test
void testDateTimePrimaryKey() throws Exception {
String databaseName = "pk";