This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new eb3cdc06f [FLINK-38061][mysql] Make defensive copies of collections in
SnapshotPendingSplitsState to prevent concurrent modification issues (#4379)
eb3cdc06f is described below
commit eb3cdc06f2b32ed1f4c31075ab9f81294cb3b87e
Author: Jia Fan <[email protected]>
AuthorDate: Tue Apr 21 16:35:07 2026 +0800
[FLINK-38061][mysql] Make defensive copies of collections in
SnapshotPendingSplitsState to prevent concurrent modification issues (#4379)
---
.../assigner/state/SnapshotPendingSplitsState.java | 19 ++++++++++++-------
.../assigners/state/SnapshotPendingSplitsState.java | 16 ++++++++++------
2 files changed, 22 insertions(+), 13 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java
index 367bc792d..ca5e578bd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java
@@ -26,6 +26,9 @@ import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplit
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -89,17 +92,19 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
boolean isRemainingTablesCheckpointed,
Map<String, Long> splitFinishedCheckpointIds,
ChunkSplitterState chunkSplitterState) {
- this.alreadyProcessedTables = alreadyProcessedTables;
- this.remainingSplits = remainingSplits;
- this.assignedSplits = assignedSplits;
- this.splitFinishedOffsets = splitFinishedOffsets;
+ // FLINK-38061: make defensive copy to avoid potential concurrent modification of the
+ // collections.
+ this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables);
+ this.remainingSplits = new ArrayList<>(remainingSplits);
+ this.assignedSplits = new LinkedHashMap<>(assignedSplits);
+ this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets);
this.assignerStatus = assignerStatus;
- this.remainingTables = remainingTables;
+ this.remainingTables = new ArrayList<>(remainingTables);
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
- this.tableSchemas = tableSchemas;
+ this.tableSchemas = new HashMap<>(tableSchemas);
this.chunkSplitterState = chunkSplitterState;
- this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
+ this.splitFinishedCheckpointIds = new HashMap<>(splitFinishedCheckpointIds);
}
public Map<String, Long> getSplitFinishedCheckpointIds() {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
index c49069a60..55df2e7da 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
@@ -27,6 +27,8 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapsho
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -84,15 +86,17 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed,
ChunkSplitterState chunkSplitterState) {
- this.alreadyProcessedTables = alreadyProcessedTables;
- this.remainingSplits = remainingSplits;
- this.assignedSplits = assignedSplits;
- this.splitFinishedOffsets = splitFinishedOffsets;
+ // FLINK-38061: make defensive copy to avoid potential concurrent modification of the
+ // collections.
+ this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables);
+ this.remainingSplits = new ArrayList<>(remainingSplits);
+ this.assignedSplits = new LinkedHashMap<>(assignedSplits);
+ this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets);
this.assignerStatus = assignerStatus;
- this.remainingTables = remainingTables;
+ this.remainingTables = new ArrayList<>(remainingTables);
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
- this.tableSchemas = tableSchemas;
+ this.tableSchemas = new HashMap<>(tableSchemas);
this.chunkSplitterState = chunkSplitterState;
}