[
https://issues.apache.org/jira/browse/FLINK-37710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-37710:
-----------------------------------
Labels: pull-request-available (was: )
> Schema operator relies on data sources re-emitting CreateTableEvents during
> startup
> -----------------------------------------------------------------------------------
>
> Key: FLINK-37710
> URL: https://issues.apache.org/jira/browse/FLINK-37710
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.3.0
> Reporter: Sergei Morozov
> Priority: Major
> Labels: pull-request-available
>
> The schema operator, in order to process incoming events, needs to be aware
> of the current schema of all tables. The state is stored as part of
> {{SchemaManager}} and is exposed to Flink via {{SchemaRegistry}} (the
> coordinator for {{SchemaOperator}}). It would be logical to expect that
> when the schema operator is restored from a checkpoint, it has the same
> notion of the schema as it did at the time of the checkpoint. However, this
> is not the case.
>
> Instead, {{SchemaOperator}} relies on each {{DataSource}} to re-emit
> {{CreateTableEvent}}s. This looks like a design flaw, because it requires
> every source to be capable of re-emitting such events and thus to be
> stateful.
>
> The MySQL source re-emits {{CreateTableEvent}}s by introspecting the
> _current_ database schema, which may differ from the schema at the time of
> the checkpoint and lead to correctness issues.
>
> Example of an issue:
> # Start a MySQL source against two tables.
> # Make data changes in one of them.
> # Stop the source.
> # Make more changes in that table.
> # Drop the table.
> # Start the source.
>
> *Expected behavior*: the source emits all data changes made before the
> table was dropped. *Actual behavior*: the changes are ignored (data loss).
> h3. Proposed solution
> # Lazily initialize table schemas in {{SchemaOperator}} before processing
> {{ChangeEvent}}s. Use the same communication channel with {{SchemaRegistry}}
> as is used for communicating schema changes.
> # Remove the code that re-emits {{CreateTableEvent}}s from the MySQL source.
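
The first step of the proposed solution could look roughly like the sketch below. It is only an illustration of the lazy-lookup idea, not code from Flink CDC: the Registry and Operator classes, their methods, and the string-based schema representation are all hypothetical stand-ins for {{SchemaRegistry}}, {{SchemaOperator}}, and the real schema types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these types are actual Flink CDC classes. */
public class LazySchemaSketch {

    /** Stand-in for the coordinator, which holds the schemas restored from a checkpoint. */
    static final class Registry {
        private final Map<String, String> checkpointedSchemas = new HashMap<>();
        int requests = 0; // counts lookups, only to make the caching visible

        void put(String tableId, String schema) {
            checkpointedSchemas.put(tableId, schema);
        }

        String getSchema(String tableId) {
            requests++;
            return checkpointedSchemas.get(tableId);
        }
    }

    /** Stand-in for the operator: it asks the registry only on a cache miss. */
    static final class Operator {
        private final Registry registry;
        private final Map<String, String> cache = new HashMap<>();

        Operator(Registry registry) {
            this.registry = registry;
        }

        String process(String tableId, String payload) {
            // Lazy initialization: the schema is fetched the first time an event
            // for this table arrives, instead of waiting for the source to re-emit it.
            String schema = cache.computeIfAbsent(tableId, registry::getSchema);
            if (schema == null) {
                throw new IllegalStateException("No schema known for table " + tableId);
            }
            return payload + " [" + schema + "]";
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        // Assumed to have been restored from the checkpoint, not read from the live database.
        registry.put("db.orders", "id INT, total DECIMAL");

        Operator operator = new Operator(registry);
        System.out.println(operator.process("db.orders", "insert #1"));
        System.out.println(operator.process("db.orders", "insert #2"));
        System.out.println("registry requests: " + registry.requests);
    }
}
```

Because the schema comes from the registry's checkpointed state rather than from the live database, a table dropped after the checkpoint would still resolve in this sketch, which is the scenario from the example above.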
--
This message was sent by Atlassian Jira
(v8.20.10#820010)