Copilot commented on code in PR #4370:
URL: https://github.com/apache/flink-cdc/pull/4370#discussion_r3057050784


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java:
##########
@@ -121,7 +121,16 @@ public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent eve
 
     private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema) {
         LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
+        Set<String> existingColumnNames =
+                columns.stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toCollection(HashSet::new));
         for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
+            // Skip adding the column if it already exists in the schema to ensure idempotency.
+            // This can happen when schema change events are replayed after a failover recovery.
+            if (existingColumnNames.contains(columnWithPosition.getAddColumn().getName())) {
+                continue;
+            }

Review Comment:
   Skipping duplicates purely by name can hide a real schema conflict (e.g., a
genuine replay versus a different upstream `AddColumnEvent` that reuses the same
column name but with a different type, nullability, default value, or comment).
To keep idempotency while avoiding silent corruption, consider this: if the name
already exists, look up the existing `Column` and compare it with
`columnWithPosition.getAddColumn()`. Skip only when the two are equivalent;
otherwise, throw an informative exception.
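   A minimal, self-contained sketch of the suggested "skip if equivalent, fail if
different" check follows. It does not use the real Flink CDC classes.
`ColumnSpec` is a hypothetical record standing in for `Column`, and
`IdempotentAddColumn.applyAddColumns` is a hypothetical helper. Column positions
are ignored, so every new column is appended. In `SchemaUtils` itself, the
comparison could be `existing.equals(columnWithPosition.getAddColumn())`. This
assumes that `Column#equals` covers the type (including nullability), the default
value, and the comment. If it does not, those fields need to be compared
explicitly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Flink CDC's Column; a record gives value-based equals()
// over all components (null-safe for the reference-typed ones).
record ColumnSpec(String name, String type, boolean nullable, String defaultValue, String comment) {}

final class IdempotentAddColumn {

    private IdempotentAddColumn() {}

    /**
     * Hypothetical helper: appends each added column unless a column with the same
     * name already exists. An identical column is skipped (e.g. a replayed event);
     * a same-name column with a different definition is rejected as a conflict.
     */
    static List<ColumnSpec> applyAddColumns(List<ColumnSpec> oldColumns, List<ColumnSpec> added) {
        // Name -> column, preserving the existing column order.
        Map<String, ColumnSpec> byName = new LinkedHashMap<>();
        for (ColumnSpec c : oldColumns) {
            byName.put(c.name(), c);
        }
        for (ColumnSpec column : added) {
            ColumnSpec existing = byName.get(column.name());
            if (existing == null) {
                // New name: append it (this sketch ignores ColumnPosition).
                byName.put(column.name(), column);
            } else if (!existing.equals(column)) {
                // Same name, different definition: fail loudly instead of skipping.
                throw new IllegalStateException(String.format(
                        "Cannot add column %s: it already exists as %s, but the event defines %s",
                        column.name(), existing, column));
            }
            // Otherwise an equivalent column is already present, so skip it.
        }
        return new ArrayList<>(byName.values());
    }
}
```

Because the map records each newly added column before the next one is checked,
this also deduplicates a name that appears twice within one event. The real
method would additionally have to honor `ColumnPosition` values such as `LAST`
and `AFTER`, which this sketch leaves out.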



##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java:
##########
@@ -160,6 +160,46 @@ void testApplyColumnSchemaChangeEvent() {
                                 "AFTER type AddColumnEvent error: Column %s does not exist in table %s",
                                 "col10", tableId));
 
+        // add duplicate column should be ignored (idempotency)
+        addedColumns = new ArrayList<>();
+        addedColumns.add(
+                new AddColumnEvent.ColumnWithPosition(
+                        Column.physicalColumn("col3", DataTypes.STRING()),
+                        AddColumnEvent.ColumnPosition.LAST,
+                        null));
+        addColumnEvent = new AddColumnEvent(tableId, addedColumns);
+        schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);

Review Comment:
   Current tests cover replay of a single-column `AddColumnEvent`. Since the 
implementation now keeps `existingColumnNames` across iterations, add a unit 
test where a single `AddColumnEvent` contains the same column name twice (e.g., 
two `ColumnWithPosition` entries for `col3`), and assert only one column is 
added. This directly validates the new intra-event dedup behavior.
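   The intra-event case only exercises the new code path when the duplicated name
is not already in the schema. If `col3` already exists, both entries are skipped
regardless of whether the name set is updated inside the loop. The following
name-only sketch shows the behavior such a test should pin down. It is a
simplified model that assumes columns are represented by their names and always
appended. `NameDedupSketch.addColumns` is hypothetical and is not the
`SchemaUtils` code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class NameDedupSketch {

    private NameDedupSketch() {}

    // Hypothetical, name-only model of the add-column loop: columns are plain names
    // and are always appended at the end.
    static List<String> addColumns(List<String> existing, List<String> added) {
        List<String> result = new ArrayList<>(existing);
        Set<String> seen = new HashSet<>(existing);
        for (String name : added) {
            // Set.add returns false if the name was already present, so a name that
            // appears twice in the same event is appended only once.
            if (seen.add(name)) {
                result.add(name);
            }
        }
        return result;
    }
}
```

In `SchemaUtilsTest`, the equivalent test would build one `AddColumnEvent` with
two `ColumnWithPosition` entries for a column that is not yet in the schema. It
would then assert that the resulting schema contains that column exactly once.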


