yuxiqian commented on code in PR #4299:
URL: https://github.com/apache/flink-cdc/pull/4299#discussion_r2958207977
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java:
##########
@@ -288,12 +288,29 @@ private List<SchemaChangeEvent> deduceEvolvedSchemaChanges(SchemaChangeEvent eve
List<SchemaChangeEvent> rawSchemaChangeEvents = new ArrayList<>();
if (upstreamDependencies.size() == 1) {
- // If it's a one-by-one routing rule, we can simply forward it to downstream sink.
- SchemaChangeEvent rawEvent = event.copy(evolvedTableId);
- rawSchemaChangeEvents.add(rawEvent);
- LOG.info(
- "Step 3.3 - It's an one-by-one routing and could be forwarded as {}.",
- rawEvent);
+ // If it's a one-by-one routing rule, we can simply forward it to downstream
+ // sink. However, if the incoming event is a CreateTableEvent for an
+ // already-known evolved table (e.g. after restart with changed projections),
+ // we must compute the schema diff instead of forwarding the raw
+ // CreateTableEvent, which would fail at the sink.
+ if (event instanceof CreateTableEvent && currentEvolvedSchema != null) {
+ CreateTableEvent createTableEvent = (CreateTableEvent) event;
+ List<SchemaChangeEvent> diffEvents =
+ SchemaMergingUtils.getSchemaDifference(
+ evolvedTableId,
+ currentEvolvedSchema,
+ createTableEvent.getSchema());
+ rawSchemaChangeEvents.addAll(diffEvents);
+ LOG.info(
+ "Step 3.3 - It's a one-by-one routing but CreateTableEvent for existing table. Computed diff events: {}.",
+ diffEvents);
+ } else {
+ SchemaChangeEvent rawEvent = event.copy(evolvedTableId);
+ rawSchemaChangeEvents.add(rawEvent);
+ LOG.info(
+ "Step 3.3 - It's an one-by-one routing and could be forwarded as {}.",
+ rawEvent);
+ }
Review Comment:
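Could you add E2E test cases in org.apache.flink.cdc.pipeline.tests.migration
to cover these scenarios, e.g. a restart with changed projections where a
CreateTableEvent arrives for an already-known evolved table?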
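
To make the expected behaviour concrete, here is a rough, self-contained
sketch of the branch under review. It deliberately does not use the real
Flink CDC classes: schemas are modelled as plain column-name-to-type maps,
events as strings, and the names SchemaDiffSketch, schemaDifference and
deduce are hypothetical stand-ins. It is not meant to mirror what
SchemaMergingUtils.getSchemaDifference actually emits, only the kind of
assertion a test could make.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaDiffSketch {

    /**
     * Hypothetical stand-in for a schema diff: compares two schemas
     * (column name -> type name) and lists the changes needed to turn
     * `current` into `incoming`.
     */
    static List<String> schemaDifference(
            Map<String, String> current, Map<String, String> incoming) {
        List<String> events = new ArrayList<>();
        // Columns that are new or whose type changed.
        for (Map.Entry<String, String> col : incoming.entrySet()) {
            String oldType = current.get(col.getKey());
            if (oldType == null) {
                events.add("ADD_COLUMN " + col.getKey() + " " + col.getValue());
            } else if (!oldType.equals(col.getValue())) {
                events.add("ALTER_COLUMN_TYPE " + col.getKey() + " " + col.getValue());
            }
        }
        // Columns that no longer exist in the incoming schema.
        for (String name : current.keySet()) {
            if (!incoming.containsKey(name)) {
                events.add("DROP_COLUMN " + name);
            }
        }
        return events;
    }

    /**
     * Mirrors the shape of the one-by-one routing branch: forward the
     * create-table event if the evolved table is unknown, otherwise emit
     * only the differences.
     */
    static List<String> deduce(
            Map<String, String> currentEvolved, Map<String, String> incomingCreateTable) {
        if (currentEvolved == null) {
            List<String> forwarded = new ArrayList<>();
            forwarded.add("CREATE_TABLE " + incomingCreateTable.keySet());
            return forwarded;
        }
        return schemaDifference(currentEvolved, incomingCreateTable);
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "INT");
        before.put("name", "STRING");

        // After a restart with a changed projection, one more column is selected.
        Map<String, String> after = new LinkedHashMap<>(before);
        after.put("age", "INT");

        System.out.println(deduce(null, before));  // first start: table is forwarded
        System.out.println(deduce(before, after)); // restart: only the diff is emitted
    }
}
```

A migration test along these lines would presumably start the job with one
projection, stop it, restart it with a different projection, and check
that the sink sees column-level changes rather than a second
CreateTableEvent.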
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.