This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new c1a7d0b6f [FLINK-38531][cdc-connector-mysql]Fix data loss when 
restoring from a checkpoint positioned in the middle of a bulk DML operation. 
(#4165)
c1a7d0b6f is described below

commit c1a7d0b6f194ad2511712c1efb0a17053080f3d8
Author: 5herhom <[email protected]>
AuthorDate: Wed Dec 10 14:54:23 2025 +0800

    [FLINK-38531][cdc-connector-mysql]Fix data loss when restoring from a 
checkpoint positioned in the middle of a bulk DML operation. (#4165)
---
 .../mysql/source/offset/BinlogOffset.java          |   6 +-
 .../debezium/reader/BinlogSplitReaderTest.java     |  83 +++++++
 .../mysql/source/offset/BinlogOffsetTest.java      | 249 +++++++++++++++++++++
 3 files changed, 337 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
index c6511a1e7..67ffc3b82 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
@@ -193,7 +193,11 @@ public class BinlogOffset implements 
Comparable<BinlogOffset>, Serializable {
                 if (gtidSet.equals(targetGtidSet)) {
                     long restartSkipEvents = this.getRestartSkipEvents();
                     long targetRestartSkipEvents = that.getRestartSkipEvents();
-                    return Long.compare(restartSkipEvents, 
targetRestartSkipEvents);
+                    if (restartSkipEvents != targetRestartSkipEvents) {
+                        return Long.compare(restartSkipEvents, 
targetRestartSkipEvents);
+                    }
+                    // The completed events are the same, so compare the row 
number ...
+                    return Long.compare(this.getRestartSkipRows(), 
that.getRestartSkipRows());
                 }
                 // The GTIDs are not an exact match, so figure out if this is 
a subset of the target
                 // offset
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index 0a6417b3e..e7fd2c4cb 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -786,6 +786,89 @@ class BinlogSplitReaderTest extends MySqlSourceTestBase {
         assertEqualsInOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    void testRestoreFromCheckpointWithGtidSetAndSkippingEventsAndRows() throws 
Exception {
+        // Preparations
+        customerDatabase.createAndInitialize();
+        MySqlSourceConfig connectionConfig = getConfig(new String[] 
{"customers"});
+        binaryLogClient = 
DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration());
+        mySqlConnection = 
DebeziumUtils.createMySqlConnection(connectionConfig);
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+
+        // Capture the current binlog offset, and we will start the reader 
from here
+        BinlogOffset startingOffset = 
DebeziumUtils.currentBinlogOffset(mySqlConnection);
+
+        // In this case, the binlog is:
+        // Event 0: QUERY,BEGIN
+        // Event 1: TABLE_MAP
+        // Event 2: Update id = 101 and id = 102
+        //        ROW 1 : Update id=101
+        //        ROW 2 : Update id=102
+        // Event 3: TABLE_MAP
+        // Event 4: Update id = 103 and id = 109
+        //        ROW 1 : Update id=103
+        //        ROW 2 : Update id=109
+
+        // When a checkpoint is triggered
+        // after id=103 ,before id=109 ,
+        // the position restored from checkpoint will be event=4 and row=1
+        BinlogOffset checkpointOffset =
+                BinlogOffset.builder()
+                        .setBinlogFilePosition("", 0)
+                        .setGtidSet(startingOffset.getGtidSet())
+                        // Because the position restored from checkpoint
+                        // will skip 4 events to drop the first update:
+                        // QUERY / TABLE_MAP / EXT_UPDATE_ROWS / TABLE_MAP
+                        .setSkipEvents(4)
+                        // The position restored from checkpoint
+                        // will skip 1 rows to drop the first
+                        .setSkipRows(1)
+                        .build();
+
+        // Create a new config to start reading from the offset captured above
+        MySqlSourceConfig sourceConfig =
+                getConfig(
+                        StartupOptions.specificOffset(checkpointOffset),
+                        new String[] {"customers"});
+
+        // Create reader and submit splits
+        MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
+        BinlogSplitReader reader = createBinlogReader(sourceConfig);
+        reader.submitSplit(split);
+
+        // Create some binlog events:
+        // Event 0: QUERY,BEGIN
+        // Event 1: TABLE_MAP
+        // Event 2: Update id = 101 and id = 102
+        //        ROW 1 : Update id=101
+        //        ROW 2 : Update id=102
+        // Event 3: TABLE_MAP
+        // Event 4: Update id = 103 and id = 109
+        //        ROW 1 : Update id=103
+        //        ROW 2 : Update id=109
+        // The event 0-3 will be dropped because skipEvents = 4.
+        // The row 1 in event 4 will be dropped because skipRows = 1.
+        // Only the update on 109 will be captured.
+        updateCustomersTableInBulk(
+                mySqlConnection, 
customerDatabase.qualifiedTableName("customers"));
+
+        // Read with binlog split reader and validate
+        String[] expected =
+                new String[] {
+                    "-U[109, user_4, Shanghai, 123567891234]",
+                    "+U[109, user_4, Pittsburgh, 123567891234]"
+                };
+        List<String> actual = readBinlogSplits(dataType, reader, 
expected.length);
+
+        reader.close();
+        assertEqualsInOrder(Arrays.asList(expected), actual);
+    }
+
     @Test
     void testReadBinlogFromTimestamp() throws Exception {
         // Preparations
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
new file mode 100644
index 000000000..58f34c16a
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.offset;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link BinlogOffset}. */
+public class BinlogOffsetTest {
+    public static final String PART_OF_GTID_SET_1 = "abcd:1-4";
+    public static final String PART_OF_GTID_SET_2 = "efgh:1-10";
+    public static final String FULL_GTID_SET =
+            String.join(",", PART_OF_GTID_SET_1, PART_OF_GTID_SET_2);
+
+    @Test
+    public void testCompareToWithGtidSet() {
+        // Test same GTID sets in different orders
+        BinlogOffset offset1 = 
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).build();
+        BinlogOffset offset2 =
+                BinlogOffset.builder()
+                        .setGtidSet(String.join(",", PART_OF_GTID_SET_2, 
PART_OF_GTID_SET_1))
+                        .build();
+        assetCompareTo(offset1, offset2, 0);
+
+        // The test uses GTID instead of position for comparison.
+        offset1 =
+                BinlogOffset.builder()
+                        .setGtidSet(FULL_GTID_SET)
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .build();
+        offset2 =
+                BinlogOffset.builder()
+                        .setGtidSet(String.join(",", PART_OF_GTID_SET_2, 
PART_OF_GTID_SET_1))
+                        .setBinlogFilePosition("binlog.001", 456)
+                        .build();
+        assetCompareTo(offset1, offset2, 0);
+
+        // Test different GTID sets where one contains another
+        BinlogOffset offset3 = 
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
+        BinlogOffset offset4 =
+                BinlogOffset.builder()
+                        .setGtidSet("abcd:1-5") // Contains offset3's GTID set
+                        .build();
+
+        // offset3 should be before offset4
+        assetCompareTo(offset3, offset4, -1);
+        assetCompareTo(offset4, offset3, 1);
+
+        // The test uses GTID instead of position for comparison.
+        offset3 =
+                BinlogOffset.builder()
+                        .setGtidSet(PART_OF_GTID_SET_1)
+                        .setBinlogFilePosition("binlog.001", 1000)
+                        .build();
+        offset4 =
+                BinlogOffset.builder()
+                        .setGtidSet("abcd:1-5") // Contains offset3's GTID set
+                        .setBinlogFilePosition("binlog.001", 23)
+                        .build();
+        assetCompareTo(offset3, offset4, -1);
+        assetCompareTo(offset4, offset3, 1);
+
+        // Test completely different GTID sets
+        BinlogOffset offset5 = 
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
+        BinlogOffset offset6 = 
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build();
+
+        // offsets don't contain each other, result is always 1
+        assetCompareTo(offset5, offset6, 1);
+        assetCompareTo(offset6, offset5, 1);
+    }
+
+    @Test
+    public void testCompareToWithGtidSetAndSkipEventsAndSkipRows() {
+        // Test same GTID but different skip events
+        BinlogOffset offset1 =
+                
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(5).build();
+        BinlogOffset offset2 =
+                
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(10).build();
+
+        assetCompareTo(offset1, offset2, -1);
+        assetCompareTo(offset2, offset1, 1);
+
+        // Test same GTID and skip events but different skip rows
+        BinlogOffset offset3 =
+                BinlogOffset.builder()
+                        .setGtidSet(FULL_GTID_SET)
+                        .setSkipEvents(5)
+                        .setSkipRows(10)
+                        .build();
+        BinlogOffset offset4 =
+                BinlogOffset.builder()
+                        .setGtidSet(FULL_GTID_SET)
+                        .setSkipEvents(5)
+                        .setSkipRows(20)
+                        .build();
+
+        assetCompareTo(offset3, offset4, -1);
+        assetCompareTo(offset4, offset3, 1);
+    }
+
+    @Test
+    public void testCompareToWithGtidSetExistence() {
+        // Test one offset has GTID set and another doesn't
+        BinlogOffset offsetWithGtid =
+                BinlogOffset.builder()
+                        .setGtidSet(PART_OF_GTID_SET_1)
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .build();
+        BinlogOffset offsetWithoutGtid =
+                BinlogOffset.builder().setBinlogFilePosition("binlog.001", 
456).build();
+
+        // When one has GTID and another doesn't, the one without GTID is 
considered older
+        assetCompareTo(offsetWithGtid, offsetWithoutGtid, 1);
+        assetCompareTo(offsetWithoutGtid, offsetWithGtid, -1);
+
+        // Test the reverse scenario
+        BinlogOffset offsetWithGtid2 =
+                BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build();
+        BinlogOffset offsetWithoutGtid2 =
+                BinlogOffset.builder()
+                        .setBinlogFilePosition("binlog.002", 789)
+                        .setSkipEvents(5)
+                        .build();
+
+        assetCompareTo(offsetWithGtid2, offsetWithoutGtid2, 1);
+        assetCompareTo(offsetWithoutGtid2, offsetWithGtid2, -1);
+    }
+
+    @Test
+    public void testCompareToWithFilePosition() {
+        // Test same file position - should be equal
+        BinlogOffset offset1 =
+                BinlogOffset.builder().setBinlogFilePosition("binlog.001", 
123).build();
+        BinlogOffset offset2 =
+                BinlogOffset.builder().setBinlogFilePosition("binlog.001", 
123).build();
+        assetCompareTo(offset1, offset2, 0);
+
+        // Test different file names
+        BinlogOffset offset3 =
+                BinlogOffset.builder().setBinlogFilePosition("binlog.001", 
123).build();
+        BinlogOffset offset4 =
+                BinlogOffset.builder().setBinlogFilePosition("binlog.002", 
123).build();
+        assetCompareTo(offset3, offset4, -1);
+        assetCompareTo(offset4, offset3, 1);
+
+        // Test different positions in same file
+        BinlogOffset offset5 =
+                BinlogOffset.builder().setBinlogFilePosition("binlog.001", 
100).build();
+        BinlogOffset offset6 =
+                BinlogOffset.builder().setBinlogFilePosition("binlog.001", 
200).build();
+        assetCompareTo(offset5, offset6, -1);
+        assetCompareTo(offset6, offset5, 1);
+    }
+
+    @Test
+    public void testCompareToWithFilePositionAndSkipEventsAndSkipRows() {
+        // Test with skip events
+        BinlogOffset offset1 =
+                BinlogOffset.builder()
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .setSkipEvents(5)
+                        .build();
+        BinlogOffset offset2 =
+                BinlogOffset.builder()
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .setSkipEvents(10)
+                        .build();
+        assetCompareTo(offset1, offset2, -1);
+        assetCompareTo(offset2, offset1, 1);
+
+        // Test with skip rows
+        BinlogOffset offset3 =
+                BinlogOffset.builder()
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .setSkipEvents(5)
+                        .setSkipRows(10)
+                        .build();
+        BinlogOffset offset4 =
+                BinlogOffset.builder()
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .setSkipEvents(5)
+                        .setSkipRows(20)
+                        .build();
+        assetCompareTo(offset3, offset4, -1);
+        assetCompareTo(offset4, offset3, 1);
+    }
+
+    @Test
+    public void testCompareToTimestampWithDifferentServerId() {
+        // Test different server IDs with different timestamps
+        BinlogOffset offset1 =
+                BinlogOffset.builder()
+                        .setServerId(1L)
+                        .setTimestampSec(1000L)
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .build();
+        BinlogOffset offset2 =
+                BinlogOffset.builder()
+                        .setServerId(2L)
+                        .setTimestampSec(2000L)
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .build();
+
+        // Should compare based on timestamp since server IDs are different
+        assetCompareTo(offset1, offset2, -1);
+        assetCompareTo(offset2, offset1, 1);
+
+        // Test same timestamps but different server IDs
+        BinlogOffset offset3 =
+                BinlogOffset.builder()
+                        .setServerId(1L)
+                        .setTimestampSec(1500L)
+                        .setBinlogFilePosition("binlog.001", 432)
+                        .build();
+        BinlogOffset offset4 =
+                BinlogOffset.builder()
+                        .setServerId(2L)
+                        .setTimestampSec(1500L)
+                        .setBinlogFilePosition("binlog.001", 123)
+                        .build();
+
+        // Same timestamp, different server IDs - should compare based on 
timestamp (which are
+        // equal)
+        // But since server IDs are different and timestamps are same, it will 
fall through to file
+        // position comparison
+        // Since file positions are same, it will compare skip events (default 
0)
+        assetCompareTo(offset3, offset4, 0);
+    }
+
+    private void assetCompareTo(BinlogOffset offset1, BinlogOffset offset2, 
int expected) {
+        int actual = offset1.compareTo(offset2);
+        Assertions.assertThat(expected).isEqualTo(actual);
+    }
+}

Reply via email to