yuxiqian commented on code in PR #3590:
URL: https://github.com/apache/flink-cdc/pull/3590#discussion_r2354519364
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java:
##########
Review Comment:
Can we add similar cases for MySQL CDC pipeline connectors with
`scan.binlog.newly-added-table.enabled`?
IIUC if we can't obtain new schema from `CREATE TABLE ... LIKE ...` DDL,
triggering failover and re-scanning table schemas should recover the job.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -82,6 +83,29 @@ public CustomAlterTableParserListener(
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
}
+ @Override
+ public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) {
+ TableId tableId =
parser.parseQualifiedTableId(ctx.tableName(0).fullId());
+ TableId originalTableId =
parser.parseQualifiedTableId(ctx.tableName(1).fullId());
+ Table original = parser.databaseTables().forTable(originalTableId);
+ if (original != null) {
+ parser.databaseTables()
+ .overwriteTable(
+ tableId,
+ original.columns(),
+ original.primaryKeyColumnNames(),
+ original.defaultCharsetName());
+ parser.signalCreateTable(tableId, ctx);
+ Schema.Builder builder = Schema.newBuilder();
+ original.columns().forEach(column ->
builder.column(toCdcColumn(column)));
+ if (!original.primaryKeyColumnNames().isEmpty()) {
+ builder.primaryKey(original.primaryKeyColumnNames());
+ }
+ changes.add(new CreateTableEvent(toCdcTableId(tableId),
builder.build()));
+ }
+ super.exitCopyCreateTable(ctx);
+ }
Review Comment:
Print a warning log if `CREATE TABLE LIKE` original table isn't presented in
the capture list?
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java:
##########
Review Comment:
Also, `MySqlPipelineRecordEmitter` may detect this case by checking if there
are "DataChangeEvents" without corresponding "CreateTableEvent" and throw
exceptions in advance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]