Sergei Morozov created FLINK-37710:
--------------------------------------
Summary: Schema operator relies on data sources re-emitting
CreateTableEvents during startup
Key: FLINK-37710
URL: https://issues.apache.org/jira/browse/FLINK-37710
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.3.0
Reporter: Sergei Morozov
To process incoming events, the schema operator needs to know the current
schema of every table. This state is stored in {{SchemaManager}} and is exposed
to Flink via {{SchemaRegistry}} (the coordinator for {{SchemaOperator}}). One
would expect a schema operator restored from a checkpoint to hold the same
schemas it had when the checkpoint was taken, but this is not the case.
Instead, {{SchemaOperator}} relies on each {{DataSource}} to re-emit
{{CreateTableEvent}}s. This looks like a design flaw: every source must be
able to re-emit such events and therefore has to be stateful.
The MySQL source re-emits {{CreateTableEvent}}s by introspecting the
_current_ database schema, which may differ from the schema at the time of the
checkpoint and can lead to correctness issues.
Example of an issue:
# Start a MySQL source against two tables.
# Make data changes in one of them.
# Stop the source.
# Make more changes in that table.
# Drop the table.
# Start the source.
*Expected behavior*: the source emits all data changes made before the table
was dropped.
*Actual behavior*: the changes are ignored (data loss).
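The steps above can be modeled with a small toy program. This is only a sketch under stated assumptions: {{DroppedTableReplay}}, {{replay}} and the table names are hypothetical and are not part of Flink CDC, and the rule "events for a table without a known schema are skipped" is a simplification of the reported behavior. It contrasts replaying pending changes against the schemas visible at restart with replaying them against the schemas known at the checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the restart scenario; not the real MySQL source or SchemaOperator. */
class DroppedTableReplay {

    /** Replays pending events, emitting only those whose table has a known schema. */
    static List<String> replay(
            List<String[]> pendingEvents, Map<String, List<String>> knownSchemas) {
        List<String> emitted = new ArrayList<>();
        for (String[] event : pendingEvents) {
            String tableId = event[0];
            String row = event[1];
            if (knownSchemas.containsKey(tableId)) {
                emitted.add(tableId + ":" + row);
            }
            // Assumption: events for tables without a schema are silently skipped,
            // which is how the data loss in this report is modeled here.
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Schemas as of the checkpoint (step 3): both tables exist.
        Map<String, List<String>> atCheckpoint = new HashMap<>();
        atCheckpoint.put("db.a", List.of("id"));
        atCheckpoint.put("db.b", List.of("id"));

        // Schemas found by introspection at restart (step 6): db.a was dropped in step 5.
        Map<String, List<String>> atRestart = new HashMap<>(atCheckpoint);
        atRestart.remove("db.a");

        // Changes made in step 4 that have not been read yet.
        List<String[]> pending =
                List.of(new String[] {"db.a", "row-1"}, new String[] {"db.a", "row-2"});

        System.out.println(replay(pending, atRestart));    // prints []
        System.out.println(replay(pending, atCheckpoint)); // prints [db.a:row-1, db.a:row-2]
    }
}
```

In this model the pending changes only survive when the schemas come from the checkpoint rather than from the current database.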
h3. Proposed solution
# Lazily initialize table schemas in {{SchemaOperator}} before processing
{{ChangeEvent}}s. Use the same communication channel with {{SchemaRegistry}}
that is already used for communicating schema changes.
# Remove the code that re-emits {{CreateTableEvent}}s from the MySQL source.
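
A minimal sketch of what the lazy initialization in step 1 might look like. All names here ({{LazySchemaSketch}}, {{LazySchemaCache}}, {{schemaFor}}, {{registryLookup}}) are hypothetical and do not correspond to actual Flink CDC classes or methods. The lookup function stands in for a request to {{SchemaRegistry}} over the existing channel, and a schema is reduced to a list of column names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch; these are not the real Flink CDC classes. */
class LazySchemaSketch {

    /** Operator-side schema cache that starts empty after a restore and fills on demand. */
    static class LazySchemaCache {
        private final Map<String, List<String>> schemas = new HashMap<>();
        // Stand-in for a request sent to the coordinator (assumed, not a real API).
        private final Function<String, List<String>> registryLookup;
        private int registryRequests = 0;

        LazySchemaCache(Function<String, List<String>> registryLookup) {
            this.registryLookup = registryLookup;
        }

        /** Returns the schema for a table, asking the registry only on first use. */
        List<String> schemaFor(String tableId) {
            List<String> cached = schemas.get(tableId);
            if (cached != null) {
                return cached;
            }
            registryRequests++;
            List<String> fetched = registryLookup.apply(tableId);
            if (fetched == null) {
                throw new IllegalStateException("No schema registered for " + tableId);
            }
            schemas.put(tableId, fetched);
            return fetched;
        }

        int registryRequests() {
            return registryRequests;
        }
    }

    public static void main(String[] args) {
        // Schemas as restored from the coordinator's checkpointed state (assumed).
        Map<String, List<String>> restored = new HashMap<>();
        restored.put("db.orders", List.of("id", "amount"));

        LazySchemaCache cache = new LazySchemaCache(restored::get);
        System.out.println(cache.schemaFor("db.orders")); // prints [id, amount]
        System.out.println(cache.schemaFor("db.orders")); // prints [id, amount]
        System.out.println(cache.registryRequests());     // prints 1
    }
}
```

With this shape, the operator would call {{schemaFor}} before handling each change event, so the source no longer needs to re-emit anything on startup. Failing loudly on an unknown table is a design choice of this sketch, not something the issue prescribes.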