taoran92 opened a new pull request, #4448:
URL: https://github.com/apache/flink-cdc/pull/4448
# What is the purpose of the change
This PR fixes incorrect runtime state after a `DropTableEvent` in Flink CDC pipeline jobs.
When a source table is dropped, the downstream table may be removed while Flink CDC still keeps the original and evolved schema state, as well as table-level caches, for the same `TableId`. If the source table is later recreated with the same identifier, the subsequent `CreateTableEvent` may be treated as redundant, skipped during sink initialization, or, when `drop.table` is filtered out, forwarded downstream as a duplicate create event.
This can cause sinks such as StarRocks to fail when writing data to the recreated table, because the downstream table was dropped but never initialized again.
This PR makes `DropTableEvent` end the table's lifecycle in runtime state and clears the related table-level caches, so that recreated tables are initialized with their latest schema.
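The lifecycle described above can be illustrated with a minimal, hypothetical Java sketch. `TableLifecycleSketch`, `RuntimeState`, and the simplified `TableId`/`Schema` records are illustrative stand-ins, not Flink CDC classes; the sketch only shows why a drop must remove both the schema state and any table-level cache keyed by the same identifier, so that a later create with the same `TableId` is applied instead of being skipped as redundant.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical, simplified model of per-table runtime state. These types are
 * illustrative stand-ins, not Flink CDC APIs.
 */
public class TableLifecycleSketch {

    /** Stand-in for a table identifier. */
    public record TableId(String namespace, String table) {}

    /** Stand-in for a schema: an ordered list of column names. */
    public record Schema(List<String> columns) {}

    public static final class RuntimeState {
        private final Map<TableId, Schema> evolvedSchemas = new HashMap<>();
        // A table-level cache derived from the schema (for example, the position
        // of a partition key column). Keyed by table only, for simplicity.
        private final Map<TableId, Integer> cachedKeyIndex = new HashMap<>();

        /** Returns true if the create event changes state, false if it is redundant. */
        public boolean applyCreate(TableId id, Schema schema) {
            if (Objects.equals(evolvedSchemas.get(id), schema)) {
                return false; // identical schema already known: skipped as redundant
            }
            evolvedSchemas.put(id, schema);
            cachedKeyIndex.remove(id); // derived data must be recomputed
            return true;
        }

        /** Ends the table lifecycle: removes schema state and table-level caches. */
        public void applyDrop(TableId id) {
            evolvedSchemas.remove(id);
            cachedKeyIndex.remove(id);
        }

        /** Lazily computes and caches the index of keyColumn for the table. */
        public int keyIndex(TableId id, String keyColumn) {
            return cachedKeyIndex.computeIfAbsent(
                    id, k -> evolvedSchemas.get(k).columns().indexOf(keyColumn));
        }

        public boolean hasSchema(TableId id) {
            return evolvedSchemas.containsKey(id);
        }

        public boolean hasCachedKeyIndex(TableId id) {
            return cachedKeyIndex.containsKey(id);
        }
    }

    public static void main(String[] args) {
        RuntimeState state = new RuntimeState();
        TableId orders = new TableId("db", "orders");
        Schema schema = new Schema(List.of("id", "amount"));

        System.out.println(state.applyCreate(orders, schema)); // true: first create
        System.out.println(state.applyCreate(orders, schema)); // false: redundant
        state.applyDrop(orders);
        // Because applyDrop cleared the state, the recreated table is applied
        // again instead of being skipped as redundant.
        System.out.println(state.applyCreate(orders, schema)); // true
    }
}
```

If `applyDrop` only removed the downstream table and left `evolvedSchemas` untouched, the third call would return `false`, which mirrors the "treated as redundant" failure mode described above.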
This PR also addresses the pre-partition hash function issue reported in FLINK-39328.
# Brief change log
- Remove original and evolved schema state from `SchemaManager` when applying `DropTableEvent`
- Update regular schema derivation to forward `DropTableEvent` only when no upstream dependency remains
- Derive schema differences instead of forwarding a duplicate `CreateTableEvent` when `drop.table` is filtered out and the downstream table still exists
- Update distributed schema coordination to remove dropped upstream schema entries and return a `null` evolved schema view for dropped sink tables
- Clear table-level caches in the transform, pre-partition, and sink wrapper operators after `DropTableEvent`
- Add unit tests for drop-and-recreate table handling in regular and distributed schema evolution
- Add unit tests for sink wrapper, transform, partitioning, and schema manager drop-table behavior
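The dependency-aware drop forwarding and the `drop.table` handling listed above can be sketched in Java as follows. `DropForwardingSketch`, its string-encoded events, and the `route` map (upstream table to sink table, as when a route rule merges sharded tables) are hypothetical and chosen for illustration; this is not the real derivation code, which handles more kinds of schema change than the added columns modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Hypothetical sketch of dependency-aware DropTableEvent forwarding. Events are
 * modelled as plain strings; none of these names are Flink CDC APIs.
 */
public class DropForwardingSketch {
    private final Map<String, String> route; // upstream table -> sink table
    private final boolean dropTableExcluded; // models drop.table being filtered out
    private final Map<String, Set<String>> liveUpstreams = new HashMap<>();
    private final Map<String, List<String>> downstreamColumns = new HashMap<>();

    public DropForwardingSketch(Map<String, String> route, boolean dropTableExcluded) {
        this.route = route;
        this.dropTableExcluded = dropTableExcluded;
    }

    private String sinkOf(String upstream) {
        return Objects.requireNonNull(route.get(upstream), "unrouted table: " + upstream);
    }

    /** Returns the events to send downstream for an upstream CreateTableEvent. */
    public List<String> onCreate(String upstream, List<String> columns) {
        String sink = sinkOf(upstream);
        liveUpstreams.computeIfAbsent(sink, k -> new HashSet<>()).add(upstream);
        List<String> out = new ArrayList<>();
        List<String> existing = downstreamColumns.get(sink);
        if (existing == null) {
            // Downstream table does not exist: a real create is needed.
            downstreamColumns.put(sink, new ArrayList<>(columns));
            out.add("CreateTable " + sink + " " + columns);
        } else {
            // Downstream table still exists: emit only the difference
            // (this sketch handles added columns only).
            for (String column : columns) {
                if (!existing.contains(column)) {
                    existing.add(column);
                    out.add("AddColumn " + sink + " " + column);
                }
            }
        }
        return out;
    }

    /** Returns the events to send downstream for an upstream DropTableEvent. */
    public List<String> onDrop(String upstream) {
        String sink = sinkOf(upstream);
        Set<String> live = liveUpstreams.get(sink);
        if (live != null) {
            live.remove(upstream);
            if (!live.isEmpty()) {
                return List.of(); // other upstream tables still feed this sink table
            }
            liveUpstreams.remove(sink);
        }
        if (dropTableExcluded) {
            return List.of(); // drop is filtered out: the downstream table is kept
        }
        downstreamColumns.remove(sink);
        return List.of("DropTable " + sink);
    }

    public static void main(String[] args) {
        Map<String, String> route = Map.of(
                "db.orders_1", "db.orders",
                "db.orders_2", "db.orders");
        DropForwardingSketch deriver = new DropForwardingSketch(route, false);
        System.out.println(deriver.onCreate("db.orders_1", List.of("id", "amount"))); // [CreateTable db.orders [id, amount]]
        System.out.println(deriver.onCreate("db.orders_2", List.of("id", "amount"))); // []
        System.out.println(deriver.onDrop("db.orders_1")); // []
        System.out.println(deriver.onDrop("db.orders_2")); // [DropTable db.orders]
        System.out.println(deriver.onCreate("db.orders_1", List.of("id", "amount"))); // [CreateTable db.orders [id, amount]]
    }
}
```

With `dropTableExcluded = true`, dropping both shards emits nothing, and a later `onCreate("db.orders_1", List.of("id", "amount", "note"))` returns only `AddColumn db.orders note` instead of a duplicate `CreateTable`.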
No configuration option is added because this fixes runtime state handling
for existing `DropTableEvent` semantics.
# Verifying this change
This change is verified by unit tests:
- `SchemaManagerTest#testHandlingDropTableEvent`
- `org.apache.flink.cdc.runtime.operators.schema.regular.SchemaEvolveTest#testDropAndRecreateTable`
- `org.apache.flink.cdc.runtime.operators.schema.regular.SchemaEvolveTest#testDropAndRecreateTableWithDropTableExcluded`
- `org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaEvolveTest#testDropAndRecreateTable`
- `DataSinkOperatorWithSchemaEvolveTest#testDataChangeEventAfterDropTable`
- `PreTransformOperatorTest`
- `PrePartitionOperatorTest`
The targeted runtime test set was run locally:
- `mvn -pl flink-cdc-runtime -am -DskipITs -Dcheckstyle.skip=true -Drat.skip=true -Dspotless.check.skip=true -DfailIfNoTests=false -Dtest=SchemaManagerTest,org.apache.flink.cdc.runtime.operators.schema.regular.SchemaEvolveTest,org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaEvolveTest,DataSinkOperatorWithSchemaEvolveTest,PreTransformOperatorTest,PrePartitionOperatorTest test`
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(@PublicEvolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: yes
# Documentation
Does this pull request introduce a new feature? no
If yes, how is the feature documented? not applicable