lvyanquan commented on code in PR #3845:
URL: https://github.com/apache/flink-cdc/pull/3845#discussion_r2278148619


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java:
##########
@@ -36,36 +38,64 @@ public class GtidUtils {
     public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
         Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
         serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
-        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
-            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
+        for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) {
+            GtidSet.UUIDSet serverUuidSet = newSet.get(restoredUuidSet.getUUID());
             if (serverUuidSet != null) {
-                long restoredIntervalEnd = getIntervalEnd(uuidSet);
-                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
-                        new ArrayList<>();
-                for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
-                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
-                        newIntervals.add(
-                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), serverInterval.getEnd()));
-                    } else if (serverInterval.getStart() <= restoredIntervalEnd
-                            && serverInterval.getEnd() > restoredIntervalEnd) {
-                        newIntervals.add(
+                List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals();
+                List<GtidSet.Interval> restoredIntervals = restoredUuidSet.getIntervals();
+
+                long earliestRestoredTx = getMinIntervalStart(restoredIntervals);
+
+                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> merged = new ArrayList<>();
+
+                // Process each server interval
+                for (GtidSet.Interval serverInterval : serverIntervals) {
+                    // First, check if any part comes before earliest restored
+                    if (serverInterval.getStart() < earliestRestoredTx) {
+                        long end = Math.min(serverInterval.getEnd(), earliestRestoredTx - 1);
+                        merged.add(
                                 new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), restoredIntervalEnd));
+                                        serverInterval.getStart(), end));

Review Comment:
   The operation here likely exists because the GTIDs in our state are merged from the binlog events we have read, so they do not include GTIDs that were executed before those binlog events.
   If we used the `PreviousGtidsEvent` to supplement the GTIDs that were already executed, would this operation become unnecessary? (We must still consider compatibility with historical jobs, since removing it could lead to redundant data retrieval.)
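
   For reference, the new branch in the hunk keeps only the part of each server interval that lies strictly before the earliest restored transaction, which adds back the GTIDs that the binlog-derived state never saw. The self-contained sketch below models just that step. It assumes a hypothetical `TxInterval` class in place of `GtidSet.Interval` and a hypothetical `minIntervalStart` in place of `getMinIntervalStart`; since the hunk is cut off after the first branch, appending the restored intervals unchanged is an assumption, not the PR's code.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for GtidSet.Interval: transactions start..end, both inclusive. */
   final class TxInterval {
       final long start;
       final long end;

       TxInterval(long start, long end) {
           this.start = start;
           this.end = end;
       }

       @Override
       public String toString() {
           return start + "-" + end;
       }
   }

   final class GtidTrimSketch {
       /** Hypothetical helper: smallest start; Long.MAX_VALUE for an empty list. */
       static long minIntervalStart(List<TxInterval> intervals) {
           long min = Long.MAX_VALUE;
           for (TxInterval interval : intervals) {
               min = Math.min(min, interval.start);
           }
           return min;
       }

       static List<TxInterval> merge(List<TxInterval> server, List<TxInterval> restored) {
           long earliestRestoredTx = minIntervalStart(restored);
           List<TxInterval> merged = new ArrayList<>();
           for (TxInterval s : server) {
               // Keep only the portion of the server interval before the earliest restored tx:
               // presumably these GTIDs ran before the binlog events the restored state came from.
               if (s.start < earliestRestoredTx) {
                   merged.add(new TxInterval(s.start, Math.min(s.end, earliestRestoredTx - 1)));
               }
           }
           // Assumption: the hunk is truncated here, so we simply append the restored intervals.
           merged.addAll(restored);
           return merged;
       }
   }
   ```

   For example, a server interval `1-120` with a restored interval `50-100` yields `[1-49, 50-100]`.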
   
   Example code for handling `PreviousGtidsEvent` in `MySqlStreamingChangeEventSource`:
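   ```java
       // Intended to live in MySqlStreamingChangeEventSource, which provides LOGGER,
       // unwrapData() and the gtidSet field used below.
       protected void handlePreviousGtidsEvent(MySqlOffsetContext offsetContext, Event event) {
           // PREVIOUS_GTIDS is written at the start of each binlog file and carries the
           // GTIDs of all earlier binlog files.
           PreviousGtidSetEventData previousGtidSetEventData = unwrapData(event);
           LOGGER.info(
                   "Handling PREVIOUS_GTID_EVENT: {}. Current GTID set: {}",
                   previousGtidSetEventData,
                   gtidSet);
           com.github.shyiko.mysql.binlog.GtidSet previousGtidSet =
                   new com.github.shyiko.mysql.binlog.GtidSet(previousGtidSetEventData.getGtidSet());
           // Nothing to add if the tracked set already covers every earlier transaction.
           if (previousGtidSet.isContainedWithin(gtidSet)) {
               LOGGER.info(
                       "Current GTID set already contains previous GTID set and will not be updated.");
               return;
           }
           LOGGER.info(
                   "Updating GTID set with previous GTID set: {}",
                   previousGtidSetEventData.getGtidSet());
           // Adopt the server's record of earlier transactions and persist it in the offset.
           gtidSet = previousGtidSet;
           offsetContext.setCompletedGtidSet(gtidSet.toString());
       }
   ```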
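
   The effect of the proposed handler can be modelled without Debezium. This sketch uses a hypothetical `SimpleGtidSet` (source UUID mapped to inclusive intervals, assumed already normalized) in place of `com.github.shyiko.mysql.binlog.GtidSet`, and a hypothetical `onPreviousGtids` that mirrors the handler's rule: keep the current set when it already contains the `PreviousGtidsEvent` set, otherwise adopt that set.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /**
    * Hypothetical model of a GTID set: source UUID -> inclusive [start, end] intervals.
    * Not the mysql-binlog-connector-java class; intervals per UUID are assumed normalized
    * (sorted, merged, non-adjacent).
    */
   final class SimpleGtidSet {
       private final Map<String, List<long[]>> intervals = new HashMap<>();

       SimpleGtidSet add(String uuid, long start, long end) {
           intervals.computeIfAbsent(uuid, k -> new ArrayList<>()).add(new long[] {start, end});
           return this;
       }

       /** True if every transaction in this set is also in {@code other}. */
       boolean isContainedWithin(SimpleGtidSet other) {
           for (Map.Entry<String, List<long[]>> entry : intervals.entrySet()) {
               List<long[]> theirs =
                       other.intervals.getOrDefault(entry.getKey(), Collections.emptyList());
               for (long[] mine : entry.getValue()) {
                   if (!coveredBySingleInterval(mine, theirs)) {
                       return false;
                   }
               }
           }
           return true;
       }

       // With normalized intervals, a range is covered only if one interval spans all of it.
       private static boolean coveredBySingleInterval(long[] range, List<long[]> candidates) {
           for (long[] c : candidates) {
               if (c[0] <= range[0] && range[1] <= c[1]) {
                   return true;
               }
           }
           return false;
       }
   }

   final class PreviousGtidsSketch {
       /** Keep current if it already covers previous; otherwise adopt previous (no union). */
       static SimpleGtidSet onPreviousGtids(SimpleGtidSet current, SimpleGtidSet previous) {
           if (previous.isContainedWithin(current)) {
               return current;
           }
           return previous;
       }
   }
   ```

   With a restored state of `A:50-100` and a `PreviousGtidsEvent` set of `A:1-100`, the containment check fails and `A:1-100` is adopted, which supplies the `1-49` range that `fixRestoredGtidSet` otherwise has to recover from the server set. As in the handler above, the set is replaced rather than unioned.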



-- 
This is an automated message from the Apache Git Service.