This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bae0e5fb [FLINK-38218] Fix MySQL CDC binlog split metadata split transmission (#4087)
8bae0e5fb is described below

commit 8bae0e5fb0b70ec8a1cbf439b9c435c46190cb21
Author: Sergei Morozov <[email protected]>
AuthorDate: Fri Mar 20 03:09:41 2026 -0700

    [FLINK-38218] Fix MySQL CDC binlog split metadata split transmission (#4087)
---
 .../source/assigners/MySqlHybridSplitAssigner.java |  6 +-
 .../assigners/MySqlSnapshotSplitAssigner.java      | 31 ++++---
 .../state/PendingSplitsStateSerializer.java        | 15 ++--
 .../state/SnapshotPendingSplitsState.java          |  7 +-
 .../mysql/source/reader/MySqlSourceReader.java     | 49 ++++-------
 .../mysql/source/split/MySqlBinlogSplit.java       | 38 +++++++--
 .../assigners/MySqlHybridSplitAssignerTest.java    |  3 +-
 .../assigners/MySqlSnapshotSplitAssignerTest.java  | 95 +++++++++++++++++++++-
 .../state/PendingSplitsStateSerializerTest.java    |  4 +-
 .../mysql/source/split/MySqlBinlogSplitTest.java   | 26 ++++++
 10 files changed, 203 insertions(+), 71 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
index d66e6f3da..4a25491b2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
@@ -33,12 +33,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * A {@link MySqlSplitAssigner} that splits tables into small chunk splits 
based on primary key
@@ -208,9 +206,7 @@ public class MySqlHybridSplitAssigner implements 
MySqlSplitAssigner {
 
     private MySqlBinlogSplit createBinlogSplit() {
         final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
-                snapshotSplitAssigner.getAssignedSplits().values().stream()
-                        .sorted(Comparator.comparing(MySqlSplit::splitId))
-                        .collect(Collectors.toList());
+                new 
ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
 
         Map<String, BinlogOffset> splitFinishedOffsets =
                 snapshotSplitAssigner.getSplitFinishedOffsets();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index b49e8f7cb..f51df018d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import 
org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
@@ -73,7 +74,21 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
 
     private final List<TableId> alreadyProcessedTables;
     private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
-    private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
+
+    /**
+     * The splits that have been assigned to a reader. Once a split is 
finished, it remains in this
+     * map. An entry added to {@link #splitFinishedOffsets} indicates that the 
split has been
+     * finished. If reading the split fails, it is removed from this map.
+     *
+     * <p>{@link MySqlSourceReader} relies on the order of elements within the 
map:
+     *
+     * <ol>
+     *   <li>It must correspond to the order of assignment of the splits to 
readers.
+     *   <li>The order must be retained across job restarts.
+     * </ol>
+     */
+    private final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> 
assignedSplits;
+
     private final Map<TableId, TableChanges.TableChange> tableSchemas;
     private final Map<String, BinlogOffset> splitFinishedOffsets;
     private final MySqlSourceConfig sourceConfig;
@@ -141,7 +156,7 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
             int currentParallelism,
             List<TableId> alreadyProcessedTables,
             List<MySqlSchemalessSnapshotSplit> remainingSplits,
-            Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
+            LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
             Map<TableId, TableChanges.TableChange> tableSchemas,
             Map<String, BinlogOffset> splitFinishedOffsets,
             AssignerStatus assignerStatus,
@@ -154,17 +169,7 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
         this.currentParallelism = currentParallelism;
         this.alreadyProcessedTables = alreadyProcessedTables;
         this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
-        // When job restore from savepoint, sort the existing tables and newly 
added tables
-        // to let enumerator only send newly added tables' BinlogSplitMetaEvent
-        this.assignedSplits =
-                assignedSplits.entrySet().stream()
-                        .sorted(Entry.comparingByKey())
-                        .collect(
-                                Collectors.toMap(
-                                        Entry::getKey,
-                                        Entry::getValue,
-                                        (o, o2) -> o,
-                                        LinkedHashMap::new));
+        this.assignedSplits = assignedSplits;
         this.tableSchemas = tableSchemas;
         this.splitFinishedOffsets = splitFinishedOffsets;
         this.assignerStatus = assignerStatus;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
index bb3e2d2ed..0df1ee979 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
@@ -35,6 +35,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -196,12 +197,12 @@ public class PendingSplitsStateSerializer implements 
SimpleVersionedSerializer<P
             int splitVersion, DataInputDeserializer in) throws IOException {
         List<TableId> alreadyProcessedTables = readTableIds(in);
         List<MySqlSnapshotSplit> remainingSplits = 
readMySqlSnapshotSplits(splitVersion, in);
-        Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
+        LinkedHashMap<String, MySqlSnapshotSplit> assignedSnapshotSplits =
                 readAssignedSnapshotSplits(splitVersion, in);
 
         final List<MySqlSchemalessSnapshotSplit> remainingSchemalessSplits = 
new ArrayList<>();
-        final Map<String, MySqlSchemalessSnapshotSplit> 
assignedSchemalessSnapshotSplits =
-                new HashMap<>();
+        final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> 
assignedSchemalessSnapshotSplits =
+                new LinkedHashMap<>();
         final Map<TableId, TableChanges.TableChange> tableSchemas = new 
HashMap<>();
         remainingSplits.forEach(
                 split -> {
@@ -267,8 +268,8 @@ public class PendingSplitsStateSerializer implements 
SimpleVersionedSerializer<P
         List<TableId> remainingTableIds = readTableIds(in);
         boolean isTableIdCaseSensitive = in.readBoolean();
         final List<MySqlSchemalessSnapshotSplit> remainingSchemalessSplits = 
new ArrayList<>();
-        final Map<String, MySqlSchemalessSnapshotSplit> 
assignedSchemalessSnapshotSplits =
-                new HashMap<>();
+        final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> 
assignedSchemalessSnapshotSplits =
+                new LinkedHashMap<>();
         final Map<TableId, TableChanges.TableChange> tableSchemas = new 
HashMap<>();
         remainingSplits.forEach(
                 split -> {
@@ -368,9 +369,9 @@ public class PendingSplitsStateSerializer implements 
SimpleVersionedSerializer<P
         }
     }
 
-    private Map<String, MySqlSnapshotSplit> readAssignedSnapshotSplits(
+    private LinkedHashMap<String, MySqlSnapshotSplit> 
readAssignedSnapshotSplits(
             int splitVersion, DataInputDeserializer in) throws IOException {
-        Map<String, MySqlSnapshotSplit> assignedSplits = new HashMap<>();
+        LinkedHashMap<String, MySqlSnapshotSplit> assignedSplits = new 
LinkedHashMap<>();
         final int size = in.readInt();
         for (int i = 0; i < size; i++) {
             String splitId = in.readUTF();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
index 39aa71ad0..c49069a60 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapsho
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
 
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -50,7 +51,7 @@ public class SnapshotPendingSplitsState extends 
PendingSplitsState {
      * The snapshot splits that the {@link MySqlSourceEnumerator} has assigned 
to {@link
      * MySqlSplitReader}s.
      */
-    private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
+    private final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> 
assignedSplits;
 
     /**
      * The offsets of finished (snapshot) splits that the {@link 
MySqlSourceEnumerator} has received
@@ -75,7 +76,7 @@ public class SnapshotPendingSplitsState extends 
PendingSplitsState {
     public SnapshotPendingSplitsState(
             List<TableId> alreadyProcessedTables,
             List<MySqlSchemalessSnapshotSplit> remainingSplits,
-            Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
+            LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
             Map<TableId, TableChange> tableSchemas,
             Map<String, BinlogOffset> splitFinishedOffsets,
             AssignerStatus assignerStatus,
@@ -103,7 +104,7 @@ public class SnapshotPendingSplitsState extends 
PendingSplitsState {
         return remainingSplits;
     }
 
-    public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
+    public LinkedHashMap<String, MySqlSchemalessSnapshotSplit> 
getAssignedSplits() {
         return assignedSplits;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index a4f32a375..b3f421cee 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -59,10 +59,8 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -419,15 +417,24 @@ public class MySqlSourceReader<T>
                                 binlogSplit, receivedTotalFinishedSplitSize);
                 uncompletedBinlogSplits.put(binlogSplit.splitId(), 
binlogSplit);
             } else if (receivedMetaGroupId == expectedMetaGroupId) {
-                List<FinishedSnapshotSplitInfo> newAddedMetadataGroup;
-                Set<String> existedSplitsOfLastGroup =
-                        getExistedSplitsOfLastGroup(
-                                binlogSplit.getFinishedSnapshotSplitInfos(),
-                                sourceConfig.getSplitMetaGroupSize());
-                newAddedMetadataGroup =
-                        metadataEvent.getMetaGroup().stream()
+                int expectedNumberOfAlreadyRetrievedElements =
+                        binlogSplit.getFinishedSnapshotSplitInfos().size()
+                                % sourceConfig.getSplitMetaGroupSize();
+                List<byte[]> metaGroup = metadataEvent.getMetaGroup();
+                if (expectedNumberOfAlreadyRetrievedElements > 0) {
+                    LOG.info(
+                            "Source reader {} is discarding the first {} out 
of {} elements of meta group {}.",
+                            subtaskId,
+                            expectedNumberOfAlreadyRetrievedElements,
+                            metaGroup.size(),
+                            receivedMetaGroupId);
+                    metaGroup =
+                            metaGroup.subList(
+                                    expectedNumberOfAlreadyRetrievedElements, 
metaGroup.size());
+                }
+                List<FinishedSnapshotSplitInfo> newAddedMetadataGroup =
+                        metaGroup.stream()
                                 .map(FinishedSnapshotSplitInfo::deserialize)
-                                .filter(r -> 
!existedSplitsOfLastGroup.contains(r.getSplitId()))
                                 .collect(Collectors.toList());
 
                 uncompletedBinlogSplits.put(
@@ -497,28 +504,6 @@ public class MySqlSourceReader<T>
         }
     }
 
-    private Set<String> getExistedSplitsOfLastGroup(
-            List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int 
metaGroupSize) {
-        int splitsNumOfLastGroup =
-                finishedSnapshotSplits.size() % 
sourceConfig.getSplitMetaGroupSize();
-        if (splitsNumOfLastGroup != 0) {
-            int lastGroupStart =
-                    ((int) (finishedSnapshotSplits.size() / 
sourceConfig.getSplitMetaGroupSize()))
-                            * metaGroupSize;
-            // Keep same order with 
MySqlHybridSplitAssigner.createBinlogSplit() to avoid
-            // 'invalid request meta group id' error
-            List<String> sortedFinishedSnapshotSplits =
-                    finishedSnapshotSplits.stream()
-                            .map(FinishedSnapshotSplitInfo::getSplitId)
-                            .sorted()
-                            .collect(Collectors.toList());
-            return new HashSet<>(
-                    sortedFinishedSnapshotSplits.subList(
-                            lastGroupStart, lastGroupStart + 
splitsNumOfLastGroup));
-        }
-        return new HashSet<>();
-    }
-
     private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long 
checkpointId) {
         if (!LOG.isInfoEnabled()) {
             return;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
index fba3abcce..47cb9f23b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -42,7 +43,10 @@ public class MySqlBinlogSplit extends MySqlSplit {
 
     private final BinlogOffset startingOffset;
     private final BinlogOffset endingOffset;
+
+    /** Split IDs of all elements must be unique. */
     private final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;
+
     private final Map<TableId, TableChange> tableSchemas;
     private final int totalFinishedSplitSize;
     private final boolean isSuspended;
@@ -58,6 +62,9 @@ public class MySqlBinlogSplit extends MySqlSplit {
             int totalFinishedSplitSize,
             boolean isSuspended) {
         super(splitId);
+
+        ensureNoDuplicates(finishedSnapshotSplitInfos);
+
         this.startingOffset = startingOffset;
         this.endingOffset = endingOffset;
         this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
@@ -74,14 +81,29 @@ public class MySqlBinlogSplit extends MySqlSplit {
             List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
             Map<TableId, TableChange> tableSchemas,
             int totalFinishedSplitSize) {
-        super(splitId);
-        this.startingOffset = startingOffset;
-        this.endingOffset = endingOffset;
-        this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
-        this.tableSchemas = tableSchemas;
-        this.totalFinishedSplitSize = totalFinishedSplitSize;
-        this.isSuspended = false;
-        this.tablesForLog = getTablesForLog();
+        this(
+                splitId,
+                startingOffset,
+                endingOffset,
+                finishedSnapshotSplitInfos,
+                tableSchemas,
+                totalFinishedSplitSize,
+                false);
+    }
+
+    private static void ensureNoDuplicates(
+            List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos) {
+        Set<String> seenSplitIds = new HashSet<>();
+        for (FinishedSnapshotSplitInfo splitInfo : finishedSnapshotSplitInfos) 
{
+            if (seenSplitIds.contains(splitInfo.getSplitId())) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Found duplicate split ID %s in finished 
snapshot split infos",
+                                splitInfo.getSplitId()));
+            }
+
+            seenSplitIds.add(splitInfo.getSplitId());
+        }
     }
 
     public BinlogOffset getStartingOffset() {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
index 0b02fd590..605b848c3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -81,7 +82,7 @@ class MySqlHybridSplitAssignerTest extends 
MySqlSourceTestBase {
         List<TableId> alreadyProcessedTables = Lists.newArrayList(tableId);
         List<MySqlSchemalessSnapshotSplit> remainingSplits = new ArrayList<>();
 
-        Map<String, MySqlSchemalessSnapshotSplit> assignedSplits = new 
HashMap<>();
+        LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits = 
new LinkedHashMap<>();
         Map<String, BinlogOffset> splitFinishedOffsets = new HashMap<>();
 
         for (int i = 0; i < 5; i++) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index d97555593..ca5dff5bd 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -36,16 +36,23 @@ import io.debezium.relational.TableId;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -527,6 +534,92 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
         assertThat(splits).isEqualTo(expected);
     }
 
+    @ParameterizedTest
+    @MethodSource
+    void testFinishedSnapshotSplitInfosAreInOrderOfAssignment(
+            String table1Name, String table2Name) {
+        List<String> tableNames = new ArrayList<>();
+        tableNames.add(table1Name);
+
+        SnapshotPendingSplitsState state;
+
+        try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, 
null)) {
+            state = processAllSplitsAndSnapshotState(assigner, 1L);
+        }
+
+        tableNames.add(table2Name);
+
+        try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, 
state)) {
+            state = processAllSplitsAndSnapshotState(assigner, 2L);
+        }
+
+        try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, 
state)) {
+            List<String> finishedSnapshotSplitTableNames =
+                    assigner.getFinishedSplitInfos().stream()
+                            .map(i -> i.getTableId().table())
+                            .collect(Collectors.toList());
+
+            assertThat(finishedSnapshotSplitTableNames).isEqualTo(tableNames);
+        }
+    }
+
+    /**
+     * Use various combinations of table names to ensure that the finished 
snapshot split infos are
+     * in the order of assignment, not the order of table names.
+     */
+    public static Stream<Arguments> 
testFinishedSnapshotSplitInfosAreInOrderOfAssignment() {
+        String table1Name = "customers";
+        String table2Name = "customers_1";
+
+        return Stream.of(
+                Arguments.of(table1Name, table2Name), Arguments.of(table2Name, 
table1Name));
+    }
+
+    private MySqlSnapshotSplitAssigner createAssigner(
+            List<String> tableNames, @Nullable SnapshotPendingSplitsState 
state) {
+        int currentParallelism = 1;
+
+        if (state == null) {
+            return new MySqlSnapshotSplitAssigner(
+                    createConfiguration(tableNames),
+                    currentParallelism,
+                    new ArrayList<>(),
+                    true,
+                    getMySqlSplitEnumeratorContext());
+        }
+
+        return new MySqlSnapshotSplitAssigner(
+                createConfiguration(tableNames),
+                currentParallelism,
+                state,
+                getMySqlSplitEnumeratorContext());
+    }
+
+    private MySqlSourceConfig createConfiguration(List<String> tableNames) {
+        return getConfig(
+                customerDatabase,
+                Integer.MAX_VALUE,
+                CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                tableNames.toArray(new String[0]),
+                "id",
+                true,
+                false);
+    }
+
+    private SnapshotPendingSplitsState processAllSplitsAndSnapshotState(
+            MySqlSnapshotSplitAssigner assigner, long checkpointId) {
+        assigner.open();
+
+        Optional<MySqlSplit> optional;
+        while ((optional = assigner.getNext()).isPresent()) {
+            assigner.onFinishedSplits(
+                    Collections.singletonMap(optional.get().splitId(), 
BinlogOffset.ofLatest()));
+        }
+
+        return assigner.snapshotState(checkpointId);
+    }
+
     private List<String> getTestAssignSnapshotSplits(
             int splitSize,
             double distributionFactorUpper,
@@ -647,7 +740,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                                 null,
                                 null));
 
-        Map<String, MySqlSchemalessSnapshotSplit> assignedSplits = new 
HashMap<>();
+        LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits = 
new LinkedHashMap<>();
         assignedSplits.put(
                 processedTable + ":0",
                 new MySqlSchemalessSnapshotSplit(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
index 3d813b8a8..60abb56ff 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -147,7 +148,8 @@ class PendingSplitsStateSerializerTest {
         remainingSplits.add(getTestSchemalessSnapshotSplit(tableId1, 2));
         remainingSplits.add(getTestSchemalessSnapshotSplit(tableId1, 3));
 
-        final Map<String, MySqlSchemalessSnapshotSplit> assignedSnapshotSplits 
= new HashMap<>();
+        final LinkedHashMap<String, MySqlSchemalessSnapshotSplit> 
assignedSnapshotSplits =
+                new LinkedHashMap<>();
         Arrays.asList(
                         getTestSchemalessSnapshotSplit(tableId0, 0),
                         getTestSchemalessSnapshotSplit(tableId0, 1),
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
index f26c3a0b8..ff310d367 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
@@ -148,6 +148,32 @@ class MySqlBinlogSplitTest {
         
Assertions.assertThat(binlogSplit.getTables()).isEqualTo(expectedTables);
     }
 
+    @Test
+    public void duplicateSplitInfo() {
+        FinishedSnapshotSplitInfo info =
+                new FinishedSnapshotSplitInfo(
+                        new TableId("catalog", "schema", "table"),
+                        "split",
+                        null,
+                        null,
+                        BinlogOffset.ofLatest());
+        List<FinishedSnapshotSplitInfo> infos = new ArrayList<>();
+        infos.add(info);
+        infos.add(info);
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                new MySqlBinlogSplit(
+                                        "binlog-split",
+                                        BinlogOffset.ofLatest(),
+                                        null,
+                                        infos,
+                                        Collections.emptyMap(),
+                                        0,
+                                        false))
+                .isExactlyInstanceOf(IllegalArgumentException.class);
+    }
+
     /** A mock implementation for {@link Table} which is used for unit tests. 
*/
     private static class MockTable implements Table {
         private final TableId tableId;

Reply via email to