[ https://issues.apache.org/jira/browse/FLINK-39149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jia Fan updated FLINK-39149:
----------------------------
Description:
When using `gtid.new.channel.position=LATEST` and restarting from a checkpoint, the GTID merging logic in `filterGtidSet` correctly skips history for *new channels* (UUIDs not present in the checkpoint), but fails to handle *old channels* (UUIDs present in the checkpoint) whose recorded GTID range does not start from `:1`.
This causes MySQL to replay pre-checkpoint historical transactions, leading to *duplicate data or out-of-order events*.
h3. Actual Behavior
The `LATEST` branch in `filterGtidSet` executes:
```java
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
```
The `with` operation returns a copy of `availableServerGtidSet` in which the ranges of every UUID present in `filteredGtidSet` (the checkpoint value) are replaced by the checkpoint's ranges. For an old channel whose checkpoint GTID range does not start at 1, the result is:
```
Checkpoint GTID: aaa-111:5000-8000
Server GTID:     aaa-111:1-10000,
                 bbb-222:1-3000
Merged GTID sent to MySQL:
                 aaa-111:5000-8000,
                 bbb-222:1-3000
```
MySQL then streams every transaction that is missing from the merged set:
```
aaa-111:1-4999      ← pre-checkpoint transactions (already consumed, or should be skipped)
aaa-111:8001-10000
```
The pre-checkpoint transactions `aaa-111:1-4999` are *replayed unexpectedly*, causing data duplication.
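To make the override semantics concrete, here is a minimal, dependency-free Java sketch. It models a GTID set as a map from UUID to sorted, non-overlapping closed intervals. The class and method names (`GtidOverrideMergeSketch`, `parse`, `withOverride`, `missing`, `format`) are hypothetical; the code only mimics the behavior described above and is not Debezium's `GtidSet` or the Flink CDC source. `missing` computes what the server would stream back, i.e. the server set minus the set sent in the dump request.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a GTID set: UUID -> sorted, non-overlapping intervals [start, end]. */
public class GtidOverrideMergeSketch {

    // Parses "uuid:a-b[:c-d],uuid2:e-f"; a single id such as "uuid:7" is also accepted.
    static Map<String, List<long[]>> parse(String text) {
        Map<String, List<long[]>> set = new LinkedHashMap<>();
        for (String entry : text.split(",")) {
            String[] parts = entry.trim().split(":");
            List<long[]> ranges = new ArrayList<>();
            for (int i = 1; i < parts.length; i++) {
                String[] bounds = parts[i].split("-");
                long start = Long.parseLong(bounds[0]);
                long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
                ranges.add(new long[] {start, end});
            }
            set.put(parts[0], ranges);
        }
        return set;
    }

    // Copy of base in which every UUID found in override is replaced wholesale (override semantics).
    static Map<String, List<long[]>> withOverride(Map<String, List<long[]>> base,
                                                  Map<String, List<long[]>> override) {
        Map<String, List<long[]>> merged = new LinkedHashMap<>(base);
        merged.putAll(override);
        return merged;
    }

    // Server transactions NOT contained in 'sent', i.e. what the server would stream back.
    static Map<String, List<long[]>> missing(Map<String, List<long[]>> server,
                                             Map<String, List<long[]>> sent) {
        Map<String, List<long[]>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<long[]>> e : server.entrySet()) {
            List<long[]> have = sent.getOrDefault(e.getKey(), List.of());
            List<long[]> gaps = new ArrayList<>();
            for (long[] range : e.getValue()) {
                long cursor = range[0];
                for (long[] h : have) { // 'have' is assumed sorted and non-overlapping
                    if (h[1] < cursor || h[0] > range[1]) continue;
                    if (h[0] > cursor) gaps.add(new long[] {cursor, h[0] - 1});
                    cursor = Math.max(cursor, h[1] + 1);
                }
                if (cursor <= range[1]) gaps.add(new long[] {cursor, range[1]});
            }
            if (!gaps.isEmpty()) result.put(e.getKey(), gaps);
        }
        return result;
    }

    // Renders the set in MySQL notation, e.g. "aaa-111:1-4999:8001-10000".
    static String format(Map<String, List<long[]>> set) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, List<long[]>> e : set.entrySet()) {
            if (sb.length() > 0) sb.append(',');
            sb.append(e.getKey());
            for (long[] r : e.getValue()) {
                sb.append(':').append(r[0]);
                if (r[1] != r[0]) sb.append('-').append(r[1]);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, List<long[]>> checkpoint = parse("aaa-111:5000-8000");
        Map<String, List<long[]>> server = parse("aaa-111:1-10000,bbb-222:1-3000");
        Map<String, List<long[]>> merged = withOverride(server, checkpoint);
        System.out.println("sent:     " + format(merged));                  // aaa-111:5000-8000,bbb-222:1-3000
        System.out.println("streamed: " + format(missing(server, merged))); // aaa-111:1-4999:8001-10000
    }
}
```

The `sent:` line reproduces the merged GTID from the walkthrough, and the `streamed:` line shows that the first interval MySQL returns, `aaa-111:1-4999`, lies entirely before the checkpoint.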
h3. Expected Behavior
For old channels, the merged GTID set should cover every transaction up to the checkpoint boundary, so that MySQL resumes right after the checkpoint (here, from `aaa-111:8001`) without replaying pre-checkpoint history.
Expected merged GTID:
```
aaa-111:1-8000,
bbb-222:1-3000
```
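One way to express the expected behavior is sketched below, assuming that "up to the checkpoint boundary" means widening each old channel to `1-N`, where `N` is the highest transaction id recorded for that UUID in the checkpoint. GTID sets are handled as plain `uuid:ranges` strings for brevity. `GtidLatestFixSketch`, `entries`, `highest` and `mergeLatest` are hypothetical names, not the actual Flink CDC fix, and inputs are assumed to be non-empty and well-formed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the expected LATEST merge; not the Flink CDC implementation. */
public class GtidLatestFixSketch {

    // Splits "uuid:ranges,..." into uuid -> "ranges" (e.g. "1-100:5000-8000"), preserving order.
    static Map<String, String> entries(String gtidSet) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : gtidSet.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.indexOf(':');
            result.put(trimmed.substring(0, colon), trimmed.substring(colon + 1));
        }
        return result;
    }

    // Highest transaction id in "a-b[:c-d]...", e.g. "5000-8000" -> 8000.
    static long highest(String ranges) {
        long max = 0;
        for (String range : ranges.split(":")) {
            String[] bounds = range.split("-");
            max = Math.max(max, Long.parseLong(bounds[bounds.length - 1]));
        }
        return max;
    }

    // Server set, except that every UUID found in the checkpoint becomes 1..highest(checkpoint).
    static String mergeLatest(String serverGtidSet, String checkpointGtidSet) {
        // New channels keep the full server set, so their history is skipped (LATEST).
        Map<String, String> merged = entries(serverGtidSet);
        for (Map.Entry<String, String> e : entries(checkpointGtidSet).entrySet()) {
            // Old channels: mark everything below the checkpoint boundary as already executed.
            merged.put(e.getKey(), "1-" + highest(e.getValue()));
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : merged.entrySet()) {
            if (sb.length() > 0) sb.append(',');
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prints aaa-111:1-8000,bbb-222:1-3000
        System.out.println(mergeLatest("aaa-111:1-10000,bbb-222:1-3000", "aaa-111:5000-8000"));
    }
}
```

Widening to `1-N` also fills interior gaps in a multi-interval checkpoint (for example, `aaa-111:1-100:5000-8000` becomes `aaa-111:1-8000`); whether that is acceptable depends on why the gap exists, so an actual fix may need to be more selective.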
> `gtid.new.channel.position=LATEST` Incorrectly Replays Pre-Checkpoint
> Transactions for Old Channels in Multi-Source Replication
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39149
> URL: https://issues.apache.org/jira/browse/FLINK-39149
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Jia Fan
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)