[ https://issues.apache.org/jira/browse/FLINK-34634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827334#comment-17827334 ]
Hongshun Wang commented on FLINK-34634:
---------------------------------------

Please assign it to me.

Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed
-------------------------------------------------------------------------------------------------------------------------------------------------------

Key: FLINK-34634
URL: https://issues.apache.org/jira/browse/FLINK-34634
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Hongshun Wang
Priority: Major
Labels: pull-request-available
Fix For: cdc-3.1.0
Attachments: image-2024-03-09-15-25-26-187.png, image-2024-03-09-15-27-46-073.png

h3. What's the problem
I removed a table from the captured table list and then restarted the job from a savepoint, but the job could no longer read the binlog. The logs contained an ERROR-level message:
'The enumerator received invalid request meta group id 6, the valid meta group id range is [0, 4].'
It appears that the Reader is requesting split meta information for more splits than the Enumerator is aware of.
However, the Reader should already drop redundant split information, as implemented in [https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does this issue occur?

h3. Why it occurs
!image-2024-03-09-15-25-26-187.png|width=751,height=329!
Examining the code, I found the cause: the issue occurs if the job stops before all split meta information has been synchronized and is then restarted. Suppose the totalFinishedSplitSize of the binlogSplit in the Reader is 6 and no meta information has been synchronized yet, so the finishedSnapshotSplitInfos of the binlogSplit in the Reader is empty. After the restart, the Reader computes totalFinishedSplitSize as 6 - (0 - 0): the subtracted term is the number of split infos dropped by filtering out the removed table, and an empty list drops nothing, so the result is still 6. The Enumerator, however, knows of only 4 splits (the removed table had two splits).
This can lead to an out-of-range meta group request.
!image-2024-03-09-15-27-46-073.png|width=755,height=305!

h3. How to reproduce
* Add Thread.sleep(1000L) in com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#handleSourceEvents to postpone the synchronization of split meta information.
{code:java}
// Excerpt: only the BinlogSplitMetaEvent branch of the existing if/else chain is shown.
public void handleSourceEvents(SourceEvent sourceEvent) {
    // ... preceding branches unchanged ...
    else if (sourceEvent instanceof BinlogSplitMetaEvent) {
        LOG.debug(
                "Source reader {} receives binlog meta with group id {}.",
                subtaskId,
                ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
        // Added for reproduction: delay filling the split meta so that a savepoint
        // can be taken while the binlog split's meta information is still incomplete.
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
    }
}
{code}
* Add Thread.sleep(500L) in com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testRemoveTablesOneByOne to trigger the savepoint before meta information synchronization finishes.
{code:java}
// step 2: execute insert and trigger savepoint with all tables added
{
    // ... omitted ...
    waitForSinkSize("sink", fetchedDataList.size());
    // Added for reproduction: wait so the savepoint below is taken before synchronization finishes.
    Thread.sleep(500L);
    assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));
    finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
    jobClient.cancel().get();
}
// test removing tables one by one; note that at least one table must remain
for (int round = 0; round < captureAddressTables.length - 1; round++) {
    ...
}
{code}
* Add 'chunk-meta.group.size' = '2' to the table options in com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#getCreateTableStatement.

Then run the test com.ververica.cdc.connectors.mysql.source.NewlyAddedTableITCase#testJobManagerFailoverForRemoveTable; the error log message appears.
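The arithmetic in the explanation of why the issue occurs can be modeled with a small, self-contained Java sketch. All names in it (MetaGroupMismatchSketch, ReaderBinlogSplit, filterRemovedTable, metaGroupCount, and the table ids) are hypothetical and are not Flink CDC classes. It assumes that the Reader shrinks totalFinishedSplitSize only by the number of split infos it actually filtered out, and that meta groups are formed as ceil(splitCount / chunk-meta.group.size) with ids starting at 0. The numbers come from the example scenario (6 splits, a removed table with 2 splits, group size 2), not from the logged error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of FLINK-34634; none of these names are Flink CDC classes.
public class MetaGroupMismatchSketch {

    // Hypothetical stand-in for the Reader's view of the binlog split.
    static final class ReaderBinlogSplit {
        final int totalFinishedSplitSize;
        // Table id of each finished snapshot split info already synchronized to the Reader.
        final List<String> finishedSnapshotSplitInfos;

        ReaderBinlogSplit(int totalFinishedSplitSize, List<String> finishedSnapshotSplitInfos) {
            this.totalFinishedSplitSize = totalFinishedSplitSize;
            this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
        }

        // Drops infos of the removed table and shrinks the total only by the number of
        // infos actually dropped: total - (sizeBefore - sizeAfter).
        ReaderBinlogSplit filterRemovedTable(String removedTable) {
            List<String> kept = new ArrayList<>();
            for (String tableId : finishedSnapshotSplitInfos) {
                if (!tableId.equals(removedTable)) {
                    kept.add(tableId);
                }
            }
            int dropped = finishedSnapshotSplitInfos.size() - kept.size();
            return new ReaderBinlogSplit(totalFinishedSplitSize - dropped, kept);
        }
    }

    // Assumed grouping: ceil(splitCount / groupSize) meta groups with ids 0..count-1.
    static int metaGroupCount(int splitCount, int groupSize) {
        return (splitCount + groupSize - 1) / groupSize;
    }

    public static void main(String[] args) {
        int groupSize = 2;             // chunk-meta.group.size = 2, as in the reproduction
        String removed = "db.removed"; // hypothetical table that had two splits
        int enumeratorSplitCount = 4;  // the Enumerator already excludes the removed table

        // Case 1: all six infos were synchronized before the savepoint.
        ReaderBinlogSplit synced = new ReaderBinlogSplit(6,
                Arrays.asList("db.a", "db.a", "db.b", "db.b", removed, removed));
        System.out.println("synchronized: reader total = "
                + synced.filterRemovedTable(removed).totalFinishedSplitSize);

        // Case 2: the job stopped before any info was synchronized (empty list).
        ReaderBinlogSplit unsynced = new ReaderBinlogSplit(6, new ArrayList<>());
        int readerTotal = unsynced.filterRemovedTable(removed).totalFinishedSplitSize;
        int readerGroups = metaGroupCount(readerTotal, groupSize);
        int enumeratorGroups = metaGroupCount(enumeratorSplitCount, groupSize);
        System.out.println("unsynchronized: reader total = " + readerTotal);
        System.out.println("reader may request group ids 0.." + (readerGroups - 1)
                + ", enumerator has 0.." + (enumeratorGroups - 1));
    }
}
```

Running it prints "synchronized: reader total = 4", then "unsynchronized: reader total = 6" and "reader may request group ids 0..2, enumerator has 0..1". With synchronized infos the filter drops the removed table's two entries and both sides agree on 4 splits; with an empty info list nothing is dropped, so under these assumptions the Reader still expects a third meta group that the Enumerator does not have.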