This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new c1a7d0b6f [FLINK-38531][cdc-connector-mysql]Fix data loss when
restoring from a checkpoint positioned in the middle of a bulk DML operation.
(#4165)
c1a7d0b6f is described below
commit c1a7d0b6f194ad2511712c1efb0a17053080f3d8
Author: 5herhom <[email protected]>
AuthorDate: Wed Dec 10 14:54:23 2025 +0800
[FLINK-38531][cdc-connector-mysql]Fix data loss when restoring from a
checkpoint positioned in the middle of a bulk DML operation. (#4165)
---
.../mysql/source/offset/BinlogOffset.java | 6 +-
.../debezium/reader/BinlogSplitReaderTest.java | 83 +++++++
.../mysql/source/offset/BinlogOffsetTest.java | 249 +++++++++++++++++++++
3 files changed, 337 insertions(+), 1 deletion(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
index c6511a1e7..67ffc3b82 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
@@ -193,7 +193,11 @@ public class BinlogOffset implements
Comparable<BinlogOffset>, Serializable {
if (gtidSet.equals(targetGtidSet)) {
long restartSkipEvents = this.getRestartSkipEvents();
long targetRestartSkipEvents = that.getRestartSkipEvents();
- return Long.compare(restartSkipEvents,
targetRestartSkipEvents);
+ if (restartSkipEvents != targetRestartSkipEvents) {
+ return Long.compare(restartSkipEvents,
targetRestartSkipEvents);
+ }
+ // The completed events are the same, so compare the row
number ...
+ return Long.compare(this.getRestartSkipRows(),
that.getRestartSkipRows());
}
// The GTIDs are not an exact match, so figure out if this is
a subset of the target
// offset
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index 0a6417b3e..e7fd2c4cb 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -786,6 +786,89 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
assertEqualsInOrder(Arrays.asList(expected), actual);
}
+ @Test
+ void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws
Exception {
+ // Preparations
+ customerDatabase.createAndInitialize();
+ MySqlSourceConfig connectionConfig = getConfig(new String[]
{"customers"});
+ binaryLogClient =
DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
+ mySqlConnection =
DebeziumUtils.createMySqlConnection(connectionConfig);
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("address", DataTypes.STRING()),
+ DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+ // Capture the current binlog offset, and we will start the reader
from here
+ BinlogOffset startingOffset =
DebeziumUtils.currentBinlogOffset(mySqlConnection);
+
+ // In this case, the binlog is:
+ // Event 0: QUERY,BEGIN
+ // Event 1: TABLE_MAP
+ // Event 2: Update id = 101 and id = 102
+ // ROW 1 : Update id=101
+ // ROW 2 : Update id=102
+ // Event 3: TABLE_MAP
+ // Event 4: Update id = 103 and id = 109
+ // ROW 1 : Update id=103
+ // ROW 2 : Update id=109
+
+ // When a checkpoint is triggered after the update of id=103
+ // but before the update of id=109,
+ // the position restored from the checkpoint will be event=4 and row=1
+ BinlogOffset checkpointOffset =
+ BinlogOffset.builder()
+ .setBinlogFilePosition("", 0)
+ .setGtidSet(startingOffset.getGtidSet())
+ // The position restored from the checkpoint
+ // skips 4 events, which drops the first update:
+ // QUERY / TABLE_MAP / EXT_UPDATE_ROWS / TABLE_MAP
+ .setSkipEvents(4)
+ // The position restored from the checkpoint
+ // also skips 1 row, dropping the first row (id=103) of event 4
+ .setSkipRows(1)
+ .build();
+
+ // Create a new config to start reading from the offset captured above
+ MySqlSourceConfig sourceConfig =
+ getConfig(
+ StartupOptions.specificOffset(checkpointOffset),
+ new String[] {"customers"});
+
+ // Create reader and submit splits
+ MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
+ BinlogSplitReader reader = createBinlogReader(sourceConfig);
+ reader.submitSplit(split);
+
+ // Create some binlog events:
+ // Event 0: QUERY,BEGIN
+ // Event 1: TABLE_MAP
+ // Event 2: Update id = 101 and id = 102
+ // ROW 1 : Update id=101
+ // ROW 2 : Update id=102
+ // Event 3: TABLE_MAP
+ // Event 4: Update id = 103 and id = 109
+ // ROW 1 : Update id=103
+ // ROW 2 : Update id=109
+ // Events 0-3 will be dropped because skipEvents = 4.
+ // Row 1 in event 4 will be dropped because skipRows = 1.
+ // Only the update of id=109 will be captured.
+ updateCustomersTableInBulk(
+ mySqlConnection,
customerDatabase.qualifiedTableName("customers"));
+
+ // Read with binlog split reader and validate
+ String[] expected =
+ new String[] {
+ "-U[109, user_4, Shanghai, 123567891234]",
+ "+U[109, user_4, Pittsburgh, 123567891234]"
+ };
+ List<String> actual = readBinlogSplits(dataType, reader,
expected.length);
+
+ reader.close();
+ assertEqualsInOrder(Arrays.asList(expected), actual);
+ }
+
@Test
void testReadBinlogFromTimestamp() throws Exception {
// Preparations
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
new file mode 100644
index 000000000..58f34c16a
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.offset;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link BinlogOffset}. */
+public class BinlogOffsetTest {
+ public static final String PART_OF_GTID_SET_1 = "abcd:1-4";
+ public static final String PART_OF_GTID_SET_2 = "efgh:1-10";
+ public static final String FULL_GTID_SET =
+ String.join(",", PART_OF_GTID_SET_1, PART_OF_GTID_SET_2);
+
+ @Test
+ public void testCompareToWithGtidSet() {
+ // Test same GTID sets in different orders
+ BinlogOffset offset1 =
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).build();
+ BinlogOffset offset2 =
+ BinlogOffset.builder()
+ .setGtidSet(String.join(",", PART_OF_GTID_SET_2,
PART_OF_GTID_SET_1))
+ .build();
+ assetCompareTo(offset1, offset2, 0);
+
+ // The comparison uses the GTID set rather than the file position.
+ offset1 =
+ BinlogOffset.builder()
+ .setGtidSet(FULL_GTID_SET)
+ .setBinlogFilePosition("binlog.001", 123)
+ .build();
+ offset2 =
+ BinlogOffset.builder()
+ .setGtidSet(String.join(",", PART_OF_GTID_SET_2,
PART_OF_GTID_SET_1))
+ .setBinlogFilePosition("binlog.001", 456)
+ .build();
+ assetCompareTo(offset1, offset2, 0);
+
+ // Test different GTID sets where one contains another
+ BinlogOffset offset3 =
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
+ BinlogOffset offset4 =
+ BinlogOffset.builder()
+ .setGtidSet("abcd:1-5") // Contains offset3's GTID set
+ .build();
+
+ // offset3 should be before offset4
+ assetCompareTo(offset3, offset4, -1);
+ assetCompareTo(offset4, offset3, 1);
+
+ // The comparison uses the GTID set rather than the file position.
+ offset3 =
+ BinlogOffset.builder()
+ .setGtidSet(PART_OF_GTID_SET_1)
+ .setBinlogFilePosition("binlog.001", 1000)
+ .build();
+ offset4 =
+ BinlogOffset.builder()
+ .setGtidSet("abcd:1-5") // Contains offset3's GTID set
+ .setBinlogFilePosition("binlog.001", 23)
+ .build();
+ assetCompareTo(offset3, offset4, -1);
+ assetCompareTo(offset4, offset3, 1);
+
+ // Test completely different GTID sets
+ BinlogOffset offset5 =
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
+ BinlogOffset offset6 =
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build();
+
+ // offsets don't contain each other, result is always 1
+ assetCompareTo(offset5, offset6, 1);
+ assetCompareTo(offset6, offset5, 1);
+ }
+
+ @Test
+ public void testCompareToWithGtidSetAndSkipEventsAndSkipRows() {
+ // Test same GTID but different skip events
+ BinlogOffset offset1 =
+
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(5).build();
+ BinlogOffset offset2 =
+
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(10).build();
+
+ assetCompareTo(offset1, offset2, -1);
+ assetCompareTo(offset2, offset1, 1);
+
+ // Test same GTID and skip events but different skip rows
+ BinlogOffset offset3 =
+ BinlogOffset.builder()
+ .setGtidSet(FULL_GTID_SET)
+ .setSkipEvents(5)
+ .setSkipRows(10)
+ .build();
+ BinlogOffset offset4 =
+ BinlogOffset.builder()
+ .setGtidSet(FULL_GTID_SET)
+ .setSkipEvents(5)
+ .setSkipRows(20)
+ .build();
+
+ assetCompareTo(offset3, offset4, -1);
+ assetCompareTo(offset4, offset3, 1);
+ }
+
+ @Test
+ public void testCompareToWithGtidSetExistence() {
+ // Test one offset has GTID set and another doesn't
+ BinlogOffset offsetWithGtid =
+ BinlogOffset.builder()
+ .setGtidSet(PART_OF_GTID_SET_1)
+ .setBinlogFilePosition("binlog.001", 123)
+ .build();
+ BinlogOffset offsetWithoutGtid =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.001",
456).build();
+
+ // When one has GTID and another doesn't, the one without GTID is
considered older
+ assetCompareTo(offsetWithGtid, offsetWithoutGtid, 1);
+ assetCompareTo(offsetWithoutGtid, offsetWithGtid, -1);
+
+ // Test the reverse scenario
+ BinlogOffset offsetWithGtid2 =
+ BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build();
+ BinlogOffset offsetWithoutGtid2 =
+ BinlogOffset.builder()
+ .setBinlogFilePosition("binlog.002", 789)
+ .setSkipEvents(5)
+ .build();
+
+ assetCompareTo(offsetWithGtid2, offsetWithoutGtid2, 1);
+ assetCompareTo(offsetWithoutGtid2, offsetWithGtid2, -1);
+ }
+
+ @Test
+ public void testCompareToWithFilePosition() {
+ // Test same file position - should be equal
+ BinlogOffset offset1 =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.001",
123).build();
+ BinlogOffset offset2 =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.001",
123).build();
+ assetCompareTo(offset1, offset2, 0);
+
+ // Test different file names
+ BinlogOffset offset3 =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.001",
123).build();
+ BinlogOffset offset4 =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.002",
123).build();
+ assetCompareTo(offset3, offset4, -1);
+ assetCompareTo(offset4, offset3, 1);
+
+ // Test different positions in same file
+ BinlogOffset offset5 =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.001",
100).build();
+ BinlogOffset offset6 =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.001",
200).build();
+ assetCompareTo(offset5, offset6, -1);
+ assetCompareTo(offset6, offset5, 1);
+ }
+
+ @Test
+ public void testCompareToWithFilePositionAndSkipEventsAndSkipRows() {
+ // Test with skip events
+ BinlogOffset offset1 =
+ BinlogOffset.builder()
+ .setBinlogFilePosition("binlog.001", 123)
+ .setSkipEvents(5)
+ .build();
+ BinlogOffset offset2 =
+ BinlogOffset.builder()
+ .setBinlogFilePosition("binlog.001", 123)
+ .setSkipEvents(10)
+ .build();
+ assetCompareTo(offset1, offset2, -1);
+ assetCompareTo(offset2, offset1, 1);
+
+ // Test with skip rows
+ BinlogOffset offset3 =
+ BinlogOffset.builder()
+ .setBinlogFilePosition("binlog.001", 123)
+ .setSkipEvents(5)
+ .setSkipRows(10)
+ .build();
+ BinlogOffset offset4 =
+ BinlogOffset.builder()
+ .setBinlogFilePosition("binlog.001", 123)
+ .setSkipEvents(5)
+ .setSkipRows(20)
+ .build();
+ assetCompareTo(offset3, offset4, -1);
+ assetCompareTo(offset4, offset3, 1);
+ }
+
+ @Test
+ public void testCompareToTimestampWithDifferentServerId() {
+ // Test different server IDs with different timestamps
+ BinlogOffset offset1 =
+ BinlogOffset.builder()
+ .setServerId(1L)
+ .setTimestampSec(1000L)
+ .setBinlogFilePosition("binlog.001", 123)
+ .build();
+ BinlogOffset offset2 =
+ BinlogOffset.builder()
+ .setServerId(2L)
+ .setTimestampSec(2000L)
+ .setBinlogFilePosition("binlog.001", 123)
+ .build();
+
+ // Should compare based on timestamp since server IDs are different
+ assetCompareTo(offset1, offset2, -1);
+ assetCompareTo(offset2, offset1, 1);
+
+ // Test same timestamps but different server IDs
+ BinlogOffset offset3 =
+ BinlogOffset.builder()
+ .setServerId(1L)
+ .setTimestampSec(1500L)
+ .setBinlogFilePosition("binlog.001", 432)
+ .build();
+ BinlogOffset offset4 =
+ BinlogOffset.builder()
+ .setServerId(2L)
+ .setTimestampSec(1500L)
+ .setBinlogFilePosition("binlog.001", 123)
+ .build();
+
+ // Same timestamp, different server IDs - the comparison is based on the
+ // timestamps, which are equal.
+ // Note that the file positions differ (432 vs 123) while the expected
+ // result is still 0, so the file position does not decide the outcome
+ // when the server IDs differ.
+ assetCompareTo(offset3, offset4, 0);
+ }
+
+ private void assetCompareTo(BinlogOffset offset1, BinlogOffset offset2,
int expected) {
+ int actual = offset1.compareTo(offset2);
+ Assertions.assertThat(expected).isEqualTo(actual);
+ }
+}