[ 
https://issues.apache.org/jira/browse/FLINK-39412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kianwee updated FLINK-39412:
----------------------------
    Description: 
### Problem               
          
  When a Flink CDC pipeline recovers from a checkpoint/savepoint, binlog events 
may be replayed, causing {{AddColumnEvent}} to be applied for columns that 
already exist in the cached schema. This leads to a {{RowType}} validation 
failure:       
                            
  \{code:java}                                                                  
                                                                                
                                                                                
      
  
org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException: 
Failed to pre-transform with
      AddColumnEvent\{tableId=ecrm_btwl.kd_store_coupon, 
addedColumns=[ColumnWithPosition {column=`valid_date` STRING, position=LAST, 
existedColumnName=null}

]}                                                                              
          
  ...                                                                           
                                                                                
                                                                                
     
  Caused by: java.lang.IllegalArgumentException: Field names must be unique. 
Found duplicates: [valid_date]                                                  
                                                                                
        
      at 
org.apache.flink.cdc.common.types.RowType.validateFields(RowType.java:158)      
                                                                                
                                                                            
      at 
org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:230)
  \{code}                                                                       
                                                                                
                                                                                
      
                                                                                
                      
 ### Root Cause                                                                 
                                                                                
                                                                                
    
                                                                                
                      
  {{SchemaUtils.applyAddColumnEvent()}} blindly adds columns without checking 
if a column with the same name already exists. While 
{{isSchemaChangeEventRedundant()}} exists as a utility method, 
{{PreTransformOperator.cacheChangeSchema()}} does  
  not call it before applying schema changes.
                                                                                
                                                                                
                                                                                
     
  This can be triggered when:                                                   
                      
  * A job restores from checkpoint/savepoint and the binlog offset rolls back, 
replaying a historical {{ALTER TABLE ADD COLUMN}} DDL.
  * The snapshot phase captures a schema that already includes the column, but 
the binlog stream still contains the corresponding DDL event.                   
                                                                                
      
                                                                                
                                                                                
                                                                                
     
 ### Fix                                                                        
                                                                                
                                                                                
    
                                                                                
                                                                                
                                                                                
     
  Add an idempotency check in {{SchemaUtils.applyAddColumnEvent()}} to skip 
columns whose name already exists in the current schema. This is the most 
defensive fix location since it protects all callers of 
{{{}applySchemaChangeEvent(){}}}, not just 
  {{{}PreTransformOperator{}}}.
                                                                                
                                                                                
                                                                                
     
  PR: [https://github.com/apache/flink-cdc/pull/4370]                           
                        
                            
  Priority: Major                                                               
                                                                                
                                                                                
     
                                    
  Type: Bug                                                                     
                              

  was:
### Problem               
          
  When a Flink CDC pipeline recovers from a checkpoint/savepoint, binlog events 
may be replayed, causing \{{AddColumnEvent}} to be applied for columns that 
already exist in the cached schema. This leads to a \{{RowType}} validation 
failure:       
                            
  \{code:java}                                                                  
                                                                                
                                                                                
      
  
org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException: 
Failed to pre-transform with
      AddColumnEvent\{tableId=ecrm_btwl.kd_store_coupon, 
addedColumns=[ColumnWithPosition{column=`valid_date` STRING, position=LAST, 
existedColumnName=null}]}                                                       
                                 
  ...                                                                           
                                                                                
                                                                                
     
  Caused by: java.lang.IllegalArgumentException: Field names must be unique. 
Found duplicates: [valid_date]                                                  
                                                                                
        
      at 
org.apache.flink.cdc.common.types.RowType.validateFields(RowType.java:158)      
                                                                                
                                                                            
      at 
org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:230)
  \{code}                                                                       
                                                                                
                                                                                
      
                                                                                
                      
### Root Cause                                                                  
                                                                                
                                                                                
   
                                                                                
                      
  \{{SchemaUtils.applyAddColumnEvent()}} blindly adds columns without checking 
if a column with the same name already exists. While 
\{{isSchemaChangeEventRedundant()}} exists as a utility method, 
\{{PreTransformOperator.cacheChangeSchema()}} does  
  not call it before applying schema changes.
                                                                                
                                                                                
                                                                                
     
  This can be triggered when:                                                   
                      
  * A job restores from checkpoint/savepoint and the binlog offset rolls back, 
replaying a historical \{{ALTER TABLE ADD COLUMN}} DDL.
  * The snapshot phase captures a schema that already includes the column, but 
the binlog stream still contains the corresponding DDL event.                   
                                                                                
      
                                                                                
                                                                                
                                                                                
     
 ### Fix                                                                        
                                                                                
                                                                                
    
                                                                                
                                                                                
                                                                                
     
  Add an idempotency check in \{{SchemaUtils.applyAddColumnEvent()}} to skip 
columns whose name already exists in the current schema. This is the most 
defensive fix location since it protects all callers of 
\{{applySchemaChangeEvent()}}, not just 
  \{{PreTransformOperator}}.
                                                                                
                                                                                
                                                                                
     
  PR: https://github.com/apache/flink-cdc/pull/4370                             
                      
                            
  Priority: Major                                                               
                                                                                
                                                                                
     
                                    
  Type: Bug                                                                     
                              


> AddColumnEvent fails with duplicate field names when schema change events are 
> replayed after failover
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39412
>                 URL: https://issues.apache.org/jira/browse/FLINK-39412
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: kianwee
>            Priority: Major
>
> ### Problem               
>           
>   When a Flink CDC pipeline recovers from a checkpoint/savepoint, binlog 
> events may be replayed, causing {{AddColumnEvent}} to be applied for columns 
> that already exist in the cached schema. This leads to a {{RowType}} 
> validation failure:       
>                             
>   \{code:java}                                                                
>                                                                               
>                                                                               
>             
>   
> org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException:
>  Failed to pre-transform with
>       AddColumnEvent\{tableId=ecrm_btwl.kd_store_coupon, 
> addedColumns=[ColumnWithPosition {column=`valid_date` STRING, position=LAST, 
> existedColumnName=null}
> ]}                                                                            
>             
>   ...                                                                         
>                                                                               
>                                                                               
>            
>   Caused by: java.lang.IllegalArgumentException: Field names must be unique. 
> Found duplicates: [valid_date]                                                
>                                                                               
>             
>       at 
> org.apache.flink.cdc.common.types.RowType.validateFields(RowType.java:158)    
>                                                                               
>                                                                               
>   
>       at 
> org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:230)
>   \{code}                                                                     
>                                                                               
>                                                                               
>             
>                                                                               
>                         
>  ### Root Cause                                                               
>                                                                               
>                                                                               
>           
>                                                                               
>                         
>   {{SchemaUtils.applyAddColumnEvent()}} blindly adds columns without checking 
> if a column with the same name already exists. While 
> {{isSchemaChangeEventRedundant()}} exists as a utility method, 
> {{PreTransformOperator.cacheChangeSchema()}} does  
>   not call it before applying schema changes.
>                                                                               
>                                                                               
>                                                                               
>            
>   This can be triggered when:                                                 
>                         
>   * A job restores from checkpoint/savepoint and the binlog offset rolls 
> back, replaying a historical {{ALTER TABLE ADD COLUMN}} DDL.
>   * The snapshot phase captures a schema that already includes the column, 
> but the binlog stream still contains the corresponding DDL event.             
>                                                                               
>               
>                                                                               
>                                                                               
>                                                                               
>            
>  ### Fix                                                                      
>                                                                               
>                                                                               
>           
>                                                                               
>                                                                               
>                                                                               
>            
>   Add an idempotency check in {{SchemaUtils.applyAddColumnEvent()}} to skip 
> columns whose name already exists in the current schema. This is the most 
> defensive fix location since it protects all callers of 
> {{{}applySchemaChangeEvent(){}}}, not just 
>   {{{}PreTransformOperator{}}}.
>                                                                               
>                                                                               
>                                                                               
>            
>   PR: [https://github.com/apache/flink-cdc/pull/4370]                         
>                           
>                             
>   Priority: Major                                                             
>                                                                               
>                                                                               
>            
>                                     
>   Type: Bug                                                                   
>                                 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to