[ 
https://issues.apache.org/jira/browse/FLINK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37710:
-----------------------------------
    Labels: pull-request-available  (was: )

> Schema operator relies on data sources re-emitting CreateTableEvents during 
> startup
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-37710
>                 URL: https://issues.apache.org/jira/browse/FLINK-37710
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.3.0
>            Reporter: Sergei Morozov
>            Priority: Major
>              Labels: pull-request-available
>
> To process incoming events, the schema operator needs to know the current 
> schema of every table. This state is stored as part of {{SchemaManager}} and 
> is exposed to Flink via {{SchemaRegistry}} (the coordinator for 
> {{SchemaOperator}}). One would expect that a schema operator restored from a 
> checkpoint sees the same schemas it had at the time of the checkpoint. 
> However, that is not the case.
>  
> Instead, {{SchemaOperator}} relies on {{DataSource}}s to re-emit 
> {{CreateTableEvent}}s. This looks like a design flaw: every source must be 
> able to re-emit such events and therefore has to be stateful.
>  
> The MySQL source re-emits {{CreateTableEvent}}s by introspecting the 
> _current_ database schema, which may differ from the schema at the 
> checkpoint and lead to correctness issues.
>  
> Example of an issue:
>  # Start a MySQL source against two tables.
>  # Make data changes in one of them.
>  # Stop the source.
>  # Make more changes in that table.
>  # Drop the table.
>  # Start the source.
>  
> *Expected behavior*: the source emits all data changes made before the table 
> was dropped. *Actual behavior*: the changes are ignored (data loss).
> h3. Proposed solution
>  # Lazily initialize table schemas in {{SchemaOperator}} before processing 
> {{ChangeEvent}}s. Use the same communication channel with {{SchemaRegistry}} 
> as used for communicating schema changes.
>  # Remove the code that re-emits {{CreateTableEvent}}s from the MySQL source.
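
The first step of the proposed solution could look roughly like the sketch below. It is an illustration only, not the Flink CDC implementation: `LazySchemaSketch`, `SchemaLookup`, and `LazySchemaOperator` are hypothetical names, a schema is reduced to a list of column names, and the asynchronous channel to {{SchemaRegistry}} is replaced by a plain synchronous lookup. The point it shows is that the operator-side cache can start empty after a restore and be filled from the coordinator's state on the first event for each table, so no source has to re-emit {{CreateTableEvent}}s.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Sketch only: none of these types are real Flink CDC classes. */
final class LazySchemaSketch {

    /** Hypothetical view of the coordinator: the schema it holds for a table, if any. */
    interface SchemaLookup {
        Optional<List<String>> latestSchema(String tableId);
    }

    /** Operator-side logic: the schema cache is empty after a restore and fills on demand. */
    static final class LazySchemaOperator {
        private final SchemaLookup registry;
        private final Map<String, List<String>> cache = new HashMap<>();
        private int lookups = 0; // number of round trips to the registry

        LazySchemaOperator(SchemaLookup registry) {
            this.registry = registry;
        }

        /** Maps a row to column names, asking the registry only on a cache miss. */
        Map<String, Object> process(String tableId, List<?> row) {
            List<String> columns = cache.get(tableId);
            if (columns == null) {
                lookups++;
                columns = registry.latestSchema(tableId)
                        .orElseThrow(() -> new IllegalStateException(
                                "No schema known for table " + tableId));
                cache.put(tableId, columns);
            }
            if (columns.size() != row.size()) {
                throw new IllegalArgumentException(
                        "Row does not match schema of table " + tableId);
            }
            Map<String, Object> named = new LinkedHashMap<>();
            for (int i = 0; i < columns.size(); i++) {
                named.put(columns.get(i), row.get(i));
            }
            return named;
        }

        int lookups() {
            return lookups;
        }
    }
}
```

In this sketch the schema comes from whatever the registry had stored, not from the live database, so a table dropped after the checkpoint can still have its earlier changes decoded. An unknown table fails loudly instead of being silently skipped.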



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
