This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 8bae0e5fb [FLINK-38218] Fix MySQL CDC binlog split metadata split
transmission (#4087)
8bae0e5fb is described below
commit 8bae0e5fb0b70ec8a1cbf439b9c435c46190cb21
Author: Sergei Morozov <[email protected]>
AuthorDate: Fri Mar 20 03:09:41 2026 -0700
[FLINK-38218] Fix MySQL CDC binlog split metadata split transmission (#4087)
---
.../source/assigners/MySqlHybridSplitAssigner.java | 6 +-
.../assigners/MySqlSnapshotSplitAssigner.java | 31 ++++---
.../state/PendingSplitsStateSerializer.java | 15 ++--
.../state/SnapshotPendingSplitsState.java | 7 +-
.../mysql/source/reader/MySqlSourceReader.java | 49 ++++-------
.../mysql/source/split/MySqlBinlogSplit.java | 38 +++++++--
.../assigners/MySqlHybridSplitAssignerTest.java | 3 +-
.../assigners/MySqlSnapshotSplitAssignerTest.java | 95 +++++++++++++++++++++-
.../state/PendingSplitsStateSerializerTest.java | 4 +-
.../mysql/source/split/MySqlBinlogSplitTest.java | 26 ++++++
10 files changed, 203 insertions(+), 71 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
index d66e6f3da..4a25491b2 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java
@@ -33,12 +33,10 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key
@@ -208,9 +206,7 @@ public class MySqlHybridSplitAssigner implements
MySqlSplitAssigner {
private MySqlBinlogSplit createBinlogSplit() {
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
- snapshotSplitAssigner.getAssignedSplits().values().stream()
- .sorted(Comparator.comparing(MySqlSplit::splitId))
- .collect(Collectors.toList());
+ new
ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
Map<String, BinlogOffset> splitFinishedOffsets =
snapshotSplitAssigner.getSplitFinishedOffsets();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index b49e8f7cb..f51df018d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -26,6 +26,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import
org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
@@ -73,7 +74,21 @@ public class MySqlSnapshotSplitAssigner implements
MySqlSplitAssigner {
private final List<TableId> alreadyProcessedTables;
private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
- private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
+
+ /**
+ * The splits that have been assigned to a reader. Once a split is finished, it remains in this
+ * map. An entry added to {@link #splitFinishedOffsets} indicates that the split has been
+ * finished. If reading the split fails, it is removed from this map.
+ *
+ * <p>{@link MySqlSourceReader} relies on the order of elements within the map:
+ *
+ * <ol>
+ * <li>It must correspond to the order of assignment of the splits to readers.
+ * <li>The order must be retained across job restarts.
+ * </ol>
+ */
+ private final LinkedHashMap<String, MySqlSchemalessSnapshotSplit>
assignedSplits;
+
private final Map<TableId, TableChanges.TableChange> tableSchemas;
private final Map<String, BinlogOffset> splitFinishedOffsets;
private final MySqlSourceConfig sourceConfig;
@@ -141,7 +156,7 @@ public class MySqlSnapshotSplitAssigner implements
MySqlSplitAssigner {
int currentParallelism,
List<TableId> alreadyProcessedTables,
List<MySqlSchemalessSnapshotSplit> remainingSplits,
- Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
+ LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
Map<TableId, TableChanges.TableChange> tableSchemas,
Map<String, BinlogOffset> splitFinishedOffsets,
AssignerStatus assignerStatus,
@@ -154,17 +169,7 @@ public class MySqlSnapshotSplitAssigner implements
MySqlSplitAssigner {
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
- // When job restore from savepoint, sort the existing tables and newly added tables
- // to let enumerator only send newly added tables' BinlogSplitMetaEvent
- this.assignedSplits =
- assignedSplits.entrySet().stream()
- .sorted(Entry.comparingByKey())
- .collect(
- Collectors.toMap(
- Entry::getKey,
- Entry::getValue,
- (o, o2) -> o,
- LinkedHashMap::new));
+ this.assignedSplits = assignedSplits;
this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
index bb3e2d2ed..0df1ee979 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java
@@ -35,6 +35,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -196,12 +197,12 @@ public class PendingSplitsStateSerializer implements
SimpleVersionedSerializer<P
int splitVersion, DataInputDeserializer in) throws IOException {
List<TableId> alreadyProcessedTables = readTableIds(in);
List<MySqlSnapshotSplit> remainingSplits =
readMySqlSnapshotSplits(splitVersion, in);
- Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
+ LinkedHashMap<String, MySqlSnapshotSplit> assignedSnapshotSplits =
readAssignedSnapshotSplits(splitVersion, in);
final List<MySqlSchemalessSnapshotSplit> remainingSchemalessSplits =
new ArrayList<>();
- final Map<String, MySqlSchemalessSnapshotSplit>
assignedSchemalessSnapshotSplits =
- new HashMap<>();
+ final LinkedHashMap<String, MySqlSchemalessSnapshotSplit>
assignedSchemalessSnapshotSplits =
+ new LinkedHashMap<>();
final Map<TableId, TableChanges.TableChange> tableSchemas = new
HashMap<>();
remainingSplits.forEach(
split -> {
@@ -267,8 +268,8 @@ public class PendingSplitsStateSerializer implements
SimpleVersionedSerializer<P
List<TableId> remainingTableIds = readTableIds(in);
boolean isTableIdCaseSensitive = in.readBoolean();
final List<MySqlSchemalessSnapshotSplit> remainingSchemalessSplits =
new ArrayList<>();
- final Map<String, MySqlSchemalessSnapshotSplit>
assignedSchemalessSnapshotSplits =
- new HashMap<>();
+ final LinkedHashMap<String, MySqlSchemalessSnapshotSplit>
assignedSchemalessSnapshotSplits =
+ new LinkedHashMap<>();
final Map<TableId, TableChanges.TableChange> tableSchemas = new
HashMap<>();
remainingSplits.forEach(
split -> {
@@ -368,9 +369,9 @@ public class PendingSplitsStateSerializer implements
SimpleVersionedSerializer<P
}
}
- private Map<String, MySqlSnapshotSplit> readAssignedSnapshotSplits(
+ private LinkedHashMap<String, MySqlSnapshotSplit>
readAssignedSnapshotSplits(
int splitVersion, DataInputDeserializer in) throws IOException {
- Map<String, MySqlSnapshotSplit> assignedSplits = new HashMap<>();
+ LinkedHashMap<String, MySqlSnapshotSplit> assignedSplits = new
LinkedHashMap<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
String splitId = in.readUTF();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
index 39aa71ad0..c49069a60 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java
@@ -27,6 +27,7 @@ import
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapsho
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -50,7 +51,7 @@ public class SnapshotPendingSplitsState extends
PendingSplitsState {
* The snapshot splits that the {@link MySqlSourceEnumerator} has assigned to {@link
* MySqlSplitReader}s.
*/
- private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
+ private final LinkedHashMap<String, MySqlSchemalessSnapshotSplit>
assignedSplits;
/**
* The offsets of finished (snapshot) splits that the {@link MySqlSourceEnumerator} has received
@@ -75,7 +76,7 @@ public class SnapshotPendingSplitsState extends
PendingSplitsState {
public SnapshotPendingSplitsState(
List<TableId> alreadyProcessedTables,
List<MySqlSchemalessSnapshotSplit> remainingSplits,
- Map<String, MySqlSchemalessSnapshotSplit> assignedSplits,
+ LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
Map<TableId, TableChange> tableSchemas,
Map<String, BinlogOffset> splitFinishedOffsets,
AssignerStatus assignerStatus,
@@ -103,7 +104,7 @@ public class SnapshotPendingSplitsState extends
PendingSplitsState {
return remainingSplits;
}
- public Map<String, MySqlSchemalessSnapshotSplit> getAssignedSplits() {
+ public LinkedHashMap<String, MySqlSchemalessSnapshotSplit>
getAssignedSplits() {
return assignedSplits;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index a4f32a375..b3f421cee 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -59,10 +59,8 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -419,15 +417,24 @@ public class MySqlSourceReader<T>
binlogSplit, receivedTotalFinishedSplitSize);
uncompletedBinlogSplits.put(binlogSplit.splitId(),
binlogSplit);
} else if (receivedMetaGroupId == expectedMetaGroupId) {
- List<FinishedSnapshotSplitInfo> newAddedMetadataGroup;
- Set<String> existedSplitsOfLastGroup =
- getExistedSplitsOfLastGroup(
- binlogSplit.getFinishedSnapshotSplitInfos(),
- sourceConfig.getSplitMetaGroupSize());
- newAddedMetadataGroup =
- metadataEvent.getMetaGroup().stream()
+ int expectedNumberOfAlreadyRetrievedElements =
+ binlogSplit.getFinishedSnapshotSplitInfos().size()
+ % sourceConfig.getSplitMetaGroupSize();
+ List<byte[]> metaGroup = metadataEvent.getMetaGroup();
+ if (expectedNumberOfAlreadyRetrievedElements > 0) {
+ LOG.info(
+ "Source reader {} is discarding the first {} out of {} elements of meta group {}.",
+ subtaskId,
+ expectedNumberOfAlreadyRetrievedElements,
+ metaGroup.size(),
+ receivedMetaGroupId);
+ metaGroup =
+ metaGroup.subList(
+ expectedNumberOfAlreadyRetrievedElements,
metaGroup.size());
+ }
+ List<FinishedSnapshotSplitInfo> newAddedMetadataGroup =
+ metaGroup.stream()
.map(FinishedSnapshotSplitInfo::deserialize)
- .filter(r ->
!existedSplitsOfLastGroup.contains(r.getSplitId()))
.collect(Collectors.toList());
uncompletedBinlogSplits.put(
@@ -497,28 +504,6 @@ public class MySqlSourceReader<T>
}
}
- private Set<String> getExistedSplitsOfLastGroup(
- List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int
metaGroupSize) {
- int splitsNumOfLastGroup =
- finishedSnapshotSplits.size() %
sourceConfig.getSplitMetaGroupSize();
- if (splitsNumOfLastGroup != 0) {
- int lastGroupStart =
- ((int) (finishedSnapshotSplits.size() /
sourceConfig.getSplitMetaGroupSize()))
- * metaGroupSize;
- // Keep same order with MySqlHybridSplitAssigner.createBinlogSplit() to avoid
- // 'invalid request meta group id' error
- List<String> sortedFinishedSnapshotSplits =
- finishedSnapshotSplits.stream()
- .map(FinishedSnapshotSplitInfo::getSplitId)
- .sorted()
- .collect(Collectors.toList());
- return new HashSet<>(
- sortedFinishedSnapshotSplits.subList(
- lastGroupStart, lastGroupStart +
splitsNumOfLastGroup));
- }
- return new HashSet<>();
- }
-
private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long
checkpointId) {
if (!LOG.isInfoEnabled()) {
return;
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
index fba3abcce..47cb9f23b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -42,7 +43,10 @@ public class MySqlBinlogSplit extends MySqlSplit {
private final BinlogOffset startingOffset;
private final BinlogOffset endingOffset;
+
+ /** Split IDs of all elements must be unique. */
private final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;
+
private final Map<TableId, TableChange> tableSchemas;
private final int totalFinishedSplitSize;
private final boolean isSuspended;
@@ -58,6 +62,9 @@ public class MySqlBinlogSplit extends MySqlSplit {
int totalFinishedSplitSize,
boolean isSuspended) {
super(splitId);
+
+ ensureNoDuplicates(finishedSnapshotSplitInfos);
+
this.startingOffset = startingOffset;
this.endingOffset = endingOffset;
this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
@@ -74,14 +81,29 @@ public class MySqlBinlogSplit extends MySqlSplit {
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
Map<TableId, TableChange> tableSchemas,
int totalFinishedSplitSize) {
- super(splitId);
- this.startingOffset = startingOffset;
- this.endingOffset = endingOffset;
- this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
- this.tableSchemas = tableSchemas;
- this.totalFinishedSplitSize = totalFinishedSplitSize;
- this.isSuspended = false;
- this.tablesForLog = getTablesForLog();
+ this(
+ splitId,
+ startingOffset,
+ endingOffset,
+ finishedSnapshotSplitInfos,
+ tableSchemas,
+ totalFinishedSplitSize,
+ false);
+ }
+
+ private static void ensureNoDuplicates(
+ List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos) {
+ Set<String> seenSplitIds = new HashSet<>();
+ for (FinishedSnapshotSplitInfo splitInfo : finishedSnapshotSplitInfos) {
+ if (seenSplitIds.contains(splitInfo.getSplitId())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Found duplicate split ID %s in finished snapshot split infos",
+ splitInfo.getSplitId()));
+ }
+
+ seenSplitIds.add(splitInfo.getSplitId());
+ }
}
public BinlogOffset getStartingOffset() {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
index 0b02fd590..605b848c3 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -81,7 +82,7 @@ class MySqlHybridSplitAssignerTest extends
MySqlSourceTestBase {
List<TableId> alreadyProcessedTables = Lists.newArrayList(tableId);
List<MySqlSchemalessSnapshotSplit> remainingSplits = new ArrayList<>();
- Map<String, MySqlSchemalessSnapshotSplit> assignedSplits = new
HashMap<>();
+ LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits =
new LinkedHashMap<>();
Map<String, BinlogOffset> splitFinishedOffsets = new HashMap<>();
for (int i = 0; i < 5; i++) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index d97555593..ca5dff5bd 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -36,16 +36,23 @@ import io.debezium.relational.TableId;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -527,6 +534,92 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
assertThat(splits).isEqualTo(expected);
}
+ @ParameterizedTest
+ @MethodSource
+ void testFinishedSnapshotSplitInfosAreInOrderOfAssignment(
+ String table1Name, String table2Name) {
+ List<String> tableNames = new ArrayList<>();
+ tableNames.add(table1Name);
+
+ SnapshotPendingSplitsState state;
+
+ try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames,
null)) {
+ state = processAllSplitsAndSnapshotState(assigner, 1L);
+ }
+
+ tableNames.add(table2Name);
+
+ try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames,
state)) {
+ state = processAllSplitsAndSnapshotState(assigner, 2L);
+ }
+
+ try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames,
state)) {
+ List<String> finishedSnapshotSplitTableNames =
+ assigner.getFinishedSplitInfos().stream()
+ .map(i -> i.getTableId().table())
+ .collect(Collectors.toList());
+
+ assertThat(finishedSnapshotSplitTableNames).isEqualTo(tableNames);
+ }
+ }
+
+ /**
+ * Use various combinations of table names to ensure that the finished snapshot split infos are
+ * in the order of assignment, not the order of table names.
+ */
+ public static Stream<Arguments>
testFinishedSnapshotSplitInfosAreInOrderOfAssignment() {
+ String table1Name = "customers";
+ String table2Name = "customers_1";
+
+ return Stream.of(
+ Arguments.of(table1Name, table2Name), Arguments.of(table2Name,
table1Name));
+ }
+
+ private MySqlSnapshotSplitAssigner createAssigner(
+ List<String> tableNames, @Nullable SnapshotPendingSplitsState
state) {
+ int currentParallelism = 1;
+
+ if (state == null) {
+ return new MySqlSnapshotSplitAssigner(
+ createConfiguration(tableNames),
+ currentParallelism,
+ new ArrayList<>(),
+ true,
+ getMySqlSplitEnumeratorContext());
+ }
+
+ return new MySqlSnapshotSplitAssigner(
+ createConfiguration(tableNames),
+ currentParallelism,
+ state,
+ getMySqlSplitEnumeratorContext());
+ }
+
+ private MySqlSourceConfig createConfiguration(List<String> tableNames) {
+ return getConfig(
+ customerDatabase,
+ Integer.MAX_VALUE,
+ CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+ CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+ tableNames.toArray(new String[0]),
+ "id",
+ true,
+ false);
+ }
+
+ private SnapshotPendingSplitsState processAllSplitsAndSnapshotState(
+ MySqlSnapshotSplitAssigner assigner, long checkpointId) {
+ assigner.open();
+
+ Optional<MySqlSplit> optional;
+ while ((optional = assigner.getNext()).isPresent()) {
+ assigner.onFinishedSplits(
+ Collections.singletonMap(optional.get().splitId(),
BinlogOffset.ofLatest()));
+ }
+
+ return assigner.snapshotState(checkpointId);
+ }
+
private List<String> getTestAssignSnapshotSplits(
int splitSize,
double distributionFactorUpper,
@@ -647,7 +740,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
null,
null));
- Map<String, MySqlSchemalessSnapshotSplit> assignedSplits = new
HashMap<>();
+ LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits =
new LinkedHashMap<>();
assignedSplits.put(
processedTable + ":0",
new MySqlSchemalessSnapshotSplit(
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
index 3d813b8a8..60abb56ff 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -147,7 +148,8 @@ class PendingSplitsStateSerializerTest {
remainingSplits.add(getTestSchemalessSnapshotSplit(tableId1, 2));
remainingSplits.add(getTestSchemalessSnapshotSplit(tableId1, 3));
- final Map<String, MySqlSchemalessSnapshotSplit> assignedSnapshotSplits
= new HashMap<>();
+ final LinkedHashMap<String, MySqlSchemalessSnapshotSplit>
assignedSnapshotSplits =
+ new LinkedHashMap<>();
Arrays.asList(
getTestSchemalessSnapshotSplit(tableId0, 0),
getTestSchemalessSnapshotSplit(tableId0, 1),
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
index f26c3a0b8..ff310d367 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
@@ -148,6 +148,32 @@ class MySqlBinlogSplitTest {
Assertions.assertThat(binlogSplit.getTables()).isEqualTo(expectedTables);
}
+ @Test
+ public void duplicateSplitInfo() {
+ FinishedSnapshotSplitInfo info =
+ new FinishedSnapshotSplitInfo(
+ new TableId("catalog", "schema", "table"),
+ "split",
+ null,
+ null,
+ BinlogOffset.ofLatest());
+ List<FinishedSnapshotSplitInfo> infos = new ArrayList<>();
+ infos.add(info);
+ infos.add(info);
+
+ Assertions.assertThatThrownBy(
+ () ->
+ new MySqlBinlogSplit(
+ "binlog-split",
+ BinlogOffset.ofLatest(),
+ null,
+ infos,
+ Collections.emptyMap(),
+ 0,
+ false))
+ .isExactlyInstanceOf(IllegalArgumentException.class);
+ }
+
/** A mock implementation for {@link Table} which is used for unit tests. */
private static class MockTable implements Table {
private final TableId tableId;