This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b698bff [FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging to prevent replaying pre-checkpoint transactions with non-contiguous ranges (#4286)
c4b698bff is described below

commit c4b698bffb692f480622123fec36494ec505929c
Author: Jia Fan <[email protected]>
AuthorDate: Thu Mar 19 15:03:42 2026 +0800

    [FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging to prevent replaying pre-checkpoint transactions with non-contiguous ranges (#4286)
---
 .../flink-connector-mysql-cdc/pom.xml              |  11 ++
 .../io/debezium/connector/mysql/GtidUtils.java     |  60 ++++++++++
 .../mysql/MySqlStreamingChangeEventSource.java     |  30 +++--
 .../connector/mysql/FilterGtidSetTest.java         | 125 +++++++++++++++++++
 .../io/debezium/connector/mysql/GtidUtilsTest.java | 132 +++++++++++++++++++++
 .../mysql/MySqlStreamingChangeEventSource.java     |  30 +++--
 6 files changed, 364 insertions(+), 24 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
index 5ce4f9a52..026181af5 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
@@ -29,6 +29,10 @@ limitations under the License.
     <name>flink-connector-mysql-cdc</name>
     <packaging>jar</packaging>
 
+    <properties>
+        <mockito.version>3.12.4</mockito.version>
+    </properties>
+
     <dependencies>
 
         <!-- Debezium dependencies -->
@@ -195,6 +199,13 @@ limitations under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
index f6c4987e7..5956de9b3 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
@@ -23,10 +23,70 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 
 /** Utils for handling GTIDs. */
 public class GtidUtils {
 
+    /**
+     * Fixes old channels' GTID ranges by filling prefix gaps using server 
GTID intervals and the purged GTID set.
+     *
+     * <p>This is the shared logic between EARLIEST and LATEST modes. For 
UUIDs present in the
+     * checkpoint, non-contiguous GTID ranges are corrected via {@link 
#fixRestoredGtidSet} to avoid
+     * MySQL replaying pre-checkpoint transactions. Server UUIDs absent from the checkpoint are dropped first.
+     *
+     * @param availableServerGtidSet the relevant (filtered) server GTID set; only UUIDs also in the checkpoint are kept
+     * @param purgedServerGtid the GTID set already purged from the MySQL 
server; merged via {@link #mergeGtidSetInto} without any UUID filtering
+     * @param checkpointGtidSet the GTID set restored from checkpoint
+     * @return the fixed GTID set for old channels
+     */
+    public static GtidSet fixOldChannelsGtidSet(
+            GtidSet availableServerGtidSet, GtidSet purgedServerGtid, GtidSet 
checkpointGtidSet) {
+        return fixRestoredGtidSet(
+                mergeGtidSetInto(
+                        availableServerGtidSet.retainAll(
+                                uuid -> 
checkpointGtidSet.forServerWithId(uuid) != null),
+                        purgedServerGtid),
+                checkpointGtidSet);
+    }
+
+    /**
+     * Computes the merged GTID set for the LATEST new-channel-position mode.
+     *
+     * <p>For old channels (UUIDs present in checkpoint), non-contiguous GTID 
ranges are fixed via
+     * {@link #fixOldChannelsGtidSet}. For new channels (UUIDs not in 
checkpoint), the server's full
+     * (filtered) GTID is used to skip all history.
+     *
+     * @param availableServerGtidSet the GTID set currently available on the 
MySQL server
+     * @param purgedServerGtid the GTID set already purged from the MySQL 
server
+     * @param checkpointGtidSet the GTID set restored from checkpoint (after 
source filter applied)
+     * @param gtidSourceFilter optional predicate to filter GTID source UUIDs; 
may be null; when non-null, rejected server UUIDs are dropped before both steps
+     * @return the merged GTID set suitable for binlog subscription
+     */
+    public static GtidSet computeLatestModeGtidSet(
+            GtidSet availableServerGtidSet,
+            GtidSet purgedServerGtid,
+            GtidSet checkpointGtidSet,
+            Predicate<String> gtidSourceFilter) {
+        final GtidSet relevantAvailableServerGtidSet =
+                (gtidSourceFilter != null)
+                        ? availableServerGtidSet.retainAll(gtidSourceFilter)
+                        : availableServerGtidSet;
+
+        // Step 1: Fix ranges of UUIDs present in the checkpoint (purgedServerGtid is merged in unfiltered)
+        GtidSet fixedOldChannelsGtid =
+                fixOldChannelsGtidSet(
+                        relevantAvailableServerGtidSet, purgedServerGtid, 
checkpointGtidSet);
+
+        // Step 2: UUIDs absent from the checkpoint take the server's full (filtered) GTID range, skipping all history
+        GtidSet newChannelsGtid =
+                relevantAvailableServerGtidSet.retainAll(
+                        uuid -> checkpointGtidSet.forServerWithId(uuid) == 
null);
+
+        // Step 3: Merge both parts. NOTE(review): UUIDs rejected by gtidSourceFilter are not taken from the server set here (the previous LATEST logic used the unfiltered server set), so the server may stream their transactions again - confirm this is intended
+        return mergeGtidSetInto(fixedOldChannelsGtid, newChannelsGtid);
+    }
+
     /**
      * This method corrects the GTID set that has been restored from a state 
or checkpoint using the
      * GTID set fetched from the server via SHOW MASTER STATUS. During the 
correction process, the
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index ceecca942..557a149a2 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -87,12 +87,17 @@ import static io.debezium.util.Strings.isNullOrEmpty;
  * Copied from Debezium project(1.9.8.Final) to fix
  * https://github.com/ververica/flink-cdc-connectors/issues/1944.
  *
- * <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from 
job which previously
- * specifying starting offset on start.
+ * <p>Line 1432-1443 : Adjust GTID merging logic to support recovering from 
job which previously
+ * specifying starting offset on start. Uses {@link 
GtidUtils#fixOldChannelsGtidSet} for shared
+ * EARLIEST/LATEST logic.
  *
- * <p>Line 1485 : Add more error details for some exceptions.
+ * <p>Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying 
pre-checkpoint transactions
+ * when checkpoint GTID has non-contiguous ranges. Delegates to {@link
+ * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
  *
- * <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) 
complexity when
+ * <p>Line 1490 : Add more error details for some exceptions.
+ *
+ * <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) 
complexity when
  * processing LinkedList rows in handleChange method. See FLINK-38846.
  */
 public class MySqlStreamingChangeEventSource
@@ -1416,7 +1421,6 @@ public class MySqlStreamingChangeEventSource
         GtidSet mergedGtidSet;
 
         if (connectorConfig.gtidNewChannelPosition() == 
GtidNewChannelPosition.EARLIEST) {
-            final GtidSet knownGtidSet = filteredGtidSet;
             LOGGER.info("Using first available positions for new GTID 
channels");
             final GtidSet relevantAvailableServerGtidSet =
                     (gtidSourceFilter != null)
@@ -1436,14 +1440,16 @@ public class MySqlStreamingChangeEventSource
             // recorded offset in the checkpoint, and the available GTID for 
other MySQL instances
             // should be completed.
             mergedGtidSet =
-                    GtidUtils.fixRestoredGtidSet(
-                            GtidUtils.mergeGtidSetInto(
-                                    relevantAvailableServerGtidSet.retainAll(
-                                            uuid -> 
knownGtidSet.forServerWithId(uuid) != null),
-                                    purgedServerGtid),
-                            filteredGtidSet);
+                    GtidUtils.fixOldChannelsGtidSet(
+                            relevantAvailableServerGtidSet, purgedServerGtid, 
filteredGtidSet);
         } else {
-            mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
+            LOGGER.info("Using latest positions for new GTID channels");
+            mergedGtidSet =
+                    GtidUtils.computeLatestModeGtidSet(
+                            availableServerGtidSet,
+                            purgedServerGtid,
+                            filteredGtidSet,
+                            gtidSourceFilter);
         }
 
         LOGGER.info("Final merged GTID set to use when connecting to MySQL: 
{}", mergedGtidSet);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java
new file mode 100644
index 000000000..d50f2eea0
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+/**
+ * Mockito-based unit test for {@link MySqlStreamingChangeEventSource#filterGtidSet} 
to ensure the LATEST
+ * mode fix (FLINK-39149) cannot regress.
+ */
+class FilterGtidSetTest {
+
+    @Test
+    void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception {
+        MySqlStreamingChangeEventSource source =
+                createSourceWithConfig(GtidNewChannelPosition.LATEST, null);
+
+        MySqlOffsetContext offsetContext = 
createOffsetContext("aaa-111:5000-8000");
+        GtidSet availableServerGtidSet = new 
GtidSet("aaa-111:1-10000,bbb-222:1-3000");
+        GtidSet purgedServerGtid = new GtidSet("");
+
+        GtidSet result =
+                source.filterGtidSet(offsetContext, availableServerGtidSet, 
purgedServerGtid);
+
+        assertThat(result.toString()).contains("aaa-111:1-8000"); // prefix 1-4999 before the checkpoint is filled in
+        assertThat(result.toString()).contains("bbb-222:1-3000"); // new channel: full server range
+    }
+
+    @Test
+    void testFilterGtidSetLatestModeWithSourceFilter() throws Exception {
+        Predicate<String> excludeCcc = uuid -> !uuid.equals("ccc-333");
+        MySqlStreamingChangeEventSource source =
+                createSourceWithConfig(GtidNewChannelPosition.LATEST, 
excludeCcc);
+
+        MySqlOffsetContext offsetContext = 
createOffsetContext("aaa-111:5000-8000,bbb-222:1-2000");
+        GtidSet availableServerGtidSet =
+                new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
+        GtidSet purgedServerGtid = new GtidSet("");
+
+        GtidSet result =
+                source.filterGtidSet(offsetContext, availableServerGtidSet, 
purgedServerGtid);
+
+        assertThat(result.toString()).contains("aaa-111:1-8000");
+        assertThat(result.toString()).contains("bbb-222:1-2000");
+        assertThat(result.toString()).doesNotContain("ccc-333"); // UUID rejected by the source filter
+    }
+
+    @Test
+    void testFilterGtidSetEarliestModeNotAffected() throws Exception {
+        MySqlStreamingChangeEventSource source =
+                createSourceWithConfig(GtidNewChannelPosition.EARLIEST, null);
+
+        MySqlOffsetContext offsetContext = 
createOffsetContext("aaa-111:5000-8000");
+        GtidSet availableServerGtidSet = new 
GtidSet("aaa-111:1-10000,bbb-222:1-3000");
+        GtidSet purgedServerGtid = new GtidSet("");
+
+        GtidSet result =
+                source.filterGtidSet(offsetContext, availableServerGtidSet, 
purgedServerGtid);
+
+        assertThat(result.toString()).contains("aaa-111:1-8000");
+        assertThat(result.forServerWithId("bbb-222")).isNull(); // EARLIEST: new channel is omitted so it is read from its start
+    }
+
+    @Test
+    void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception {
+        MySqlStreamingChangeEventSource source =
+                createSourceWithConfig(GtidNewChannelPosition.LATEST, null);
+
+        MySqlOffsetContext offsetContext = createOffsetContext(null);
+        GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000");
+        GtidSet purgedServerGtid = new GtidSet("");
+
+        GtidSet result =
+                source.filterGtidSet(offsetContext, availableServerGtidSet, 
purgedServerGtid);
+
+        assertThat(result).isNull();
+    }
+
+    private static MySqlStreamingChangeEventSource createSourceWithConfig(
+            GtidNewChannelPosition channelPosition, Predicate<String> 
gtidSourceFilter)
+            throws Exception {
+        MySqlConnectorConfig mockConfig = 
Mockito.mock(MySqlConnectorConfig.class);
+        when(mockConfig.gtidNewChannelPosition()).thenReturn(channelPosition);
+        when(mockConfig.gtidSourceFilter()).thenReturn(gtidSourceFilter);
+
+        MySqlStreamingChangeEventSource source = // CALLS_REAL_METHODS: filterGtidSet runs its real body on the mock
+                Mockito.mock(MySqlStreamingChangeEventSource.class, 
Mockito.CALLS_REAL_METHODS);
+
+        Field configField =
+                
MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
+        configField.setAccessible(true); // connectorConfig is not publicly settable; inject the mock reflectively
+        configField.set(source, mockConfig);
+
+        return source;
+    }
+
+    private static MySqlOffsetContext createOffsetContext(String gtidSetStr) {
+        MySqlOffsetContext offsetContext = 
Mockito.mock(MySqlOffsetContext.class);
+        when(offsetContext.gtidSet()).thenReturn(gtidSetStr);
+        return offsetContext;
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
index 88d4b8aba..f0dbe6cd4 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
@@ -22,8 +22,10 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
+import static io.debezium.connector.mysql.GtidUtils.computeLatestModeGtidSet;
 import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet;
 import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -83,6 +85,75 @@ class GtidUtilsTest {
                         "A:1-20:30-35:45-50:60-65:75-80"));
     }
 
+    /** Checks {@link GtidUtils#computeLatestModeGtidSet} (FLINK-39149) with no purged GTIDs and no source filter. */
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("latestModeGtidSetsProvider")
+    void testLatestModeGtidMerge(
+            String description,
+            String serverGtidStr,
+            String checkpointGtidStr,
+            String expectedMergedStr) {
+        GtidSet serverGtidSet = new GtidSet(serverGtidStr);
+        GtidSet checkpointGtidSet = new GtidSet(checkpointGtidStr);
+
+        GtidSet mergedGtidSet =
+                computeLatestModeGtidSet(serverGtidSet, new GtidSet(""), 
checkpointGtidSet, null);
+
+        assertThat(mergedGtidSet).hasToString(expectedMergedStr);
+
+        // For checkpoints not starting at 1, nothing left to send for that UUID may start at or before the checkpoint's first transaction
+        GtidSet transactionsToSend = serverGtidSet.subtract(mergedGtidSet);
+        for (GtidSet.UUIDSet uuidSet : checkpointGtidSet.getUUIDSets()) {
+            String uuid = uuidSet.getUUID();
+            long earliestCheckpointTx =
+                    uuidSet.getIntervals().stream()
+                            .mapToLong(GtidSet.Interval::getStart)
+                            .min()
+                            .orElse(1);
+            if (earliestCheckpointTx > 1) {
+                GtidSet.UUIDSet toSendUuidSet = 
transactionsToSend.forServerWithId(uuid);
+                if (toSendUuidSet != null) {
+                    for (GtidSet.Interval interval : 
toSendUuidSet.getIntervals()) {
+                        assertThat(interval.getStart())
+                                .as(
+                                        "Should not replay pre-checkpoint 
transactions for UUID %s",
+                                        uuid)
+                                .isGreaterThan(earliestCheckpointTx);
+                    }
+                }
+            }
+        }
+    }
+
+    private static Stream<Arguments> latestModeGtidSetsProvider() {
+        return Stream.of( // each case: (description, server GTID, checkpoint GTID, expected merged GTID)
+                Arguments.of(
+                        "Old channel with non-contiguous GTID, new channel 
present",
+                        "aaa-111:1-10000,bbb-222:1-3000",
+                        "aaa-111:5000-8000",
+                        "aaa-111:1-8000,bbb-222:1-3000"),
+                Arguments.of(
+                        "Mixed old channels (contiguous and non-contiguous) 
with new channel",
+                        "aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000",
+                        "aaa-111:5000-8000,bbb-222:1-2000",
+                        "aaa-111:1-8000,bbb-222:1-2000,ccc-333:1-5000"),
+                Arguments.of(
+                        "All old channels, no new channels",
+                        "aaa-111:1-10000,bbb-222:1-3000",
+                        "aaa-111:1-8000,bbb-222:1-2000",
+                        "aaa-111:1-8000,bbb-222:1-2000"),
+                Arguments.of(
+                        "Contiguous checkpoint GTID, no regression",
+                        "aaa-111:1-10000,bbb-222:1-3000",
+                        "aaa-111:1-8000",
+                        "aaa-111:1-8000,bbb-222:1-3000"),
+                Arguments.of(
+                        "Only new channels, checkpoint has unknown UUID",
+                        "aaa-111:1-10000,bbb-222:1-3000",
+                        "xxx-999:1-500",
+                        "aaa-111:1-10000,bbb-222:1-3000,xxx-999:1-500"));
+    }
+
     @Test
     void testMergingGtidSets() {
         GtidSet base = new GtidSet("A:1-100");
@@ -96,4 +167,65 @@ class GtidUtilsTest {
         toMerge = new GtidSet("A:1-10,C:1-10");
         assertThat(mergeGtidSetInto(base, 
toMerge)).hasToString("A:1-100,B:1-100,C:1-10");
     }
+
+    /** Tests {@link GtidUtils#computeLatestModeGtidSet} with {@code 
gtidSourceFilter}; UUIDs rejected by the filter must be absent from the result. */
+    @Test
+    void testLatestModeGtidMergeWithSourceFilter() {
+        GtidSet availableServerGtidSet =
+                new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
+        GtidSet checkpointGtidSet = new 
GtidSet("aaa-111:5000-8000,bbb-222:1-2000");
+        Predicate<String> gtidSourceFilter = uuid -> !uuid.equals("ccc-333");
+
+        GtidSet mergedGtidSet =
+                computeLatestModeGtidSet(
+                        availableServerGtidSet,
+                        new GtidSet(""),
+                        checkpointGtidSet,
+                        gtidSourceFilter);
+
+        assertThat(mergedGtidSet.toString()).contains("aaa-111:1-8000");
+        assertThat(mergedGtidSet.toString()).contains("bbb-222:1-2000");
+        assertThat(mergedGtidSet.toString()).doesNotContain("ccc-333");
+    }
+
+    /** Tests {@link GtidUtils#computeLatestModeGtidSet} when a checkpointed UUID has a purged prefix on the server. */
+    @Test
+    void testLatestModeGtidMergeWithPurgedGtid() {
+        GtidSet availableServerGtidSet = new 
GtidSet("aaa-111:50-10000,bbb-222:1-3000");
+        GtidSet purgedServerGtid = new GtidSet("aaa-111:1-49");
+        GtidSet checkpointGtidSet = new GtidSet("aaa-111:5000-8000");
+
+        GtidSet mergedGtidSet =
+                computeLatestModeGtidSet(
+                        availableServerGtidSet, purgedServerGtid, 
checkpointGtidSet, null);
+
+        assertThat(mergedGtidSet.toString()).contains("aaa-111:50-8000");
+        assertThat(mergedGtidSet.toString()).contains("bbb-222:1-3000");
+
+        // Nothing up to the checkpoint's last transaction (8000) may be left to send for aaa-111
+        GtidSet transactionsToSend = 
availableServerGtidSet.subtract(mergedGtidSet);
+        GtidSet.UUIDSet aaaToSend = 
transactionsToSend.forServerWithId("aaa-111");
+        if (aaaToSend != null) {
+            for (GtidSet.Interval interval : aaaToSend.getIntervals()) {
+                assertThat(interval.getStart())
+                        .as("Should not request pre-checkpoint transactions")
+                        .isGreaterThanOrEqualTo(8001);
+            }
+        }
+    }
+
+    /** Tests {@link GtidUtils#computeLatestModeGtidSet} with a completely 
purged UUID (present only in the purged set, not in the available server set). */
+    @Test
+    void testLatestModeGtidMergeWithFullyPurgedChannel() {
+        GtidSet availableServerGtidSet = new GtidSet("bbb-222:1-3000");
+        GtidSet purgedServerGtid = new GtidSet("aaa-111:1-500");
+        GtidSet checkpointGtidSet = new GtidSet("aaa-111:200-400");
+
+        GtidSet mergedGtidSet =
+                computeLatestModeGtidSet(
+                        availableServerGtidSet, purgedServerGtid, 
checkpointGtidSet, null);
+
+        assertThat(mergedGtidSet.toString()).contains("aaa-111:1-400");
+        assertThat(mergedGtidSet.toString()).contains("bbb-222:1-3000");
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 279d6fb3a..14b3d96f1 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -89,12 +89,17 @@ import static io.debezium.util.Strings.isNullOrEmpty;
  * <p>Copied from Debezium project(1.9.8.Final) to fix
  * https://github.com/ververica/flink-cdc-connectors/issues/1944.
  *
- * <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from 
job which previously
- * specifying starting offset on start.
+ * <p>Line 1438-1449 : Adjust GTID merging logic to support recovering from 
job which previously
+ * specifying starting offset on start. Uses {@link 
GtidUtils#fixOldChannelsGtidSet} for shared
+ * EARLIEST/LATEST logic.
  *
- * <p>Line 1485 : Add more error details for some exceptions.
+ * <p>Line 1450-1458 : Fix LATEST mode GTID merging to avoid replaying 
pre-checkpoint transactions
+ * when checkpoint GTID has non-contiguous ranges. Delegates to {@link
+ * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
  *
- * <p>Line 951-964 : Use iterator instead of index-based loop to avoid O(n²) 
complexity when
+ * <p>Line 1496 : Add more error details for some exceptions.
+ *
+ * <p>Line 956-968 : Use iterator instead of index-based loop to avoid O(n²) 
complexity when
  * processing LinkedList rows in handleChange method. See FLINK-38846.
  */
 public class MySqlStreamingChangeEventSource
@@ -1421,7 +1426,6 @@ public class MySqlStreamingChangeEventSource
         GtidSet mergedGtidSet;
 
         if (connectorConfig.gtidNewChannelPosition() == 
GtidNewChannelPosition.EARLIEST) {
-            final GtidSet knownGtidSet = filteredGtidSet;
             LOGGER.info("Using first available positions for new GTID 
channels");
             final GtidSet relevantAvailableServerGtidSet =
                     (gtidSourceFilter != null)
@@ -1441,14 +1445,16 @@ public class MySqlStreamingChangeEventSource
             // recorded offset in the checkpoint, and the available GTID for 
other MySQL instances
             // should be completed.
             mergedGtidSet =
-                    GtidUtils.fixRestoredGtidSet(
-                            GtidUtils.mergeGtidSetInto(
-                                    relevantAvailableServerGtidSet.retainAll(
-                                            uuid -> 
knownGtidSet.forServerWithId(uuid) != null),
-                                    purgedServerGtid),
-                            filteredGtidSet);
+                    GtidUtils.fixOldChannelsGtidSet(
+                            relevantAvailableServerGtidSet, purgedServerGtid, 
filteredGtidSet);
         } else {
-            mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
+            LOGGER.info("Using latest positions for new GTID channels");
+            mergedGtidSet =
+                    GtidUtils.computeLatestModeGtidSet(
+                            availableServerGtidSet,
+                            purgedServerGtid,
+                            filteredGtidSet,
+                            gtidSourceFilter);
         }
 
         LOGGER.info("Final merged GTID set to use when connecting to MySQL: 
{}", mergedGtidSet);

Reply via email to