This is an automated email from the ASF dual-hosted git repository.

yuxiqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a97a5bcb4 [FLINK-39621] Fix incremental-snapshot based sources may get stuck in binlog-backfill stage (#4388)
a97a5bcb4 is described below

commit a97a5bcb4f780c79ddf628e77e050b8b3e0fc8db
Author: yuxiqian <[email protected]>
AuthorDate: Mon Jun 1 19:29:22 2026 +0800

    [FLINK-39621] Fix incremental-snapshot based sources may get stuck in binlog-backfill stage (#4388)
    
    * [FLINK-39621] Fix incremental-snapshot based sources may get stuck in binlog-backfill stage
    
    * Use AtomicInteger for thread-safe reference counting.
---
 .../external/IncrementalSourceStreamFetcher.java   | 16 ++++--
 .../source/fetch/PostgresScanFetchTaskTest.java    | 51 +++++++++++++++++++
 .../reader/fetch/SqlServerScanFetchTaskTest.java   | 59 ++++++++++++++++++++++
 3 files changed, 121 insertions(+), 5 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 9ddb41576..937fefe67 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
 
@@ -57,9 +58,9 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
     private final FetchTask.Context taskContext;
     private final ExecutorService executorService;
     private final Set<TableId> pureStreamPhaseTables;
+    private final AtomicInteger numberOfRunningTasks;
 
     private volatile ChangeEventQueue<DataChangeEvent> queue;
-    private volatile boolean currentTaskRunning;
     private volatile Throwable readException;
 
     private FetchTask<SourceSplitBase> streamFetchTask;
@@ -77,10 +78,10 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
         ThreadFactory threadFactory =
                 new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + 
subTaskId).build();
         this.executorService = 
Executors.newSingleThreadExecutor(threadFactory);
-        this.currentTaskRunning = true;
         this.pureStreamPhaseTables = new HashSet<>();
         this.isBackfillSkipped = 
taskContext.getSourceConfig().isSkipSnapshotBackfill();
         this.supportsSplitKeyOptimization = 
taskContext.supportsSplitKeyOptimization();
+        this.numberOfRunningTasks = new AtomicInteger(0);
     }
 
     @Override
@@ -90,6 +91,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
         configureFilter();
         taskContext.configure(currentStreamSplit);
         this.queue = taskContext.getQueue();
+        startReadTask();
         executorService.submit(
                 () -> {
                     try {
@@ -107,7 +109,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
 
     @Override
     public boolean isFinished() {
-        return currentStreamSplit == null || !currentTaskRunning;
+        return currentStreamSplit == null || numberOfRunningTasks.get() == 0;
     }
 
     @Nullable
@@ -116,7 +118,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
         // what happens if currentTaskRunning
-        if (currentTaskRunning) {
+        if (numberOfRunningTasks.get() > 0) {
             List<DataChangeEvent> batch = queue.poll();
             for (DataChangeEvent event : batch) {
                 if (isEndWatermarkEvent(event.getRecord())) {
@@ -282,8 +284,12 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
         this.pureStreamPhaseTables.clear();
     }
 
+    public void startReadTask() {
+        this.numberOfRunningTasks.incrementAndGet();
+    }
+
     public void stopReadTask() throws Exception {
-        this.currentTaskRunning = false;
+        this.numberOfRunningTasks.decrementAndGet();
 
         if (taskContext != null) {
             taskContext.close();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index a40f70296..8406b98bb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -243,6 +243,57 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testMultipleSplitsWithBackfill() throws Exception {
+        customDatabase.createAndInitialize();
+
+        TestTableId tableId = new TestTableId(schemaName, tableName);
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(
+                        customDatabase, schemaName, tableName, null, 4, false);
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect postgresDialect = new 
PostgresDialect(sourceConfigFactory.create(0));
+
+        SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
+        snapshotHooks.setPreHighWatermarkAction(
+                (postgresSourceConfig, split) -> {
+                    try (PostgresConnection conn = 
postgresDialect.openJdbcConnection()) {
+                        conn.execute(
+                                "UPDATE "
+                                        + tableId.toSql()
+                                        + " SET address = 'Beijing' WHERE 
\"Id\" = 103");
+                        conn.commit();
+                    }
+                });
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("Id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("Name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+        PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, 
postgresDialect);
+        List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, 
postgresDialect);
+
+        List<String> actual =
+                readTableSnapshotSplits(
+                        reOrderSnapshotSplits(snapshotSplits),
+                        postgresSourceFetchTaskContext,
+                        snapshotSplits.size(),
+                        dataType,
+                        snapshotHooks);
+
+        // Verify the ScanFetcher can successfully process all splits without 
getting stuck
+        // (the FLINK-39207 bug would cause the reader to appear finished/stuck
+        // when reusing a stopped ScanFetcher for the next split).
+        // The preHighWatermark hook forces backfill phase for each split by 
making
+        // highWatermark > lowWatermark.
+        assertThat(actual).hasSize(21);
+        assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
+    }
+
     @Test
     void testSnapshotFetchSize() throws Exception {
         customDatabase.createAndInitialize();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
index 09008a3d0..f541cfe43 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java
@@ -202,6 +202,65 @@ class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testMultipleSplitsWithBackfill() throws Exception {
+        String databaseName = "customer";
+        String tableName = "dbo.customers";
+
+        initializeSqlServerTable(databaseName);
+
+        SqlServerSourceConfigFactory sourceConfigFactory =
+                getConfigFactory(databaseName, new String[] {tableName}, 4);
+        SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
+
+        String tableId = databaseName + "." + tableName;
+        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
+        hooks.setPreHighWatermarkAction(
+                (config, split) -> {
+                    executeSql(
+                            (SqlServerSourceConfig) config,
+                            new String[] {
+                                "UPDATE " + tableId + " SET address = 
'Beijing' WHERE id = 103"
+                            });
+                    try {
+                        Thread.sleep(10 * 1000);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
+                new SqlServerSourceFetchTaskContext(
+                        sourceConfig,
+                        sqlServerDialect,
+                        
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
+                        
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
+
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+        List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, 
sqlServerDialect);
+
+        List<String> actual =
+                readTableSnapshotSplits(
+                        reOrderSnapshotSplits(snapshotSplits),
+                        sqlServerSourceFetchTaskContext,
+                        snapshotSplits.size(),
+                        dataType,
+                        hooks);
+
+        // Verify the ScanFetcher can successfully process all splits without 
getting stuck
+        // (the FLINK-39207 bug would cause the reader to appear finished/stuck
+        // when reusing a stopped ScanFetcher for the next split).
+        // The preHighWatermark hook forces backfill phase for each split by 
making
+        // highWatermark > lowWatermark.
+        Assertions.assertThat(actual).hasSize(21);
+        Assertions.assertThat(actual).contains("+I[103, user_3, Beijing, 
123567891234]");
+    }
+
     @Test
     void testDateTimePrimaryKey() throws Exception {
         String databaseName = "pk";

Reply via email to