Sergei Morozov created FLINK-37710:
--------------------------------------

Summary: Schema operator relies on data sources re-emitting CreateTableEvents during startup
Key: FLINK-37710
URL: https://issues.apache.org/jira/browse/FLINK-37710
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.3.0
Reporter: Sergei Morozov
To process incoming events, the schema operator needs to know the current schema of every table. This state is stored in {{SchemaManager}} and exposed to Flink via {{SchemaRegistry}} (the coordinator for {{SchemaOperator}}). One would expect that a schema operator restored from a checkpoint has the same view of table schemas as it had at the time of the checkpoint. However, this is not the case. Instead, {{SchemaOperator}} relies on {{DataSource}}s to re-emit {{CreateTableEvent}}s.

This looks like a design flaw: every source must be capable of re-emitting such events and therefore has to be stateful. The MySQL source re-emits {{CreateTableEvent}}s by introspecting the _current_ database schema, which may differ from the schema at the time of the checkpoint and can lead to correctness issues.

Example of an issue:
# Start a MySQL source against two tables.
# Make data changes in one of them.
# Stop the source.
# Make more changes in that table.
# Drop the table.
# Start the source.

{*}Expected behavior{*}: the source emits all data changes made before the table was dropped.
{*}Actual behavior{*}: the changes are ignored (data loss).

h3. Proposed solution
# Lazily initialize table schemas in {{SchemaOperator}} before processing {{ChangeEvent}}s, using the same communication channel with {{SchemaRegistry}} that is used for schema changes.
# Remove the code that re-emits {{CreateTableEvent}}s from the MySQL source.
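Step 1 of the proposed solution could look roughly like the minimal Java sketch below. It is not Flink CDC code: {{SchemaRegistryClient}}, {{requestLatestSchema}}, {{DataChange}} and the simplified {{SchemaOperator}} are hypothetical stand-ins, and the synchronous registry call is an assumption standing in for the request/response channel the operator already uses for schema changes. The operator keeps only a transient cache and, on the first event for a table it has not seen yet, asks the registry (whose state is restored from the checkpoint) instead of waiting for the source to re-emit a {{CreateTableEvent}}.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of lazy schema initialization; none of these types are the Flink CDC API.
public class LazySchemaInitSketch {

    // Hypothetical stand-in for the coordinator-side SchemaRegistry, whose state survives checkpoints.
    interface SchemaRegistryClient {
        Optional<List<String>> requestLatestSchema(String tableId);
    }

    // Hypothetical data change event: a table id plus the row's values.
    record DataChange(String tableId, List<?> values) {}

    // Hypothetical operator that keeps only a transient, non-checkpointed schema cache.
    static class SchemaOperator {
        private final SchemaRegistryClient registry;
        private final Map<String, List<String>> schemaCache = new HashMap<>();
        private int registryRequests = 0;

        SchemaOperator(SchemaRegistryClient registry) {
            this.registry = registry;
        }

        // Returns the column names used to interpret the event.
        List<String> process(DataChange event) {
            List<String> schema = schemaCache.get(event.tableId());
            if (schema == null) {
                // Lazy initialization: ask the registry on first sight of the table
                // rather than relying on the source to re-emit a CreateTableEvent.
                registryRequests++;
                schema = registry.requestLatestSchema(event.tableId())
                        .orElseThrow(() -> new IllegalStateException("Unknown table: " + event.tableId()));
                schemaCache.put(event.tableId(), schema);
            }
            if (schema.size() != event.values().size()) {
                throw new IllegalStateException("Column count mismatch for " + event.tableId());
            }
            return schema;
        }

        int registryRequests() {
            return registryRequests;
        }
    }

    public static void main(String[] args) {
        // Registry state as restored from a checkpoint: it still knows "db.orders",
        // even if the table has since been dropped in the upstream database.
        Map<String, List<String>> checkpointed = Map.of("db.orders", List.of("id", "amount"));
        SchemaRegistryClient registry = tableId -> Optional.ofNullable(checkpointed.get(tableId));

        SchemaOperator operator = new SchemaOperator(registry);
        System.out.println(operator.process(new DataChange("db.orders", List.of(1, 10))));
        System.out.println(operator.process(new DataChange("db.orders", List.of(2, 20))));
        System.out.println("registry requests: " + operator.registryRequests());
    }
}
```

Running {{main}} prints {{[id, amount]}} twice followed by {{registry requests: 1}}: the registry is consulted once per table and the cached schema is reused afterwards. Because the schema comes from checkpointed registry state rather than from introspecting the live database, a table dropped upstream after the checkpoint would still have a known schema in this sketch.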