This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new c4b698bff [FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging
to prevent replaying pre-checkpoint transactions with non-contiguous ranges
(#4286)
c4b698bff is described below
commit c4b698bffb692f480622123fec36494ec505929c
Author: Jia Fan <[email protected]>
AuthorDate: Thu Mar 19 15:03:42 2026 +0800
[FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging to prevent
replaying pre-checkpoint transactions with non-contiguous ranges (#4286)
---
.../flink-connector-mysql-cdc/pom.xml | 11 ++
.../io/debezium/connector/mysql/GtidUtils.java | 60 ++++++++++
.../mysql/MySqlStreamingChangeEventSource.java | 30 +++--
.../connector/mysql/FilterGtidSetTest.java | 125 +++++++++++++++++++
.../io/debezium/connector/mysql/GtidUtilsTest.java | 132 +++++++++++++++++++++
.../mysql/MySqlStreamingChangeEventSource.java | 30 +++--
6 files changed, 364 insertions(+), 24 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
index 5ce4f9a52..026181af5 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml
@@ -29,6 +29,10 @@ limitations under the License.
<name>flink-connector-mysql-cdc</name>
<packaging>jar</packaging>
+ <properties>
+ <mockito.version>3.12.4</mockito.version>
+ </properties>
+
<dependencies>
<!-- Debezium dependencies -->
@@ -195,6 +199,13 @@ limitations under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
index f6c4987e7..5956de9b3 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
@@ -23,10 +23,70 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Predicate;
/** Utils for handling GTIDs. */
public class GtidUtils {
+ /**
+ * Fixes old channels' GTID ranges by filling prefix gaps using server
GTID intervals.
+ *
+ * <p>This is the shared logic between EARLIEST and LATEST modes. For
UUIDs present in the
+ * checkpoint, non-contiguous GTID ranges are corrected via {@link
#fixRestoredGtidSet} to avoid
+ * MySQL replaying pre-checkpoint transactions.
+ *
+ * @param availableServerGtidSet the relevant (filtered) server GTID set
+ * @param purgedServerGtid the GTID set already purged from the MySQL
server
+ * @param checkpointGtidSet the GTID set restored from checkpoint
+ * @return the fixed GTID set for old channels
+ */
+ public static GtidSet fixOldChannelsGtidSet(
+ GtidSet availableServerGtidSet, GtidSet purgedServerGtid, GtidSet
checkpointGtidSet) {
+ return fixRestoredGtidSet(
+ mergeGtidSetInto(
+ availableServerGtidSet.retainAll(
+ uuid ->
checkpointGtidSet.forServerWithId(uuid) != null),
+ purgedServerGtid),
+ checkpointGtidSet);
+ }
+
+ /**
+ * Computes the merged GTID set for the LATEST new-channel-position mode.
+ *
+ * <p>For old channels (UUIDs present in checkpoint), non-contiguous GTID
ranges are fixed via
+ * {@link #fixOldChannelsGtidSet}. For new channels (UUIDs not in
checkpoint), the server's full
+ * GTID is used to skip all history.
+ *
+ * @param availableServerGtidSet the GTID set currently available on the
MySQL server
+ * @param purgedServerGtid the GTID set already purged from the MySQL
server
+ * @param checkpointGtidSet the GTID set restored from checkpoint (after
source filter applied)
+ * @param gtidSourceFilter optional predicate to filter GTID source UUIDs;
may be null
+ * @return the merged GTID set suitable for binlog subscription
+ */
+ public static GtidSet computeLatestModeGtidSet(
+ GtidSet availableServerGtidSet,
+ GtidSet purgedServerGtid,
+ GtidSet checkpointGtidSet,
+ Predicate<String> gtidSourceFilter) {
+ final GtidSet relevantAvailableServerGtidSet =
+ (gtidSourceFilter != null)
+ ? availableServerGtidSet.retainAll(gtidSourceFilter)
+ : availableServerGtidSet;
+
+ // Step 1: Fix old channels' GTID ranges
+ GtidSet fixedOldChannelsGtid =
+ fixOldChannelsGtidSet(
+ relevantAvailableServerGtidSet, purgedServerGtid,
checkpointGtidSet);
+
+ // Step 2: For new channels, use server's full GTID to skip all history
+ GtidSet newChannelsGtid =
+ relevantAvailableServerGtidSet.retainAll(
+ uuid -> checkpointGtidSet.forServerWithId(uuid) ==
null);
+
+ // Step 3: Merge fixed old channels + new channels
+ return mergeGtidSetInto(fixedOldChannelsGtid, newChannelsGtid);
+ }
+
/**
* This method corrects the GTID set that has been restored from a state
or checkpoint using the
* GTID set fetched from the server via SHOW MASTER STATUS. During the
correction process, the
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index ceecca942..557a149a2 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -87,12 +87,17 @@ import static io.debezium.util.Strings.isNullOrEmpty;
* Copied from Debezium project(1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
*
- * <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from
job which previously
- * specifying starting offset on start.
+ * <p>Line 1432-1443 : Adjust GTID merging logic to support recovering from
job which previously
+ * specifying starting offset on start. Uses {@link
GtidUtils#fixOldChannelsGtidSet} for shared
+ * EARLIEST/LATEST logic.
*
- * <p>Line 1485 : Add more error details for some exceptions.
+ * <p>Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying
pre-checkpoint transactions
+ * when checkpoint GTID has non-contiguous ranges. Delegates to {@link
+ * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
*
- * <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²)
complexity when
+ * <p>Line 1490 : Add more error details for some exceptions.
+ *
+ * <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²)
complexity when
* processing LinkedList rows in handleChange method. See FLINK-38846.
*/
public class MySqlStreamingChangeEventSource
@@ -1416,7 +1421,6 @@ public class MySqlStreamingChangeEventSource
GtidSet mergedGtidSet;
if (connectorConfig.gtidNewChannelPosition() ==
GtidNewChannelPosition.EARLIEST) {
- final GtidSet knownGtidSet = filteredGtidSet;
LOGGER.info("Using first available positions for new GTID
channels");
final GtidSet relevantAvailableServerGtidSet =
(gtidSourceFilter != null)
@@ -1436,14 +1440,16 @@ public class MySqlStreamingChangeEventSource
// recorded offset in the checkpoint, and the available GTID for
other MySQL instances
// should be completed.
mergedGtidSet =
- GtidUtils.fixRestoredGtidSet(
- GtidUtils.mergeGtidSetInto(
- relevantAvailableServerGtidSet.retainAll(
- uuid ->
knownGtidSet.forServerWithId(uuid) != null),
- purgedServerGtid),
- filteredGtidSet);
+ GtidUtils.fixOldChannelsGtidSet(
+ relevantAvailableServerGtidSet, purgedServerGtid,
filteredGtidSet);
} else {
- mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
+ LOGGER.info("Using latest positions for new GTID channels");
+ mergedGtidSet =
+ GtidUtils.computeLatestModeGtidSet(
+ availableServerGtidSet,
+ purgedServerGtid,
+ filteredGtidSet,
+ gtidSourceFilter);
}
LOGGER.info("Final merged GTID set to use when connecting to MySQL:
{}", mergedGtidSet);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java
new file mode 100644
index 000000000..d50f2eea0
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.debezium.connector.mysql;
+
+import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+/**
+ * Integration test for {@link MySqlStreamingChangeEventSource#filterGtidSet}
to ensure the LATEST
+ * mode fix (FLINK-39149) cannot regress.
+ */
+class FilterGtidSetTest {
+
+ @Test
+ void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception {
+ MySqlStreamingChangeEventSource source =
+ createSourceWithConfig(GtidNewChannelPosition.LATEST, null);
+
+ MySqlOffsetContext offsetContext =
createOffsetContext("aaa-111:5000-8000");
+ GtidSet availableServerGtidSet = new
GtidSet("aaa-111:1-10000,bbb-222:1-3000");
+ GtidSet purgedServerGtid = new GtidSet("");
+
+ GtidSet result =
+ source.filterGtidSet(offsetContext, availableServerGtidSet,
purgedServerGtid);
+
+ assertThat(result.toString()).contains("aaa-111:1-8000");
+ assertThat(result.toString()).contains("bbb-222:1-3000");
+ }
+
+ @Test
+ void testFilterGtidSetLatestModeWithSourceFilter() throws Exception {
+ Predicate<String> excludeCcc = uuid -> !uuid.equals("ccc-333");
+ MySqlStreamingChangeEventSource source =
+ createSourceWithConfig(GtidNewChannelPosition.LATEST,
excludeCcc);
+
+ MySqlOffsetContext offsetContext =
createOffsetContext("aaa-111:5000-8000,bbb-222:1-2000");
+ GtidSet availableServerGtidSet =
+ new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
+ GtidSet purgedServerGtid = new GtidSet("");
+
+ GtidSet result =
+ source.filterGtidSet(offsetContext, availableServerGtidSet,
purgedServerGtid);
+
+ assertThat(result.toString()).contains("aaa-111:1-8000");
+ assertThat(result.toString()).contains("bbb-222:1-2000");
+ assertThat(result.toString()).doesNotContain("ccc-333");
+ }
+
+ @Test
+ void testFilterGtidSetEarliestModeNotAffected() throws Exception {
+ MySqlStreamingChangeEventSource source =
+ createSourceWithConfig(GtidNewChannelPosition.EARLIEST, null);
+
+ MySqlOffsetContext offsetContext =
createOffsetContext("aaa-111:5000-8000");
+ GtidSet availableServerGtidSet = new
GtidSet("aaa-111:1-10000,bbb-222:1-3000");
+ GtidSet purgedServerGtid = new GtidSet("");
+
+ GtidSet result =
+ source.filterGtidSet(offsetContext, availableServerGtidSet,
purgedServerGtid);
+
+ assertThat(result.toString()).contains("aaa-111:1-8000");
+ assertThat(result.forServerWithId("bbb-222")).isNull();
+ }
+
+ @Test
+ void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception {
+ MySqlStreamingChangeEventSource source =
+ createSourceWithConfig(GtidNewChannelPosition.LATEST, null);
+
+ MySqlOffsetContext offsetContext = createOffsetContext(null);
+ GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000");
+ GtidSet purgedServerGtid = new GtidSet("");
+
+ GtidSet result =
+ source.filterGtidSet(offsetContext, availableServerGtidSet,
purgedServerGtid);
+
+ assertThat(result).isNull();
+ }
+
+ private static MySqlStreamingChangeEventSource createSourceWithConfig(
+ GtidNewChannelPosition channelPosition, Predicate<String>
gtidSourceFilter)
+ throws Exception {
+ MySqlConnectorConfig mockConfig =
Mockito.mock(MySqlConnectorConfig.class);
+ when(mockConfig.gtidNewChannelPosition()).thenReturn(channelPosition);
+ when(mockConfig.gtidSourceFilter()).thenReturn(gtidSourceFilter);
+
+ MySqlStreamingChangeEventSource source =
+ Mockito.mock(MySqlStreamingChangeEventSource.class,
Mockito.CALLS_REAL_METHODS);
+
+ Field configField =
+
MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
+ configField.setAccessible(true);
+ configField.set(source, mockConfig);
+
+ return source;
+ }
+
+ private static MySqlOffsetContext createOffsetContext(String gtidSetStr) {
+ MySqlOffsetContext offsetContext =
Mockito.mock(MySqlOffsetContext.class);
+ when(offsetContext.gtidSet()).thenReturn(gtidSetStr);
+ return offsetContext;
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
index 88d4b8aba..f0dbe6cd4 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
@@ -22,8 +22,10 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import java.util.function.Predicate;
import java.util.stream.Stream;
+import static io.debezium.connector.mysql.GtidUtils.computeLatestModeGtidSet;
import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet;
import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto;
import static org.assertj.core.api.Assertions.assertThat;
@@ -83,6 +85,75 @@ class GtidUtilsTest {
"A:1-20:30-35:45-50:60-65:75-80"));
}
+ /** Tests {@link GtidUtils#computeLatestModeGtidSet} for FLINK-39149. */
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("latestModeGtidSetsProvider")
+ void testLatestModeGtidMerge(
+ String description,
+ String serverGtidStr,
+ String checkpointGtidStr,
+ String expectedMergedStr) {
+ GtidSet serverGtidSet = new GtidSet(serverGtidStr);
+ GtidSet checkpointGtidSet = new GtidSet(checkpointGtidStr);
+
+ GtidSet mergedGtidSet =
+ computeLatestModeGtidSet(serverGtidSet, new GtidSet(""),
checkpointGtidSet, null);
+
+ assertThat(mergedGtidSet).hasToString(expectedMergedStr);
+
+ // Verify MySQL would not replay pre-checkpoint transactions
+ GtidSet transactionsToSend = serverGtidSet.subtract(mergedGtidSet);
+ for (GtidSet.UUIDSet uuidSet : checkpointGtidSet.getUUIDSets()) {
+ String uuid = uuidSet.getUUID();
+ long earliestCheckpointTx =
+ uuidSet.getIntervals().stream()
+ .mapToLong(GtidSet.Interval::getStart)
+ .min()
+ .orElse(1);
+ if (earliestCheckpointTx > 1) {
+ GtidSet.UUIDSet toSendUuidSet =
transactionsToSend.forServerWithId(uuid);
+ if (toSendUuidSet != null) {
+ for (GtidSet.Interval interval :
toSendUuidSet.getIntervals()) {
+ assertThat(interval.getStart())
+ .as(
+ "Should not replay pre-checkpoint
transactions for UUID %s",
+ uuid)
+ .isGreaterThan(earliestCheckpointTx);
+ }
+ }
+ }
+ }
+ }
+
+ private static Stream<Arguments> latestModeGtidSetsProvider() {
+ return Stream.of(
+ Arguments.of(
+ "Old channel with non-contiguous GTID, new channel
present",
+ "aaa-111:1-10000,bbb-222:1-3000",
+ "aaa-111:5000-8000",
+ "aaa-111:1-8000,bbb-222:1-3000"),
+ Arguments.of(
+ "Mixed old channels (contiguous and non-contiguous)
with new channel",
+ "aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000",
+ "aaa-111:5000-8000,bbb-222:1-2000",
+ "aaa-111:1-8000,bbb-222:1-2000,ccc-333:1-5000"),
+ Arguments.of(
+ "All old channels, no new channels",
+ "aaa-111:1-10000,bbb-222:1-3000",
+ "aaa-111:1-8000,bbb-222:1-2000",
+ "aaa-111:1-8000,bbb-222:1-2000"),
+ Arguments.of(
+ "Contiguous checkpoint GTID, no regression",
+ "aaa-111:1-10000,bbb-222:1-3000",
+ "aaa-111:1-8000",
+ "aaa-111:1-8000,bbb-222:1-3000"),
+ Arguments.of(
+ "Only new channels, checkpoint has unknown UUID",
+ "aaa-111:1-10000,bbb-222:1-3000",
+ "xxx-999:1-500",
+ "aaa-111:1-10000,bbb-222:1-3000,xxx-999:1-500"));
+ }
+
@Test
void testMergingGtidSets() {
GtidSet base = new GtidSet("A:1-100");
@@ -96,4 +167,65 @@ class GtidUtilsTest {
toMerge = new GtidSet("A:1-10,C:1-10");
assertThat(mergeGtidSetInto(base,
toMerge)).hasToString("A:1-100,B:1-100,C:1-10");
}
+
+ /** Tests {@link GtidUtils#computeLatestModeGtidSet} with {@code
gtidSourceFilter}. */
+ @Test
+ void testLatestModeGtidMergeWithSourceFilter() {
+ GtidSet availableServerGtidSet =
+ new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
+ GtidSet checkpointGtidSet = new
GtidSet("aaa-111:5000-8000,bbb-222:1-2000");
+ Predicate<String> gtidSourceFilter = uuid -> !uuid.equals("ccc-333");
+
+ GtidSet mergedGtidSet =
+ computeLatestModeGtidSet(
+ availableServerGtidSet,
+ new GtidSet(""),
+ checkpointGtidSet,
+ gtidSourceFilter);
+
+ assertThat(mergedGtidSet.toString()).contains("aaa-111:1-8000");
+ assertThat(mergedGtidSet.toString()).contains("bbb-222:1-2000");
+ assertThat(mergedGtidSet.toString()).doesNotContain("ccc-333");
+ }
+
+ /** Tests {@link GtidUtils#computeLatestModeGtidSet} with purged GTID. */
+ @Test
+ void testLatestModeGtidMergeWithPurgedGtid() {
+ GtidSet availableServerGtidSet = new
GtidSet("aaa-111:50-10000,bbb-222:1-3000");
+ GtidSet purgedServerGtid = new GtidSet("aaa-111:1-49");
+ GtidSet checkpointGtidSet = new GtidSet("aaa-111:5000-8000");
+
+ GtidSet mergedGtidSet =
+ computeLatestModeGtidSet(
+ availableServerGtidSet, purgedServerGtid,
checkpointGtidSet, null);
+
+ assertThat(mergedGtidSet.toString()).contains("aaa-111:50-8000");
+ assertThat(mergedGtidSet.toString()).contains("bbb-222:1-3000");
+
+ // Verify no pre-checkpoint replay
+ GtidSet transactionsToSend =
availableServerGtidSet.subtract(mergedGtidSet);
+ GtidSet.UUIDSet aaaToSend =
transactionsToSend.forServerWithId("aaa-111");
+ if (aaaToSend != null) {
+ for (GtidSet.Interval interval : aaaToSend.getIntervals()) {
+ assertThat(interval.getStart())
+ .as("Should not request pre-checkpoint transactions")
+ .isGreaterThanOrEqualTo(8001);
+ }
+ }
+ }
+
+ /** Tests {@link GtidUtils#computeLatestModeGtidSet} with a completely
purged UUID. */
+ @Test
+ void testLatestModeGtidMergeWithFullyPurgedChannel() {
+ GtidSet availableServerGtidSet = new GtidSet("bbb-222:1-3000");
+ GtidSet purgedServerGtid = new GtidSet("aaa-111:1-500");
+ GtidSet checkpointGtidSet = new GtidSet("aaa-111:200-400");
+
+ GtidSet mergedGtidSet =
+ computeLatestModeGtidSet(
+ availableServerGtidSet, purgedServerGtid,
checkpointGtidSet, null);
+
+ assertThat(mergedGtidSet.toString()).contains("aaa-111:1-400");
+ assertThat(mergedGtidSet.toString()).contains("bbb-222:1-3000");
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 279d6fb3a..14b3d96f1 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -89,12 +89,17 @@ import static io.debezium.util.Strings.isNullOrEmpty;
* <p>Copied from Debezium project(1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
*
- * <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from
job which previously
- * specifying starting offset on start.
+ * <p>Line 1438-1449 : Adjust GTID merging logic to support recovering from
job which previously
+ * specifying starting offset on start. Uses {@link
GtidUtils#fixOldChannelsGtidSet} for shared
+ * EARLIEST/LATEST logic.
*
- * <p>Line 1485 : Add more error details for some exceptions.
+ * <p>Line 1450-1458 : Fix LATEST mode GTID merging to avoid replaying
pre-checkpoint transactions
+ * when checkpoint GTID has non-contiguous ranges. Delegates to {@link
+ * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
*
- * <p>Line 951-964 : Use iterator instead of index-based loop to avoid O(n²)
complexity when
+ * <p>Line 1496 : Add more error details for some exceptions.
+ *
+ * <p>Line 956-968 : Use iterator instead of index-based loop to avoid O(n²)
complexity when
* processing LinkedList rows in handleChange method. See FLINK-38846.
*/
public class MySqlStreamingChangeEventSource
@@ -1421,7 +1426,6 @@ public class MySqlStreamingChangeEventSource
GtidSet mergedGtidSet;
if (connectorConfig.gtidNewChannelPosition() ==
GtidNewChannelPosition.EARLIEST) {
- final GtidSet knownGtidSet = filteredGtidSet;
LOGGER.info("Using first available positions for new GTID
channels");
final GtidSet relevantAvailableServerGtidSet =
(gtidSourceFilter != null)
@@ -1441,14 +1445,16 @@ public class MySqlStreamingChangeEventSource
// recorded offset in the checkpoint, and the available GTID for
other MySQL instances
// should be completed.
mergedGtidSet =
- GtidUtils.fixRestoredGtidSet(
- GtidUtils.mergeGtidSetInto(
- relevantAvailableServerGtidSet.retainAll(
- uuid ->
knownGtidSet.forServerWithId(uuid) != null),
- purgedServerGtid),
- filteredGtidSet);
+ GtidUtils.fixOldChannelsGtidSet(
+ relevantAvailableServerGtidSet, purgedServerGtid,
filteredGtidSet);
} else {
- mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
+ LOGGER.info("Using latest positions for new GTID channels");
+ mergedGtidSet =
+ GtidUtils.computeLatestModeGtidSet(
+ availableServerGtidSet,
+ purgedServerGtid,
+ filteredGtidSet,
+ gtidSourceFilter);
}
LOGGER.info("Final merged GTID set to use when connecting to MySQL:
{}", mergedGtidSet);