This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new eb3cdc06f [FLINK-38061][mysql] Make defensive copies of collections in 
SnapshotPendingSplitsState to prevent concurrent modification issues (#4379)
eb3cdc06f is described below

commit eb3cdc06f2b32ed1f4c31075ab9f81294cb3b87e
Author: Jia Fan <[email protected]>
AuthorDate: Tue Apr 21 16:35:07 2026 +0800

    [FLINK-38061][mysql] Make defensive copies of collections in 
SnapshotPendingSplitsState to prevent concurrent modification issues (#4379)
---
 .../assigner/state/SnapshotPendingSplitsState.java    | 19 ++++++++++++-------
 .../assigners/state/SnapshotPendingSplitsState.java   | 16 ++++++++++------
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java
index 367bc792d..ca5e578bd 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java
@@ -26,6 +26,9 @@ import 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplit
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -89,17 +92,19 @@ public class SnapshotPendingSplitsState extends 
PendingSplitsState {
             boolean isRemainingTablesCheckpointed,
             Map<String, Long> splitFinishedCheckpointIds,
             ChunkSplitterState chunkSplitterState) {
-        this.alreadyProcessedTables = alreadyProcessedTables;
-        this.remainingSplits = remainingSplits;
-        this.assignedSplits = assignedSplits;
-        this.splitFinishedOffsets = splitFinishedOffsets;
+        // FLINK-38061: make defensive copy to avoid potential concurrent 
modification of the
+        // collections.
+        this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables);
+        this.remainingSplits = new ArrayList<>(remainingSplits);
+        this.assignedSplits = new LinkedHashMap<>(assignedSplits);
+        this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets);
         this.assignerStatus = assignerStatus;
-        this.remainingTables = remainingTables;
+        this.remainingTables = new ArrayList<>(remainingTables);
         this.isTableIdCaseSensitive = isTableIdCaseSensitive;
         this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
-        this.tableSchemas = tableSchemas;
+        this.tableSchemas = new HashMap<>(tableSchemas);
         this.chunkSplitterState = chunkSplitterState;
-        this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
+        this.splitFinishedCheckpointIds = new 
HashMap<>(splitFinishedCheckpointIds);
     }
 
     public Map<String, Long> getSplitFinishedCheckpointIds() {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
index c49069a60..55df2e7da 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapsho
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -84,15 +86,17 @@ public class SnapshotPendingSplitsState extends 
PendingSplitsState {
             boolean isTableIdCaseSensitive,
             boolean isRemainingTablesCheckpointed,
             ChunkSplitterState chunkSplitterState) {
-        this.alreadyProcessedTables = alreadyProcessedTables;
-        this.remainingSplits = remainingSplits;
-        this.assignedSplits = assignedSplits;
-        this.splitFinishedOffsets = splitFinishedOffsets;
+        // FLINK-38061: make defensive copy to avoid potential concurrent 
modification of the
+        // collections.
+        this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables);
+        this.remainingSplits = new ArrayList<>(remainingSplits);
+        this.assignedSplits = new LinkedHashMap<>(assignedSplits);
+        this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets);
         this.assignerStatus = assignerStatus;
-        this.remainingTables = remainingTables;
+        this.remainingTables = new ArrayList<>(remainingTables);
         this.isTableIdCaseSensitive = isTableIdCaseSensitive;
         this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
-        this.tableSchemas = tableSchemas;
+        this.tableSchemas = new HashMap<>(tableSchemas);
         this.chunkSplitterState = chunkSplitterState;
     }
 

Reply via email to