lvyanquan commented on code in PR #3845:
URL: https://github.com/apache/flink-cdc/pull/3845#discussion_r2278148619
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java:
##########
@@ -36,36 +38,64 @@ public class GtidUtils {
     public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
         Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
         serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
-        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
-            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
+        for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) {
+            GtidSet.UUIDSet serverUuidSet = newSet.get(restoredUuidSet.getUUID());
             if (serverUuidSet != null) {
-                long restoredIntervalEnd = getIntervalEnd(uuidSet);
-                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
-                        new ArrayList<>();
-                for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
-                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
-                        newIntervals.add(
-                                new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), serverInterval.getEnd()));
-                    } else if (serverInterval.getStart() <= restoredIntervalEnd
-                            && serverInterval.getEnd() > restoredIntervalEnd) {
-                        newIntervals.add(
+                List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals();
+                List<GtidSet.Interval> restoredIntervals = restoredUuidSet.getIntervals();
+
+                long earliestRestoredTx = getMinIntervalStart(restoredIntervals);
+
+                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> merged = new ArrayList<>();
+
+                // Process each server interval
+                for (GtidSet.Interval serverInterval : serverIntervals) {
+                    // First, check if any part comes before earliest restored
+                    if (serverInterval.getStart() < earliestRestoredTx) {
+                        long end = Math.min(serverInterval.getEnd(), earliestRestoredTx - 1);
+                        merged.add(
                                 new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), restoredIntervalEnd));
+                                        serverInterval.getStart(), end));
Review Comment:
This operation is probably needed because the GTIDs in our state are
accumulated from the binlog events we have read, so they do not include GTIDs
that were executed before the current binlog entries.
If we used the `PreviousGtidsEvent` to supplement the state with the GTIDs
that were already executed, would this operation become unnecessary? (Of
course, we must also consider compatibility with historical jobs: removing it
could cause them to read data that was already processed.)
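To make the gap concrete, here is a minimal, self-contained sketch of the step in the hunk above that keeps the part of each server interval lying before the earliest restored transaction. It does not use the connector's classes: `GtidGapSketch`, `Interval`, and `prefixBeforeRestored` are hypothetical stand-ins for illustration, and it assumes Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.List;

public class GtidGapSketch {
    /** Closed range of transaction numbers for one server UUID (hypothetical stand-in type). */
    record Interval(long start, long end) {}

    /** Returns the server intervals clipped to transactions strictly before the earliest restored one. */
    static List<Interval> prefixBeforeRestored(List<Interval> server, List<Interval> restored) {
        // Assumption: with no restored intervals, every server interval counts as "before".
        long earliestRestoredTx =
                restored.stream().mapToLong(Interval::start).min().orElse(Long.MAX_VALUE);
        List<Interval> prefix = new ArrayList<>();
        for (Interval s : server) {
            if (s.start() < earliestRestoredTx) {
                // Clip the interval so it ends just before the first restored transaction.
                prefix.add(new Interval(s.start(), Math.min(s.end(), earliestRestoredTx - 1)));
            }
        }
        return prefix;
    }

    public static void main(String[] args) {
        // The server has executed 1-100; the state only saw 60-70 in the binlog it read.
        List<Interval> server = List.of(new Interval(1, 100));
        List<Interval> restored = List.of(new Interval(60, 70));
        System.out.println(prefixBeforeRestored(server, restored));
    }
}
```

In this example the state is missing transactions 1-59, which is the range a `PreviousGtidsEvent` would be expected to supply.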
Example code for handling `PreviousGtidsEvent` in
`MySqlStreamingChangeEventSource`:
```
protected void handlePreviousGtidsEvent(MySqlOffsetContext offsetContext, Event event) {
    PreviousGtidSetEventData previousGtidSetEventData = unwrapData(event);
    LOGGER.info(
            "Handling PREVIOUS_GTID_EVENT: {}. Current GTID set: {}",
            previousGtidSetEventData,
            gtidSet);
    com.github.shyiko.mysql.binlog.GtidSet previousGtidSet =
            new com.github.shyiko.mysql.binlog.GtidSet(previousGtidSetEventData.getGtidSet());
    // Nothing to do if the tracked set already covers everything in the event.
    if (previousGtidSet.isContainedWithin(gtidSet)) {
        LOGGER.info(
                "Current GTID set already contains previous GTID set and will not be updated.");
        return;
    }
    LOGGER.info(
            "Updating GTID set with previous GTID set: {}",
            previousGtidSetEventData.getGtidSet());
    // Otherwise adopt the event's set and record it in the offset.
    gtidSet = previousGtidSet;
    offsetContext.setCompletedGtidSet(gtidSet.toString());
}
```
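The decision the handler makes can be tried out on a toy model. The sketch below is only an illustration for a single server UUID: `PreviousGtidsSketch`, `Interval`, and `onPreviousGtids` are hypothetical names, the containment check is a simplified stand-in and not the library's implementation, and it assumes each interval list is already normalized (no overlapping or adjacent ranges). Java 16+ is assumed for records.

```java
import java.util.List;

public class PreviousGtidsSketch {
    /** Closed range of transaction numbers for one server UUID (hypothetical stand-in type). */
    record Interval(long start, long end) {}

    /** True if every interval in inner lies inside a single interval of outer. */
    static boolean isContainedWithin(List<Interval> inner, List<Interval> outer) {
        return inner.stream().allMatch(i ->
                outer.stream().anyMatch(o -> o.start() <= i.start() && i.end() <= o.end()));
    }

    /** Keeps the current set if it already covers the previous set, otherwise adopts the previous set. */
    static List<Interval> onPreviousGtids(List<Interval> current, List<Interval> previous) {
        return isContainedWithin(previous, current) ? current : previous;
    }

    public static void main(String[] args) {
        List<Interval> previous = List.of(new Interval(1, 59));
        // State built only from events that were read: it lacks 1-59, so the previous set is adopted.
        System.out.println(onPreviousGtids(List.of(new Interval(60, 70)), previous));
        // State that already covers 1-59 is left unchanged.
        System.out.println(onPreviousGtids(List.of(new Interval(1, 70)), previous));
    }
}
```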
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.