VinaySagarGonabavi commented on code in PR #4279:
URL: https://github.com/apache/flink-cdc/pull/4279#discussion_r2921457044


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##########
@@ -120,7 +126,35 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve
 
     private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
         LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
+        Set<String> existingColumnNames =
+                columns.stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toCollection(HashSet::new));
         for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
+            // Skip columns that already exist in the schema to handle duplicate AddColumnEvents
+            // (e.g., from gh-ost online schema migrations)
+            if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
+                Column incomingColumn = columnWithPosition.getAddColumn();
+                columns.stream()
+                        .filter(c -> c.getName().equals(incomingColumn.getName()))
+                        .findFirst()
+                        .ifPresent(
+                                existingColumn -> {
+                                    if (!existingColumn
+                                            .getType()
+                                            .equals(incomingColumn.getType())) {

Review Comment:
   No type coercion or implicit casting is performed. When a duplicate column
   is detected by name, the existing column definition is preserved as-is and
   the incoming duplicate is skipped. If the types differ, a WARN log is emitted:
   "Skipping duplicate column '{}' for table {} but types differ: existing={}, incoming={}".
   This is intentional for the gh-ost use case: duplicate AddColumnEvents from
   online schema migration tools should have matching types, so a mismatch
   points to a potential upstream inconsistency that should be investigated,
   but not one worth crashing the pipeline over.
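   The skip-and-warn behavior described above can be sketched in plain Java.
   This is a hypothetical, self-contained model, not the PR's code: ColumnDef,
   applyAddColumns and DuplicateColumnSketch are illustrative names, column
   positions are ignored (new columns are simply appended), and it uses
   java.util.logging with String.format instead of the SLF4J-style {}
   placeholders in the quoted log message. It assumes Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical sketch: ColumnDef and applyAddColumns are illustrative names, not Flink CDC APIs.
// Column positions are ignored here; new columns are simply appended at the end.
class DuplicateColumnSketch {
    private static final Logger LOG = Logger.getLogger(DuplicateColumnSketch.class.getName());

    /** Minimal stand-in for a schema column: just a name and a type string. */
    record ColumnDef(String name, String type) {}

    /**
     * Adds each incoming column unless one with the same name already exists.
     * Existing definitions are kept as-is (no coercion); a type mismatch only logs a warning.
     */
    static List<ColumnDef> applyAddColumns(
            String table, List<ColumnDef> existing, List<ColumnDef> incoming) {
        // Index columns by name; LinkedHashMap preserves the original column order.
        Map<String, ColumnDef> byName = new LinkedHashMap<>();
        for (ColumnDef column : existing) {
            byName.put(column.name(), column);
        }
        for (ColumnDef added : incoming) {
            ColumnDef current = byName.get(added.name());
            if (current == null) {
                byName.put(added.name(), added); // genuinely new column: append it
            } else if (!current.type().equals(added.type())) {
                // Duplicate name with a different type: keep the existing column, warn only.
                LOG.warning(String.format(
                        "Skipping duplicate column '%s' for table %s but types differ: existing=%s, incoming=%s",
                        added.name(), table, current.type(), added.type()));
            }
            // Duplicate name with the same type: skipped silently.
        }
        return new ArrayList<>(byName.values());
    }

    public static void main(String[] args) {
        List<ColumnDef> schema = List.of(new ColumnDef("id", "INT"));
        List<ColumnDef> event = List.of(new ColumnDef("email", "VARCHAR(255)"));

        List<ColumnDef> once = applyAddColumns("db.users", schema, event);
        // Replaying the same event (e.g. a duplicate from gh-ost) leaves the schema unchanged.
        List<ColumnDef> twice = applyAddColumns("db.users", once, event);
        System.out.println(once.equals(twice)); // prints "true"

        // Same name, different type: the existing VARCHAR(255) column is kept and a WARNING is logged.
        List<ColumnDef> mismatched =
                applyAddColumns("db.users", once, List.of(new ColumnDef("email", "TEXT")));
        System.out.println(mismatched.get(1).type()); // prints "VARCHAR(255)"
    }
}
```

   Indexing by name with a LinkedHashMap is a choice of this sketch: it avoids
   a second linear scan over the columns for each duplicate while keeping
   column order. The diff above instead keeps a HashSet of names and streams
   over the column list to find the existing definition.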



-- 
This is an automated message from the Apache Git Service.