[
https://issues.apache.org/jira/browse/FLINK-39965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39965:
-----------------------------------
Labels: pull-request-available (was: )
> Flink CDC should clear schema state and table-level caches after
> DropTableEvent
> -------------------------------------------------------------------------------
>
> Key: FLINK-39965
> URL: https://issues.apache.org/jira/browse/FLINK-39965
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0, cdc-3.6.0, cdc-3.7.0
> Reporter: Ran Tao
> Priority: Major
> Labels: pull-request-available
>
> When a Flink CDC pipeline job synchronizes MySQL tables to a sink that
> supports schema evolution, the job fails after the following table
> lifecycle:
> 1. initial snapshot + binlog sync succeeds
> 2. DROP TABLE on MySQL
> 3. CREATE TABLE again with the same table identifier
> 4. INSERT into the recreated table
> The failure was reproduced with the MySQL pipeline source and the StarRocks
> pipeline sink. StarRocks reports that the target table is unknown during
> stream-load flush: Flink CDC has already applied the DROP TABLE to the sink,
> but the subsequent CreateTableEvent is considered redundant and is skipped.
> Although the failure was observed with StarRocks, the root cause appears to be
> in the Flink CDC runtime schema state / table-level caches, so it may also
> affect other sinks that support DROP_TABLE (such as Paimon; see the relation
> to FLINK-39328 below).
> Minimal pipeline YAML:
> {code:yaml}
> source:
>   type: mysql
>   name: MySQL Source
>   hostname: mysql.example.com
>   port: 3306
>   username: flink
>   password: ****
>   tables: flink.orders_.*
>   server-id: 5100-5104
>   scan.startup.mode: initial
>   scan.newly-added-table.enabled: false
>   scan.binlog.newly-added-table.enabled: true
> sink:
>   type: starrocks
>   name: StarRocks Sink
>   jdbc-url: jdbc:mysql://starrocks-fe.example.com:9030
>   load-url: starrocks-fe.example.com:8030
>   username: flink
>   password: ****
> pipeline:
>   name: mysql-to-starrocks-drop-recreate
>   schema.change.behavior: EVOLVE
>   parallelism: 2
> {code}
> Minimal SQL to reproduce:
> {code:sql}
> CREATE DATABASE IF NOT EXISTS flink;
> CREATE TABLE flink.orders_0 (
>   id BIGINT NOT NULL,
>   order_no VARCHAR(64) NOT NULL,
>   user_name VARCHAR(64),
>   status VARCHAR(32) NOT NULL,
>   total_amount DECIMAL(18, 2) NOT NULL,
>   create_at TIMESTAMP NOT NULL,
>   PRIMARY KEY (id)
> );
> INSERT INTO flink.orders_0 VALUES
>   (1, 'order-1', 'Alice', 'NEW', 10.00, CURRENT_TIMESTAMP);
> -- Wait until the initial snapshot and binlog events are synchronized.
> DROP TABLE flink.orders_0;
> CREATE TABLE flink.orders_0 (
>   id BIGINT NOT NULL,
>   order_no VARCHAR(64) NOT NULL,
>   user_name VARCHAR(64),
>   status VARCHAR(32) NOT NULL,
>   total_amount DECIMAL(18, 2) NOT NULL,
>   create_at TIMESTAMP NOT NULL,
>   PRIMARY KEY (id)
> );
> INSERT INTO flink.orders_0 VALUES
>   (2, 'order-2', 'Bob', 'NEW', 20.00, CURRENT_TIMESTAMP);
> {code}
> *Observed behavior*
> The DROP TABLE event is applied to the sink successfully:
> {code}
> Successfully applied schema change event DropTableEvent{tableId=flink.orders_0} to external system.
> {code}
> Then the recreated table's CreateTableEvent is skipped as redundant:
> {code}
> Schema change event CreateTableEvent{tableId=flink.orders_0, schema=...} is redundant for current schema ..., just skip it.
> {code}
> Later, on checkpoint flush, the StarRocks sink fails because the sink table
> was dropped and was not created again:
> {code}
> Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException:
> Transaction start failed, db: flink, table: orders_0, responseBody: {
> "Status": "ANALYSIS_ERROR",
> "Message": "unknown table \"flink.orders_0\""
> }
>     at com.starrocks.data.load.stream.TransactionStreamLoader.doBegin(TransactionStreamLoader.java:161)
>     at com.starrocks.data.load.stream.TransactionStreamLoader.begin(TransactionStreamLoader.java:103)
>     at com.starrocks.data.load.stream.DefaultStreamLoader.send(DefaultStreamLoader.java:175)
>     at com.starrocks.data.load.stream.v2.TransactionTableRegion.streamLoad(TransactionTableRegion.java:378)
>     at com.starrocks.data.load.stream.v2.TransactionTableRegion.flush(TransactionTableRegion.java:238)
>     at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.flush(StreamLoadManagerV2.java:356)
>     at com.starrocks.connector.flink.table.sink.v2.StarRocksWriter.flush(StarRocksWriter.java:153)
> {code}
> *Expected behavior*
> After a DROP TABLE event ends the table lifecycle, Flink CDC should clear the
> corresponding original/evolved schema state and table-level caches. A later
> CREATE TABLE event with the same table identifier should not be treated as a
> duplicate CreateTableEvent from the previous lifecycle.
> The recreated table should be initialized again in the sink, and subsequent
> data changes should be written successfully.
> *Suspected root cause*
> The runtime keeps schema state and table-level caches for the dropped table
> id. Therefore, when a CreateTableEvent for the same table id arrives later,
> SchemaUtils.isSchemaChangeEventRedundant(...) may treat it as already applied
> because a previous schema is still present.
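The suspected mechanism can be illustrated with a minimal Java sketch. It is a simplified model, not the Flink CDC implementation: the class `StaleSchemaStateSketch`, its methods, and the `clearOnDrop` flag are hypothetical, and it assumes that "redundant" means an identical schema is already recorded for the table id.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of per-table schema state; not the Flink CDC classes. */
public class StaleSchemaStateSketch {

    // Stands in for the runtime's schema state (table id -> schema description).
    private final Map<String, String> schemas = new HashMap<>();
    private final boolean clearOnDrop;

    public StaleSchemaStateSketch(boolean clearOnDrop) {
        this.clearOnDrop = clearOnDrop;
    }

    /** Returns true if the create event is forwarded to the sink, false if skipped. */
    public boolean onCreateTable(String tableId, String schema) {
        if (schema.equals(schemas.get(tableId))) {
            return false; // assumption: an identical recorded schema makes the event redundant
        }
        schemas.put(tableId, schema);
        return true;
    }

    /** The drop itself always reaches the sink; local state is cleared only if configured. */
    public void onDropTable(String tableId) {
        if (clearOnDrop) {
            schemas.remove(tableId);
        }
    }

    /** Replays create -> drop -> create and reports whether the second create reaches the sink. */
    public static boolean recreateReachesSink(boolean clearOnDrop) {
        StaleSchemaStateSketch state = new StaleSchemaStateSketch(clearOnDrop);
        String schema = "id BIGINT, order_no VARCHAR(64)";
        state.onCreateTable("flink.orders_0", schema);
        state.onDropTable("flink.orders_0");
        return state.onCreateTable("flink.orders_0", schema);
    }

    public static void main(String[] args) {
        System.out.println("stale state kept: " + recreateReachesSink(false));
        System.out.println("state cleared on drop: " + recreateReachesSink(true));
    }
}
```

With the state kept after the drop, the second create is skipped, which matches the "redundant ... just skip it" log above. With the state removed on drop, the second create is forwarded.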
> Related runtime states/caches include:
> * SchemaManager original/evolved schema state
> * regular/distributed schema coordinator upstream schema views
> * sink wrapper processed table id cache
> * transform / partitioning table-level caches
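One way to keep the states listed above consistent is to route every table drop through a single eviction hook. The Java sketch below only illustrates that idea under assumed names: `TableLifecycleSketch`, `onTableDropped`, and the four stand-in caches are hypothetical and do not correspond to actual Flink CDC classes or to the proposed fix.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical fan-out of a table drop to every table-level cache; not Flink CDC code. */
public class TableLifecycleSketch {

    // Each listener evicts one cache's entry for the dropped table id.
    private final List<Consumer<String>> dropListeners = new ArrayList<>();

    public void onTableDropped(Consumer<String> listener) {
        dropListeners.add(listener);
    }

    public void dropTable(String tableId) {
        for (Consumer<String> listener : dropListeners) {
            listener.accept(tableId);
        }
    }

    /** Returns true if a drop evicts the dropped table from every cache and nothing else. */
    public static boolean dropEvictsOnlyDroppedTable() {
        // Stand-ins for the states named in the list above.
        Map<String, String> originalSchemas = new HashMap<>();
        Map<String, String> evolvedSchemas = new HashMap<>();
        Set<String> processedTableIds = new HashSet<>();
        Map<String, Integer> partitioningCache = new HashMap<>();

        TableLifecycleSketch lifecycle = new TableLifecycleSketch();
        lifecycle.onTableDropped(id -> originalSchemas.remove(id));
        lifecycle.onTableDropped(id -> evolvedSchemas.remove(id));
        lifecycle.onTableDropped(id -> processedTableIds.remove(id));
        lifecycle.onTableDropped(id -> partitioningCache.remove(id));

        for (String tableId : new String[] {"flink.orders_0", "flink.orders_1"}) {
            originalSchemas.put(tableId, "schema");
            evolvedSchemas.put(tableId, "schema");
            processedTableIds.add(tableId);
            partitioningCache.put(tableId, tableId.hashCode());
        }

        lifecycle.dropTable("flink.orders_0");

        boolean droppedGone = !originalSchemas.containsKey("flink.orders_0")
                && !evolvedSchemas.containsKey("flink.orders_0")
                && !processedTableIds.contains("flink.orders_0")
                && !partitioningCache.containsKey("flink.orders_0");
        boolean otherKept = originalSchemas.containsKey("flink.orders_1")
                && evolvedSchemas.containsKey("flink.orders_1")
                && processedTableIds.contains("flink.orders_1")
                && partitioningCache.containsKey("flink.orders_1");
        return droppedGone && otherKept;
    }
}
```

In the real runtime these states live in different operators and coordinators, so the eviction would have to travel with the DropTableEvent itself rather than through an in-process listener list.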
> *Relation to FLINK-39328*
> This issue is related to FLINK-39328, where a MySQL to Paimon pipeline fails
> immediately after DROP TABLE because RegularPrePartitionOperator tries to
> recreate the table hash function after the downstream Paimon table has
> already been dropped.
> The root cause is in the same area: DropTableEvent is not fully treated as
> the end of a table lifecycle in the Flink CDC runtime, so table-level runtime
> state/caches may still be reused after the downstream table has been
> physically removed.
> This issue covers a broader scenario than FLINK-39328.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)