[https://issues.apache.org/jira/browse/FLINK-38183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Leonard Xu reassigned FLINK-38183:
----------------------------------
Assignee: LiuZeshan
> Data loss when cdc reading mysql that has out of order GTID
> -----------------------------------------------------------
>
> Key: FLINK-38183
> URL: https://issues.apache.org/jira/browse/FLINK-38183
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: 3.0.0
> Environment: Flink CDC: 3.5-SNAPSHOT
> Flink: 1.20.1
>
> Reporter: LiuZeshan
> Assignee: LiuZeshan
> Priority: Critical
> Labels: pull-request-available
>
> As per the design of
> [https://github.com/apache/flink-cdc/pull/2220|http://example.com/], CDC only
> tracks the maximum GTID position and resumes from it. For example, when
> restoring from GTID offset 1-7:9-10, it automatically adjusts the offset to
> 1-10, which wrongly skips GTID 8 and loses its data. The loss is especially
> severe when GTID 8 is a large transaction. We have hit this problem many
> times in production.
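A minimal Java sketch of the effect described above, assuming a single server UUID and MySQL's start-end interval notation. GtidCollapseDemo, collapseToRange and silentlySkipped are hypothetical names, not Flink CDC APIs; the sketch only shows that reducing 1-7:9-10 to one range ending at its maximum also marks transaction 8 as done.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not the Flink CDC implementation.
class GtidCollapseDemo {

    /** Parses one UUID's intervals, e.g. "1-7:9-10" -> [[1,7],[9,10]]; "8" -> [[8,8]]. */
    static List<long[]> parseIntervals(String spec) {
        List<long[]> intervals = new ArrayList<>();
        for (String part : spec.split(":")) {
            String[] bounds = part.split("-");
            long start = Long.parseLong(bounds[0]);
            long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
            intervals.add(new long[] {start, end});
        }
        return intervals;
    }

    /** Collapses the intervals into one range from the smallest start to the largest end. */
    static String collapseToRange(String spec) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long[] iv : parseIntervals(spec)) {
            min = Math.min(min, iv[0]);
            max = Math.max(max, iv[1]);
        }
        return min + "-" + max;
    }

    /** Lists the transaction numbers that collapsing adds, i.e. the gaps in the original set. */
    static List<Long> silentlySkipped(String spec) {
        List<long[]> intervals = parseIntervals(spec);
        intervals.sort((a, b) -> Long.compare(a[0], b[0]));
        List<Long> skipped = new ArrayList<>();
        long nextExpected = intervals.get(0)[0];
        for (long[] iv : intervals) {
            for (long n = nextExpected; n < iv[0]; n++) {
                skipped.add(n); // inside the collapsed range but never executed
            }
            nextExpected = Math.max(nextExpected, iv[1] + 1);
        }
        return skipped;
    }

    public static void main(String[] args) {
        String recorded = "1-7:9-10";
        System.out.println(collapseToRange(recorded)); // 1-10
        System.out.println(silentlySkipped(recorded)); // [8]
    }
}
```

Any number returned by silentlySkipped is a transaction that a resume from the collapsed range would never request from MySQL.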
> MySQL 5.7+ supports parallel replication based on group commit
> (LOGICAL_CLOCK). The coordinator (SQL) thread on the replica distributes
> non-conflicting transactions to multiple worker threads, which execute them
> concurrently. Although the source generates consecutive GTIDs in commit
> order (for example A:1-100), the replica's worker threads may commit those
> transactions out of order. When CDC reads from such a MySQL replica, it can
> therefore see a GTID order like the one below. The same scenario can also be
> constructed manually by setting GTID_NEXT:
> {code:java}
> SET @@SESSION.GTID_NEXT='XXX:1';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:2';
> INSERT ...;
> ...
> SET @@SESSION.GTID_NEXT='XXX:7';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:9';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:10';
> INSERT ...;
> SET @@SESSION.GTID_NEXT='XXX:8';
> BEGIN;
> INSERT ...;
> ...
> INSERT ...; -- (1,000,000th DML; a checkpoint is taken at this position)
> ...
> INSERT ...; -- (2,000,000th DML)
> COMMIT;
> SET @@SESSION.GTID_NEXT='XXX:11';
> INSERT ...; {code}
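To make the ordering concrete, here is a small hypothetical Java simulation (OutOfOrderCommitDemo is an illustrative name, not part of Flink CDC or MySQL) that applies the commit order from the script above and prints the executed GTID set after each commit, using MySQL-style interval formatting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical simulation of a replica committing GTID 8 after 9 and 10.
class OutOfOrderCommitDemo {

    /** Formats transaction numbers as intervals, e.g. {1..7, 9, 10} -> "1-7:9-10". */
    static String format(TreeSet<Long> executed) {
        List<String> parts = new ArrayList<>();
        long start = -1; // GTID numbers start at 1, so -1 means "no open interval"
        long prev = -1;
        for (long n : executed) {
            if (start < 0) {
                start = n;
            } else if (n != prev + 1) {
                parts.add(range(start, prev)); // close the interval at the first gap
                start = n;
            }
            prev = n;
        }
        if (start >= 0) {
            parts.add(range(start, prev));
        }
        return String.join(":", parts);
    }

    private static String range(long start, long end) {
        return start == end ? Long.toString(start) : start + "-" + end;
    }

    public static void main(String[] args) {
        // Commit order taken from the SQL script above.
        long[] commitOrder = {1, 2, 3, 4, 5, 6, 7, 9, 10, 8, 11};
        TreeSet<Long> executed = new TreeSet<>();
        for (long gtid : commitOrder) {
            executed.add(gtid);
            System.out.println("after " + gtid + ": " + format(executed));
        }
    }
}
```

While transaction 8 is still being read, the completed set is 1-7:9-10, which is exactly the GTID set a checkpoint taken at that moment records.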
> GTID 8 is a single transaction containing 2 million DML statements. After 1
> million of its rows have been read, a checkpoint is triggered and completes.
> The recorded GTID offset is 1-7:9-10, and the number of events to skip is 1
> million, as shown below.
> {code:java}
> offset={transaction_id=null, ts_sec=1754145492, file=mysql-bin.000190,
> pos=1443601, kind=SPECIFIC, gtids=xxx:1-7:9-10, row=3, event=1000000,
> server_id=123} {code}
> The job is then restarted and recovered from this checkpoint. By design, CDC
> adjusts the offset to 1-10 and still skips 1 million events. As a result, the
> 1 million unread rows of GTID 8 are lost, and so are the rows contained in
> the first 1 million events starting from GTID 11.
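The restore path can be modeled with a deliberately simplified Java sketch. Assumptions, not taken from the Flink CDC code: one binlog event per row, the restored skip counter is applied to the first events MySQL sends after reconnecting, and transaction sizes are scaled down (6 rows in GTID 8, checkpoint after its 3rd row, 4 rows in GTID 11). RestoreSkipDemo, scenario and replay are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model (assumption): one binlog event per row, and a restored "skip N events"
// counter that is applied to the first events the server sends after reconnecting.
class RestoreSkipDemo {

    /** Transactions in replica commit (binlog) order -> number of row events in each. */
    static Map<Long, Integer> scenario() {
        Map<Long, Integer> binlog = new LinkedHashMap<>();
        for (long gtid = 1; gtid <= 7; gtid++) {
            binlog.put(gtid, 1);
        }
        binlog.put(9L, 1);
        binlog.put(10L, 1);
        binlog.put(8L, 6);  // the large transaction; checkpoint after its 3rd row
        binlog.put(11L, 4);
        return binlog;
    }

    /** Returns the rows ("gtid#row") the job would emit after restoring with resumeSet and skipEvents. */
    static List<String> replay(Map<Long, Integer> binlog, Set<Long> resumeSet, int skipEvents) {
        List<String> emitted = new ArrayList<>();
        int toSkip = skipEvents;
        for (Map.Entry<Long, Integer> tx : binlog.entrySet()) {
            if (resumeSet.contains(tx.getKey())) {
                continue; // MySQL does not resend transactions in the requested GTID set
            }
            for (int row = 1; row <= tx.getValue(); row++) {
                if (toSkip > 0) {
                    toSkip--; // treated as "previously processed"
                    continue;
                }
                emitted.add(tx.getKey() + "#" + row);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Set<Long> recorded = Set.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 9L, 10L);      // 1-7:9-10
        Set<Long> collapsed = Set.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); // 1-10
        System.out.println("resume from 1-10:     " + replay(scenario(), collapsed, 3));
        System.out.println("resume from 1-7:9-10: " + replay(scenario(), recorded, 3));
    }
}
```

With the collapsed set only 11#4 is emitted, so rows 8#4 to 8#6 and 11#1 to 11#3 never reach the sink. With the recorded set, only the three already-emitted rows of transaction 8 are skipped.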
>
> h3. {*}Steps to reproduce:{*} mysql-cdc -> print SQL job
> 1. Run SHOW MASTER STATUS to check the current GTID offset.
>
> {code:java}
> mysql> show master status;
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                                                                        |
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> | mysql-bin.000190 |    26484 |              |                  | ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,
> fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540 |
> +------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
> 1 row in set (0.00 sec) {code}
>
> 2. Specify the GTID offset to start from in the table's WITH options:
>
> {code:java}
> 'scan.startup.specific-offset.gtid-set' =
> 'ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540',
> 'scan.startup.mode' = 'specific-offset', {code}
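The Executed_Gtid_Set value printed by SHOW MASTER STATUS contains a line break after each comma, while the option value above is written on one line. A hypothetical Java helper (GtidOptionValue.normalize is an illustrative name, not a connector API) can strip that whitespace and run a rough format check before the value is pasted into the WITH clause. Whether the connector itself tolerates whitespace in the value is not assumed either way.

```java
import java.util.regex.Pattern;

// Hypothetical helper; the connector does not provide this.
class GtidOptionValue {

    // Rough check for one member: a server UUID followed by ":start[-end]" intervals.
    private static final Pattern MEMBER = Pattern.compile(
            "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}(:\\d+(-\\d+)?)+");

    /** Removes line breaks and spaces copied from SHOW MASTER STATUS and validates each member. */
    static String normalize(String executedGtidSet) {
        String compact = executedGtidSet.replaceAll("\\s+", "");
        for (String member : compact.split(",")) {
            if (!MEMBER.matcher(member).matches()) {
                throw new IllegalArgumentException("Unexpected GTID set member: " + member);
            }
        }
        return compact;
    }

    public static void main(String[] args) {
        String copied = "ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,\n"
                + "fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540";
        System.out.println("'scan.startup.specific-offset.gtid-set' = '" + normalize(copied) + "',");
    }
}
```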
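> 3. In MySQL, manually set GTID_NEXT to a later GTID (182545) and update data,
> then commit a multi-statement transaction with an automatically assigned
> GTID (which becomes 182541):
>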
> {code:java}
> mysql> set gtid_next='fc992a75-c2a9-11ee-82e7-0242ac120004:182545';
> Query OK, 0 rows affected (0.01 sec)
> mysql> update full_types_2col set tiny_c=30 where id = 10;
> Query OK, 1 row affected (0.02 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> set gtid_next='automatic';
> Query OK, 0 rows affected (0.01 sec)
> mysql> begin;
> Query OK, 0 rows affected (0.00 sec)
> mysql> update full_types_2col set tiny_c=31 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=32 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=33 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> commit;
> Query OK, 0 rows affected (0.02 sec)
> mysql> show master status;
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                                                                               |
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> | mysql-bin.000190 |    27368 |              |                  | ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,
> fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545 |
> +------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
> 1 row in set (0.00 sec) {code}
> CDC job output:
> {code:java}
> -U[10, 29]
> +U[10, 30]
> -U[10, 30]
> +U[10, 31]
> -U[10, 31]
> +U[10, 32]
> -U[10, 32]
> +U[10, 33] {code}
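The executed set after this step, fc992a75-...:1-182541:182545, has a hole between its intervals. A hypothetical Java sketch (GtidGapFinder is an illustrative name) that lists such holes per server UUID, assuming intervals are printed in ascending order as in the output above:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; assumes each member's intervals are sorted ascending.
class GtidGapFinder {

    /** Maps each server UUID to the ranges missing between its intervals, e.g. "182542-182544". */
    static Map<String, List<String>> gaps(String gtidSet) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String member : gtidSet.replaceAll("\\s+", "").split(",")) {
            String[] parts = member.split(":"); // parts[0] is the UUID, the rest are intervals
            List<String> missing = new ArrayList<>();
            long prevEnd = -1;
            for (int i = 1; i < parts.length; i++) {
                String[] bounds = parts[i].split("-");
                long start = Long.parseLong(bounds[0]);
                long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
                if (prevEnd >= 0 && start > prevEnd + 1) {
                    long from = prevEnd + 1;
                    long to = start - 1;
                    missing.add(from == to ? String.valueOf(from) : from + "-" + to);
                }
                prevEnd = Math.max(prevEnd, end);
            }
            result.put(parts[0], missing);
        }
        return result;
    }

    public static void main(String[] args) {
        String executed = "ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,\n"
                + "fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545";
        System.out.println(gaps(executed));
    }
}
```

A non-empty entry means GTIDs below the maximum of that UUID have not been executed on this server, which is the situation where resuming from the maximum alone becomes unsafe.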
> 4. Trigger a checkpoint (id=2).
>
> {code:java}
> Binlog offset for tables [test.full_types_2col] on checkpoint 2:
> {transaction_id=null, ts_sec=1754194732, file=mysql-bin.000190, pos=26881,
> kind=SPECIFIC,
> gtids=ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545,
> row=1, event=6, server_id=123} {code}
> 5. Restart the job and restore it from checkpoint 2. The log below shows that
> the current logic in GtidUtils#fixRestoredGtidSet adds GTID 182541 to the
> restored set even though the checkpoint did not include it, so that
> transaction is treated as already processed, resulting in data loss after
> restoring from the checkpoint.
> {code:java}
> GTID set from previous recorded offset:
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545
> GTID set available on server:
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545
> Final merged GTID set to use when connecting to MySQL:
> ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545
> {code}
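A hedged Java approximation of the merge printed in the log above, for a single server UUID: keep every interval of the server's GTID set up to the maximum GTID of the restored set. This rule is inferred from the log output and the description at the top of the issue, not copied from GtidUtils#fixRestoredGtidSet. The intersect method is a hypothetical alternative that keeps only GTIDs present in both sets, so the gap at 182541 survives; it is not necessarily the fix proposed for this issue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for one server UUID; names are illustrative, not Flink CDC APIs.
class GtidMergeDemo {

    /** Parses "1-182540:182545-182545" (or single numbers such as "182545") into intervals. */
    static List<long[]> parse(String spec) {
        List<long[]> intervals = new ArrayList<>();
        for (String part : spec.split(":")) {
            String[] bounds = part.split("-");
            long start = Long.parseLong(bounds[0]);
            long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
            intervals.add(new long[] {start, end});
        }
        return intervals;
    }

    /** Formats intervals the way the connector log does, e.g. "1-182541:182545-182545". */
    static String format(List<long[]> intervals) {
        List<String> parts = new ArrayList<>();
        for (long[] iv : intervals) {
            parts.add(iv[0] + "-" + iv[1]);
        }
        return String.join(":", parts);
    }

    /** Approximation of the observed behavior: server intervals clipped at the restored maximum. */
    static List<long[]> mergeByMax(List<long[]> restored, List<long[]> server) {
        long max = 0;
        for (long[] iv : restored) {
            max = Math.max(max, iv[1]);
        }
        List<long[]> merged = new ArrayList<>();
        for (long[] iv : server) {
            if (iv[0] <= max) {
                merged.add(new long[] {iv[0], Math.min(iv[1], max)});
            }
        }
        return merged;
    }

    /** Alternative: intersection of two sorted, non-overlapping interval lists. */
    static List<long[]> intersect(List<long[]> a, List<long[]> b) {
        List<long[]> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size()) {
            long start = Math.max(a.get(i)[0], b.get(j)[0]);
            long end = Math.min(a.get(i)[1], b.get(j)[1]);
            if (start <= end) {
                out.add(new long[] {start, end});
            }
            if (a.get(i)[1] < b.get(j)[1]) i++; else j++; // advance the interval that ends first
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> restored = parse("1-182540:182545-182545");
        List<long[]> server = parse("1-182541:182545-182545");
        System.out.println("merge by max: " + format(mergeByMax(restored, server)));
        System.out.println("intersection: " + format(intersect(restored, server)));
    }
}
```

The first line reproduces the "Final merged GTID set" from the log (1-182541:182545-182545); the intersection stays at 1-182540:182545-182545, so MySQL would still send transaction 182541.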
> 6. Continue updating data in MySQL and observe the job output: 3 updates are
> lost (tiny_c=34, 35 and 36), and only the update to 37 is emitted.
> {code:java}
> mysql> update full_types_2col set tiny_c=34 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=35 where id = 10;
> Query OK, 1 row affected (0.01 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=36 where id = 10;
> Query OK, 1 row affected (0.00 sec)
> Rows matched: 1 Changed: 1 Warnings: 0
> mysql> update full_types_2col set tiny_c=37 where id = 10;
> Query OK, 1 row affected (0.02 sec)
> Rows matched: 1 Changed: 1 Warnings: 0 {code}
> {code:java}
> Skipping previously processed row event:
> Event{header=EventHeaderV4{timestamp=1754194959000,
> eventType=EXT_UPDATE_ROWS, serverId=123, headerLength=19, dataLength=37,
> nextPosition=27655, flags=0}, data=UpdateRowsEventData{tableId=199,
> includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
> {before=[10, 33], after=[10, 34]}
> ]}}
> Skipping previously processed row event:
> Event{header=EventHeaderV4{timestamp=1754194963000,
> eventType=EXT_UPDATE_ROWS, serverId=123, headerLength=19, dataLength=37,
> nextPosition=27973, flags=0}, data=UpdateRowsEventData{tableId=199,
> includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
> {before=[10, 34], after=[10, 35]}
> ]}}
> -U[10, 36]
> +U[10, 37] {code}