[ https://issues.apache.org/jira/browse/FLINK-38183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
LiuZeshan updated FLINK-38183:
------------------------------
Description:
As designed in [https://github.com/apache/flink-cdc/pull/2220|http://example.com/], CDC only considers the maximum GTID of the restored set and resumes from there. For example, when restoring from the GTID offset 1-7:9-10, CDC automatically adjusts it to 1-10 and therefore wrongly skips GTID 8, which loses data. If GTID 8 is a large transaction, the data loss is even more severe. We have hit this problem many times in our production environment.

MySQL 5.7+ supports parallel replication based on group commit (LOGICAL_CLOCK). The replica's SQL thread (the coordinator) distributes conflict-free transactions to multiple worker threads, which apply them concurrently. Although the primary generates consecutive GTIDs in commit order (e.g. A:1-100), the worker threads on the replica may commit these transactions out of order. When CDC reads from a MySQL replica, it may therefore encounter the GTID order below. The same scenario can also be constructed by setting GTID_NEXT manually:
{code:sql}
SET @@SESSION.GTID_NEXT='XXX:1';
INSERT ...;
SET @@SESSION.GTID_NEXT='XXX:2';
INSERT ...;
...
SET @@SESSION.GTID_NEXT='XXX:7';
INSERT ...;
SET @@SESSION.GTID_NEXT='XXX:9';
INSERT ...;
SET @@SESSION.GTID_NEXT='XXX:10';
INSERT ...;
SET @@SESSION.GTID_NEXT='XXX:8';
BEGIN;
INSERT ...;
...
INSERT ...; -- 1,000,000th DML of this transaction; the checkpoint is taken here
...
INSERT ...; -- 2,000,000th DML of this transaction
COMMIT;
SET @@SESSION.GTID_NEXT='XXX:11';
INSERT ...;
{code}
Transaction GTID 8 contains 2 million DML statements. After the first 1 million have been read, a checkpoint is triggered and completes. The recorded GTID set is 1-7:9-10 and the number of events to skip is 1 million, as shown below.
{code:java}
offset={transaction_id=null, ts_sec=1754145492, file=mysql-bin.000190, pos=1443601, kind=SPECIFIC, gtids=xxx:1-7:9-10, row=3, event=1000000, server_id=123}
{code}
When the job is restarted and restored from this checkpoint, CDC by design adjusts the GTID set to 1-10 and still skips 1 million events. Because GTIDs 1-10 are now treated as already executed, the server does not resend GTID 8 and starts streaming from GTID 11. As a result, the remaining 1 million unread changes of GTID 8 are lost, and the changes contained in the first 1 million events from GTID 11 onward are skipped as well.

h3. {*}Reproduction steps:{*} mysql-cdc -> print SQL job
1. Run {{show master status}} to check the current GTID set:
{code:java}
mysql> show master status;
+------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                                                                        |
+------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
| mysql-bin.000190 |    26484 |              |                  | ad268c5d-2f18-11ef-8eac-0242ac120003:9-12, fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540 |
+------------------+----------+--------------+------------------+------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
{code}
2. Use the WITH options to start from this GTID set:
{code:java}
'scan.startup.specific-offset.gtid-set' = 'ad268c5d-2f18-11ef-8eac-0242ac120003:9-12, fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540',
'scan.startup.mode' = 'specific-offset',
{code}
3. Manually set the GTID and update the data in MySQL. Transaction 182545 is committed first; the following multi-statement transaction then receives the automatic GTID 182541, so the executed GTID set becomes 1-182541:182545.
{code:java}
mysql> set gtid_next='fc992a75-c2a9-11ee-82e7-0242ac120004:182545';
Query OK, 0 rows affected (0.01 sec)
mysql> update full_types_2col set tiny_c=30 where id = 10;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> set gtid_next='automatic';
Query OK, 0 rows affected (0.01 sec)
mysql> begin;
Query OK, 0 rows affected (0.00 sec)
mysql> update full_types_2col set tiny_c=31 where id = 10;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> update full_types_2col set tiny_c=32 where id = 10;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> update full_types_2col set tiny_c=33 where id = 10;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> commit;
Query OK, 0 rows affected (0.02 sec)
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                                                                               |
+------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
| mysql-bin.000190 |    27368 |              |                  | ad268c5d-2f18-11ef-8eac-0242ac120003:9-12, fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545 |
+------------------+----------+--------------+------------------+-------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
{code}
CDC job output:
{code:java}
-U[10, 29]
+U[10, 30]
-U[10, 30]
+U[10, 31]
-U[10, 31]
+U[10, 32]
-U[10, 32]
+U[10, 33]
{code}
4. Trigger checkpoint 2:
{code:java}
Binlog offset for tables [test.full_types_2col] on checkpoint 2: {transaction_id=null, ts_sec=1754194732, file=mysql-bin.000190, pos=26881, kind=SPECIFIC, gtids=ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545, row=1, event=6, server_id=123}
{code}
5. Restart the job and restore it from checkpoint 2. The log shows that the current logic of {{GtidUtils#fixRestoredGtidSet}} adds GTID 182541, which is not contained in the checkpointed GTID set, to the merged set. GTID 182541 is therefore skipped, and data is lost when the job starts from the checkpoint.
{code:java}
GTID set from previous recorded offset: ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182540:182545-182545
GTID set available on server: ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545
Final merged GTID set to use when connecting to MySQL: ad268c5d-2f18-11ef-8eac-0242ac120003:9-12,fc992a75-c2a9-11ee-82e7-0242ac120004:1-182541:182545-182545
{code}
6. Continue to update the data in MySQL and observe the job output: two row events are skipped as previously processed, so two changes are lost.
{code:java}
mysql> update full_types_2col set tiny_c=34 where id = 10;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> update full_types_2col set tiny_c=35 where id = 10;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> update full_types_2col set tiny_c=36 where id = 10;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> update full_types_2col set tiny_c=37 where id = 10;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
{code}
{code:java}
Skipping previously processed row event: Event{header=EventHeaderV4{timestamp=1754194959000, eventType=EXT_UPDATE_ROWS, serverId=123, headerLength=19, dataLength=37, nextPosition=27655, flags=0}, data=UpdateRowsEventData{tableId=199, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
    {before=[10, 33], after=[10, 34]}
]}}
Skipping previously processed row event: Event{header=EventHeaderV4{timestamp=1754194963000, eventType=EXT_UPDATE_ROWS, serverId=123, headerLength=19, dataLength=37, nextPosition=27973, flags=0}, data=UpdateRowsEventData{tableId=199, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
    {before=[10, 34], after=[10, 35]}
]}}
-U[10, 36]
+U[10, 37]
{code}
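A minimal, single-UUID Java model of the restore adjustment may help to see where GTID 182541 goes. It is reconstructed only from the log lines in step 5 and from the 1-7:9-10 -> 1-10 example above: the server's executed intervals are kept up to the highest GTID of the restored set, and any gap inside the restored set is ignored. The class and method names are hypothetical, and this is not the actual {{GtidUtils#fixRestoredGtidSet}} implementation.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical single-UUID model of the restore adjustment described in this issue.
// Reconstructed from the logged input/output only; NOT the real GtidUtils code.
final class GtidRestoreModel {

    // Closed interval [start, end] of transaction numbers (GNOs) for one server UUID.
    record Interval(long start, long end) {
        @Override
        public String toString() {
            return start + "-" + end;
        }
    }

    // Highest GNO contained in the interval list.
    static long maxEnd(List<Interval> intervals) {
        long max = 0;
        for (Interval i : intervals) {
            max = Math.max(max, i.end());
        }
        return max;
    }

    // Keep the server's intervals, truncated at the highest GNO of the restored set.
    // Any gap inside the restored set is lost in the process.
    static List<Interval> truncateServerSetAtRestoredMax(List<Interval> server, List<Interval> restored) {
        long limit = maxEnd(restored);
        List<Interval> result = new ArrayList<>();
        for (Interval s : server) {
            if (s.end() <= limit) {
                result.add(s);
            } else if (s.start() <= limit) {
                result.add(new Interval(s.start(), limit));
            }
        }
        return result;
    }

    static boolean contains(List<Interval> intervals, long gno) {
        for (Interval i : intervals) {
            if (gno >= i.start() && gno <= i.end()) {
                return true;
            }
        }
        return false;
    }

    // GNOs that the merged set marks as executed although the checkpoint never covered them.
    static List<Long> notCheckpointed(List<Interval> merged, List<Interval> restored) {
        List<Long> result = new ArrayList<>();
        for (Interval m : merged) {
            for (long gno = m.start(); gno <= m.end(); gno++) {
                if (!contains(restored, gno)) {
                    result.add(gno);
                }
            }
        }
        return result;
    }

    static String format(List<Interval> intervals) {
        return intervals.stream().map(Interval::toString).collect(Collectors.joining(":"));
    }

    public static void main(String[] args) {
        // Intervals of UUID fc992a75-c2a9-11ee-82e7-0242ac120004 from the step 5 log.
        List<Interval> restored = List.of(new Interval(1, 182540), new Interval(182545, 182545));
        List<Interval> server = List.of(new Interval(1, 182541), new Interval(182545, 182545));

        List<Interval> merged = truncateServerSetAtRestoredMax(server, restored);
        System.out.println("merged           = " + format(merged));
        System.out.println("not checkpointed = " + notCheckpointed(merged, restored));
    }
}
{code}
With the step 5 intervals the model prints 1-182541:182545-182545, the same merged set as the log, and lists 182541 as the only GTID that is treated as executed although the checkpoint never covered it. With a restored set of 1-7:9-10 and a server set of 1-11 it returns 1-10 and lists GTID 8.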
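The effect on the production example at the top of this issue can be sketched as a small simulation as well. It assumes a single server UUID, counts only row events, and models the restore as follows: the server sends every transaction whose GTID is not in the start set (in the replica commit order 1..7, 9, 10, 8, 11), and the reader then discards the first N events, where N is the checkpointed {{event}} count. Event counts are shrunk (6 events in GTID 8, checkpoint after 3, 4 events in GTID 11), and all names are hypothetical.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical simulation of restoring a binlog reader from (GTID set, events to skip).
// Simplifications: one server UUID, row events only, tiny event counts.
final class SkipOnRestoreSimulation {

    // One row event: the 'index'-th event of transaction number 'gno'.
    record RowEvent(long gno, int index) {
        @Override
        public String toString() {
            return gno + "#" + index;
        }
    }

    // Replica binlog in commit order: 1..7, 9, 10, then the large transaction 8, then 11.
    static List<RowEvent> replicaBinlog(int eventsInGtid8, int eventsInGtid11) {
        List<RowEvent> binlog = new ArrayList<>();
        for (long gno : new long[] {1, 2, 3, 4, 5, 6, 7, 9, 10}) {
            binlog.add(new RowEvent(gno, 0));
        }
        for (int i = 0; i < eventsInGtid8; i++) {
            binlog.add(new RowEvent(8, i));
        }
        for (int i = 0; i < eventsInGtid11; i++) {
            binlog.add(new RowEvent(11, i));
        }
        return binlog;
    }

    // The server sends only transactions whose GTID is not in the start set;
    // the reader then drops the first 'skipEvents' events as already processed.
    static List<RowEvent> emittedAfterRestore(List<RowEvent> binlog, LongPredicate startSet, int skipEvents) {
        List<RowEvent> emitted = new ArrayList<>();
        int skipped = 0;
        for (RowEvent e : binlog) {
            if (startSet.test(e.gno())) {
                continue; // treated as executed: never sent by the server
            }
            if (skipped < skipEvents) {
                skipped++; // discarded by the reader as "previously processed"
                continue;
            }
            emitted.add(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<RowEvent> binlog = replicaBinlog(6, 4);
        int skip = 3; // checkpoint taken after 3 of the 6 events of GTID 8

        LongPredicate gapPreserved = gno -> (gno >= 1 && gno <= 7) || (gno >= 9 && gno <= 10); // 1-7:9-10
        LongPredicate gapFilled = gno -> gno >= 1 && gno <= 10; // 1-10

        System.out.println("start 1-7:9-10 -> " + emittedAfterRestore(binlog, gapPreserved, skip));
        System.out.println("start 1-10     -> " + emittedAfterRestore(binlog, gapFilled, skip));
    }
}
{code}
Starting from 1-7:9-10, the server resends GTID 8, the reader skips its first 3 events and emits the rest, followed by all of GTID 11. Starting from 1-10, the rest of GTID 8 is never sent, and the skip counter discards the first 3 events of GTID 11 instead, so only 11#3 is emitted.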
> Data loss when cdc reading mysql that has out of order GTID
> -----------------------------------------------------------
>
>                 Key: FLINK-38183
>                 URL: https://issues.apache.org/jira/browse/FLINK-38183
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.0.0
>         Environment: Flink CDC: 3.5-SNAPSHOT
>                      Flink: 1.20.1
>            Reporter: LiuZeshan
>            Priority: Critical
>              Labels: pull-request-available

--
This message was sent by Atlassian Jira
(v8.20.10#820010)