yuxiqian commented on code in PR #3590:
URL: https://github.com/apache/flink-cdc/pull/3590#discussion_r2354519364


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java:
##########


Review Comment:
   Can we add similar cases for MySQL CDC pipeline connectors with 
`scan.binlog.newly-added-table.enabled`?
   
   IIUC if we can't obtain new schema from `CREATE TABLE ... LIKE ...` DDL, 
triggering failover and re-scanning table schemas should recover the job.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -82,6 +83,29 @@ public CustomAlterTableParserListener(
         this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
     }
 
+    @Override
+    public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) {
+        TableId tableId = 
parser.parseQualifiedTableId(ctx.tableName(0).fullId());
+        TableId originalTableId = 
parser.parseQualifiedTableId(ctx.tableName(1).fullId());
+        Table original = parser.databaseTables().forTable(originalTableId);
+        if (original != null) {
+            parser.databaseTables()
+                    .overwriteTable(
+                            tableId,
+                            original.columns(),
+                            original.primaryKeyColumnNames(),
+                            original.defaultCharsetName());
+            parser.signalCreateTable(tableId, ctx);
+            Schema.Builder builder = Schema.newBuilder();
+            original.columns().forEach(column -> 
builder.column(toCdcColumn(column)));
+            if (!original.primaryKeyColumnNames().isEmpty()) {
+                builder.primaryKey(original.primaryKeyColumnNames());
+            }
+            changes.add(new CreateTableEvent(toCdcTableId(tableId), 
builder.build()));
+        }
+        super.exitCopyCreateTable(ctx);
+    }

Review Comment:
   Print a warning log if `CREATE TABLE LIKE` original table isn't presented in 
the capture list?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java:
##########


Review Comment:
   Also, `MySqlPipelineRecordEmitter` may detect this case by checking if there 
are "DataChangeEvents" without corresponding "CreateTableEvent" and throw 
exceptions in advance. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to