[ 
https://issues.apache.org/jira/browse/FLINK-38270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18018024#comment-18018024
 ] 

Hongshun Wang edited comment on FLINK-38270 at 9/4/25 3:26 AM:
---------------------------------------------------------------

Hi [~morozov], the root cause of both scenarios is that the enumerator and 
reader maintain different metadata.

I have another idea to solve this problem: I am pushing forward FLIP-537: 
[Enumerator Maintains Global Splits Distribution for Balanced 
assignment.|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480]

MySQL CDC can implement FLIP-537 later. After each restart, the splits 
are reassigned by the enumerator, so the information on both sides stays the 
same. (We can discuss the details later, e.g., the stream split would no longer 
be serialized to state, which reduces the split size.)

 

You can also join the discussion: 
[https://lists.apache.org/thread/2jptndsfvkgg2knjc7pyfhn8jgto4wd6]



> MySQL CDC source may ignore newly added tables while reading the binlog 
> (scenario 2)
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-38270
>                 URL: https://issues.apache.org/jira/browse/FLINK-38270
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0
>            Reporter: Sergei Morozov
>            Priority: Major
>
> The logic for removing finished snapshot split infos of tables that no longer 
> exist is inconsistent between the enumerator and the source reader. This may 
> lead to a situation where the infos of an old table are retained on the reader 
> but the infos of the new one aren’t transmitted. As a result, the binlog 
> events for the newly added table will be skipped.
> h3. Discrepancy in the info subtraction logic
> When a table is no longer captured by the connector, it’s removed from the 
> enumerator’s and reader’s state.
> h4. Enumerator
>  # Discover tables that match the include list.
>  # Compare them with the tables from the state.
>  # Remove the state that corresponds to the no longer captured tables.
> As a result:
>  # If the table is no longer included in the configuration, it’s *removed* 
> from the state.
>  # If the table no longer exists, it is *also removed* from the state.
> h4. Reader
>  # Iterate finished snapshot split infos.
>  # Remove all infos whose tables are no longer included in the 
> configuration.
> As a result:
>  # If the table is no longer included in the configuration, it’s *removed* 
> from the state.
>  # If the table no longer exists, it _is *not*_ *removed* from the state, 
> because the reader doesn’t know that the table no longer exists.
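The two cleanup paths described above can be modelled roughly as follows. This is a minimal sketch under stated assumptions, not the connector's actual code: the class and method names (`SubtractionDiscrepancy`, `enumeratorRetain`, `readerRetain`) are hypothetical, and tables are reduced to plain strings.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Simplified, hypothetical model of the two cleanup paths;
// not the connector's actual code.
class SubtractionDiscrepancy {

    // Enumerator-style cleanup: a table survives only if it was discovered
    // in the database and matches the include list.
    static Set<String> enumeratorRetain(
            Set<String> state, Set<String> discovered, Predicate<String> includeList) {
        Set<String> kept = new LinkedHashSet<>();
        for (String table : state) {
            if (discovered.contains(table) && includeList.test(table)) {
                kept.add(table);
            }
        }
        return kept;
    }

    // Reader-style cleanup: only the include list is consulted, so a dropped
    // table that still matches the filter stays in the state.
    static Set<String> readerRetain(Set<String> state, Predicate<String> includeList) {
        Set<String> kept = new LinkedHashSet<>();
        for (String table : state) {
            if (includeList.test(table)) {
                kept.add(table);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Set<String> state = new LinkedHashSet<>(List.of("A", "B"));
        Set<String> discovered = Set.of("B"); // table A was dropped
        Predicate<String> includeList = t -> t.equals("A") || t.equals("B");

        // prints "enumerator: [B]"
        System.out.println("enumerator: " + enumeratorRetain(state, discovered, includeList));
        // prints "reader: [A, B]"
        System.out.println("reader: " + readerRetain(state, includeList));
    }
}
```

With table A dropped but still matching the include list, the modelled enumerator ends up with only B while the modelled reader keeps both A and B.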
> h3. Impact of the discrepancy on the binlog split metadata transmission
> The transmission logic uses the number of split infos on each side as the 
> indicator of completion and relies on infos that are no longer relevant being 
> subtracted consistently. Because an info that’s subtracted from the 
> enumerator’s state may not be subtracted from the reader’s, the info of a 
> newly snapshotted table may never be transmitted to the reader.
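A count-based completion check of the kind described above might look like the sketch below. It assumes the reader asks for more metadata only while it holds fewer infos than the enumerator reports; the class and method names (`MetadataTransmissionCheck`, `readerRequestsMoreInfos`) are hypothetical and not taken from the connector.

```java
import java.util.List;

// Hypothetical model of a count-based completion check;
// not the connector's actual code.
class MetadataTransmissionCheck {

    // Assumption: the reader requests more finished split infos only while
    // it holds fewer of them than the enumerator's total.
    static boolean readerRequestsMoreInfos(List<String> readerInfos, int enumeratorTotal) {
        return readerInfos.size() < enumeratorTotal;
    }

    public static void main(String[] args) {
        List<String> enumeratorInfos = List.of("B", "C");

        // Inconsistent subtraction: the reader still holds the stale info for A.
        // The counts match (2 == 2), so the info for C is never requested.
        System.out.println(
                readerRequestsMoreInfos(List.of("A", "B"), enumeratorInfos.size())); // prints "false"

        // Consistent subtraction: the reader dropped A as well, so it asks for C.
        System.out.println(
                readerRequestsMoreInfos(List.of("B"), enumeratorInfos.size())); // prints "true"
    }
}
```

Comparing sizes alone cannot tell {A, B} from {B, C}, which is why the stale info for A masks the missing info for C in this model.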
> h3. Steps to reproduce
>  # Create a source connection that captures tables {{A}} and {{B}}.
>  # Start the connection and wait until it reaches the steady state.
>  # Stop the connection.
>  # Drop table {{A}} in the source database.
>  # Start the connection.
>  # Observe the state of the enumerator and the reader:
>  ## The enumerator’s finished split infos will contain only B (A no longer 
> exists, so it’s subtracted from the state).
>  ## The reader’s finished split infos will contain A and B (both still match 
> the include list).
>  # Stop the connection.
>  # Add table C to the source configuration.
>  # Start the connection.
>  # Observe the state of the enumerator and the reader:
>  ## The enumerator’s finished split infos will contain B and C (the total 
> number is 2).
>  ## The reader’s finished split infos will contain A and B (both still match 
> the include list, and the total number is also 2).
>  # Make data changes in C and confirm that they are not captured.
> h3. Observing state
> h4. Enumerator
>  # Set a breakpoint on {{MySqlSourceEnumerator#snapshotState()}}
>  # Evaluate 
> {{((MySqlHybridSplitAssigner)splitAssigner).snapshotSplitAssigner.splitFinishedOffsets}}
> h4. Reader
>  # Set a breakpoint on {{MySqlSplitReader#fetch()}}
>  # Evaluate 
> {{((MySqlBinlogSplit)((BinlogSplitReader)this.currentReader).currentBinlogSplit).finishedSnapshotSplitInfos}}



