[ 
https://issues.apache.org/jira/browse/FLINK-38270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18018024#comment-18018024
 ] 

Hongshun Wang edited comment on FLINK-38270 at 9/4/25 3:25 AM:
---------------------------------------------------------------

Hi [~morozov], the root cause of both scenarios is that the enumerator and the 
reader maintain different metadata.

I have another idea for solving this problem. I am pushing forward FLIP-537: 
[Enumerator Maintains Global Splits Distribution for Balanced 
assignment.|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480]

MySQL CDC can implement FLIP-537 later. After each restart, the splits would be 
reassigned by the enumerator, so the enumerator and the reader would hold the 
same information.

You can also join the discussion: 
[https://lists.apache.org/thread/2jptndsfvkgg2knjc7pyfhn8jgto4wd6]



> MySQL CDC source may ignore newly added tables while reading the binlog 
> (scenario 2)
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-38270
>                 URL: https://issues.apache.org/jira/browse/FLINK-38270
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0
>            Reporter: Sergei Morozov
>            Priority: Major
>
> The logic of removing finished snapshot split infos for no longer existing 
> tables is inconsistent on the enumerator and the source reader. This may lead 
> to a situation where the infos of an old table are retained on the reader but 
> the infos of the new one aren’t transmitted. As a result, the binlog events 
> for the newly added table will be skipped.
> h3. Discrepancy in the info subtraction logic
> When a table is no longer captured by the connector, it’s removed from the 
> enumerator’s and reader’s state.
> h4. Enumerator
>  # Discover tables that match the include list.
>  # Compare them with the tables from the state.
>  # Remove the state that corresponds to the no longer captured tables.
> As a result:
>  # If the table is no longer included in the configuration, it’s *removed* 
> from the state.
>  # If the table no longer exists, it is *also removed* from the state.
> h4. Reader
>  # Iterate finished snapshot split infos.
>  # Remove all infos whose tables are no longer included in the 
> configuration.
> As a result:
>  # If the table is no longer included in the configuration, it’s *removed* 
> from the state.
>  # If the table no longer exists, it _is *not*_ *removed* from the state, 
> because the reader doesn’t know that the table no longer exists.
> h3. Impact of the discrepancy on the binlog split metadata transmission
> The transmission logic uses the number of split infos on each side as the 
> indicator of completion and relies on the no longer relevant infos being 
> subtracted consistently. Because an info that’s subtracted from the 
> enumerator’s state may remain in the reader’s state, the two counts can match 
> even though the info of a newly snapshotted table hasn’t been transmitted to 
> the reader.
> h3. Steps to reproduce
>  # Create a source connection that captures tables {{A}} and {{B}}.
>  # Start the connection and wait until it reaches the steady state.
>  # Stop the connection.
>  # Drop table {{A}} in the source database.
>  # Start the connection.
>  # Observe the state of the enumerator and the reader
>  ## The enumerator’s finished split infos will only contain B (A no longer 
> exists, so it’s subtracted from the state).
>  ## The reader’s finished split infos will contain A and B (both still match 
> the include list).
>  # Stop the connection.
>  # Add table C to the source configuration.
>  # Start the connection.
>  # Observe the state of the enumerator and the reader
>  ## The enumerator’s finished split infos will contain B and C (the total 
> number is 2).
>  ## The reader’s finished split infos will contain A and B (both still match 
> the include list, and the total number is also 2).
>  # Make data changes in C and confirm that they are not captured.
> h3. Observing state
> h4. Enumerator
>  # Set a breakpoint on {{MySqlSourceEnumerator#snapshotState()}}
>  # Evaluate 
> {{((MySqlHybridSplitAssigner)splitAssigner).snapshotSplitAssigner.splitFinishedOffsets}}
> h4. Reader
>  # Set a breakpoint on {{MySqlSplitReader#fetch()}}
>  # Evaluate 
> {{((MySqlBinlogSplit)((BinlogSplitReader)this.currentReader).currentBinlogSplit).finishedSnapshotSplitInfos}}
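
The reproduction steps in the description can be replayed with a small toy model. This is only a sketch under stated assumptions, not the connector's code: the class {{SplitInfoDiscrepancy}} and its three methods are hypothetical names, each table is reduced to a single finished split info, and the completion check is assumed to compare only the number of infos on each side, as the description says.

```java
import java.util.Set;
import java.util.TreeSet;

/** Toy model of the discrepancy; all names here are hypothetical, not connector APIs. */
class SplitInfoDiscrepancy {

    /** Enumerator side: keep infos only for tables that match the include list AND still exist. */
    static Set<String> pruneOnEnumerator(Set<String> state, Set<String> included, Set<String> existing) {
        Set<String> kept = new TreeSet<>(state);
        kept.retainAll(included);
        kept.retainAll(existing);
        return kept;
    }

    /** Reader side: keep infos for tables that match the include list; existence is unknown here. */
    static Set<String> pruneOnReader(Set<String> state, Set<String> included) {
        Set<String> kept = new TreeSet<>(state);
        kept.retainAll(included);
        return kept;
    }

    /** Assumed count-based check: the reader asks for more infos only if it holds fewer. */
    static boolean readerNeedsMoreInfos(Set<String> enumeratorInfos, Set<String> readerInfos) {
        return readerInfos.size() < enumeratorInfos.size();
    }

    public static void main(String[] args) {
        // Steady state: both sides hold infos for A and B.
        Set<String> enumerator = new TreeSet<>(Set.of("A", "B"));
        Set<String> reader = new TreeSet<>(Set.of("A", "B"));

        // Restart after table A was dropped; the include list still matches A and B.
        enumerator = pruneOnEnumerator(enumerator, Set.of("A", "B"), Set.of("B"));
        reader = pruneOnReader(reader, Set.of("A", "B"));

        // Restart after table C was added to the configuration and snapshotted.
        Set<String> included = Set.of("A", "B", "C");
        enumerator = pruneOnEnumerator(enumerator, included, Set.of("B", "C"));
        enumerator.add("C");
        reader = pruneOnReader(reader, included);

        System.out.println("enumerator infos: " + enumerator);
        System.out.println("reader infos:     " + reader);
        System.out.println("reader requests more infos: " + readerNeedsMoreInfos(enumerator, reader));
    }
}
```

In this model both sides end up with two infos, so the count-based check reports nothing left to transmit even though the reader never received the info for C. That matches the last step of the reproduction, where changes in C are not captured.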



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
