[ 
https://issues.apache.org/jira/browse/FLINK-38830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-38830:
-----------------------------------
    Labels: Flink-CDC pull-request-available  (was: Flink-CDC)

> PreTransformOperator needs to avoid crashing with "Field names must be 
> unique" error and handle processing a duplicate AddColumnEvent better
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38830
>                 URL: https://issues.apache.org/jira/browse/FLINK-38830
>             Project: Flink
>          Issue Type: New Feature
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>         Environment: * Flink CDC Version: 3.3+ (issue found on 3.4.0, present 
> on master f5204243)
>  * Flink Version: 1.20+
>  * Migration Tool: gh-ost
>  * Configuration: Shadow tables properly excluded via `tables.exclude`
>            Reporter: Vinay Sagar Gonabavi
>            Priority: Major
>              Labels: Flink-CDC, pull-request-available
>
> Flink CDC pipelines can crash when they receive duplicate ADD COLUMN events 
> (in our case, during gh-ost online schema migrations on a MySQL source) with:
>  
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Field names must be unique. 
> Found duplicates: [new_column_name]{code}
>  
> Even with gh-ost shadow tables properly excluded, we observed duplicate 
> `AddColumnEvent`s for the same column, which caused operators to fail.
>  
> *Steps to Reproduce*
> 1. Configure Flink CDC pipeline with gh-ost shadow table exclusion:
> {code:java}
> source:
>    type: mysql
>    tables.exclude: '^\.+.\.+(\_del$|\_ghc$|\_gho)$'
> {code}
> 2. Run a gh-ost migration (as in our case); any other source of duplicate 
> ADD COLUMN events should trigger the same failure:
> {code:java}
> gh-ost --alter="ADD COLUMN new_column INT" --table=users --force-table-names 
> {table_prefix}{code}
>  
> 3. Operators crash: "Field names must be unique"
> *Expected Behavior*
>  * Operators detect column already exists in schema
>  * Skip duplicate `AddColumnEvent` gracefully
>  * Continue processing without crashes
> *Actual Behavior*
>  * Operators crash and the pipeline fails with a duplicate column error
>  * Requires manual intervention and job restart
>  * Blocks zero-downtime schema migrations
> *Proposed Solution*
> Add idempotent `AddColumnEvent` handling: operators should check whether a 
> column already exists before applying the event:
>  * *Core Logic*
>  
> {code:java}
> if (event instanceof AddColumnEvent) {
>     AddColumnEvent addEvent = (AddColumnEvent) event;
>     Schema currentSchema = getCurrentSchema(addEvent.tableId());
>     Set<String> existingColumns =
>         new HashSet<>(currentSchema.getColumnNames());
>     // Filter columns that already exist
>     List<ColumnWithPosition> columnsToAdd =
>         addEvent.getAddedColumns().stream()
>             .filter(col ->
>                 !existingColumns.contains(col.getAddColumn().getName()))
>             .collect(Collectors.toList());
>             .collect(Collectors.toList());
>     if (columnsToAdd.isEmpty()) {
>         LOG.info("Skipping duplicate AddColumnEvent for table {} " +
>                 "(likely from gh-ost cutover)", addEvent.tableId());
>         return Optional.empty(); // Skip event
>     }
>     // Create new event with only non-duplicate columns
>     event = new AddColumnEvent(addEvent.tableId(), columnsToAdd);
> }
> {code}
>  * *Apply to Multiple Operators*
> 1. **PreTransformOperator**: Filter duplicates before applying to source 
> schema
> 2. **PostTransformOperator**: Filter duplicates before applying to 
> post-transform schema
> 3. **Sink Operators** (e.g., Paimon BucketAssignOperator): Filter before 
> broadcasting
>  * *Handle Position References*
> When filtering columns, adjust position references:
> - If position references a filtered column → change to `LAST`
> - Prevents dangling references to non-existent columns
>  * *Includes thread-safety fixes for serializers*
>  * *Preserves state consistency*
>  
> I have a working patch validated in production that I can contribute after 
> community review.
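
The duplicate filtering and the position-reference rule described in the issue can be sketched together as below. This is only an illustration under stated assumptions, not the reporter's patch: `Addition`, `Position`, and `filter` are hypothetical stand-ins written for this example and do not use the real Flink CDC classes. The sketch follows the rule as written in the issue: an addition whose BEFORE/AFTER anchor is a filtered column falls back to `LAST`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for Flink CDC's schema-change types; not the real API.
class DuplicateAddColumnFilter {

    enum Position { FIRST, LAST, BEFORE, AFTER }

    // One requested column addition: new column name, placement, and anchor column.
    static final class Addition {
        final String name;
        final Position position;
        final String anchor; // only meaningful for BEFORE / AFTER, otherwise null

        Addition(String name, Position position, String anchor) {
            this.name = name;
            this.position = position;
            this.anchor = anchor;
        }
    }

    // Returns the additions that still need to be applied. An empty result
    // means the whole event is a duplicate and can be skipped.
    static List<Addition> filter(Set<String> existingColumns, List<Addition> requested) {
        // Requested columns that are dropped because the schema already has them.
        Set<String> filtered = new HashSet<>();
        for (Addition a : requested) {
            if (existingColumns.contains(a.name)) {
                filtered.add(a.name);
            }
        }

        List<Addition> kept = new ArrayList<>();
        for (Addition a : requested) {
            if (filtered.contains(a.name)) {
                continue; // duplicate: already present in the schema
            }
            boolean relative = a.position == Position.BEFORE || a.position == Position.AFTER;
            if (relative && filtered.contains(a.anchor)) {
                // Rule from the issue: anchor was filtered, so fall back to LAST.
                kept.add(new Addition(a.name, Position.LAST, null));
            } else {
                kept.add(a);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Set<String> existing = new HashSet<>(Arrays.asList("id", "name", "new_column"));
        List<Addition> requested = Arrays.asList(
                new Addition("new_column", Position.AFTER, "name"),
                new Addition("extra", Position.AFTER, "new_column"));
        for (Addition a : filter(existing, requested)) {
            System.out.println(a.name + " " + a.position); // prints "extra LAST"
        }
    }
}
```

Returning an empty list for a fully duplicated event mirrors the `Optional.empty()` branch in the issue's snippet, so the caller can skip the event without touching the schema.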



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
