[
https://issues.apache.org/jira/browse/FLINK-39412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
kianwee updated FLINK-39412:
----------------------------
Description:
### Problem
When a Flink CDC pipeline recovers from a checkpoint/savepoint, binlog events
may be replayed, causing {{AddColumnEvent}} to be applied for columns that
already exist in the cached schema. This leads to a {{RowType}} validation
failure:
{code:java}
org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException: Failed to pre-transform with AddColumnEvent{tableId=ecrm_btwl.kd_store_coupon, addedColumns=[ColumnWithPosition{column=`valid_date` STRING, position=LAST, existedColumnName=null}]}
...
Caused by: java.lang.IllegalArgumentException: Field names must be unique. Found duplicates: [valid_date]
    at org.apache.flink.cdc.common.types.RowType.validateFields(RowType.java:158)
    at org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:230)
{code}
### Root Cause
{{SchemaUtils.applyAddColumnEvent()}} adds columns without checking whether
a column with the same name already exists. Although
{{isSchemaChangeEventRedundant()}} exists as a utility method,
{{PreTransformOperator.cacheChangeSchema()}} does not call it before
applying schema changes.
This can be triggered when:
* A job restores from checkpoint/savepoint and the binlog offset rolls back,
replaying a historical {{ALTER TABLE ADD COLUMN}} DDL.
* The snapshot phase captures a schema that already includes the column, but
the binlog stream still contains the corresponding DDL event.
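
The replay scenario can be illustrated with a minimal model that does not use any Flink CDC classes. In this sketch a schema is reduced to a list of column names, and {{blindAdd}} and {{validateUnique}} are hypothetical stand-ins for the unchecked append and the uniqueness validation described above; the {{id}} column is also made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReplayDuplicateSketch {

    /** Hypothetical stand-in: appends the column without checking whether it exists. */
    public static List<String> blindAdd(List<String> columns, String added) {
        List<String> result = new ArrayList<>(columns);
        result.add(added);
        return result;
    }

    /** Hypothetical stand-in: rejects duplicate field names, as the stack trace reports. */
    public static void validateUnique(List<String> columns) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String column : columns) {
            if (!seen.add(column)) {
                duplicates.add(column);
            }
        }
        if (!duplicates.isEmpty()) {
            throw new IllegalArgumentException(
                    "Field names must be unique. Found duplicates: " + duplicates);
        }
    }

    public static void main(String[] args) {
        // The cached schema already contains valid_date (e.g. captured during snapshot).
        List<String> cachedSchema = List.of("id", "valid_date");
        // A replayed ADD COLUMN event for valid_date is applied a second time.
        List<String> afterReplay = blindAdd(cachedSchema, "valid_date");
        try {
            validateUnique(afterReplay);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, the second application of the same event is enough to trip the uniqueness check, regardless of which of the two triggers caused the replay.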
### Fix
Add an idempotency check in {{SchemaUtils.applyAddColumnEvent()}} to skip
columns whose name already exists in the current schema. This is the most
defensive fix location since it protects all callers of
{{applySchemaChangeEvent()}}, not just
{{PreTransformOperator}}.
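
The idea behind the idempotency check can be sketched as follows. This is not the code from the PR: the schema is again modelled as a plain list of column names, {{addColumnsIdempotently}} is a hypothetical helper, and column positions other than LAST are not modelled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdempotentAddColumnSketch {

    /**
     * Hypothetical helper: returns a new column list in which each added column
     * is appended only if no column with the same name is already present.
     */
    public static List<String> addColumnsIdempotently(
            List<String> existing, List<String> added) {
        Set<String> names = new HashSet<>(existing);
        List<String> result = new ArrayList<>(existing);
        for (String column : added) {
            // Set.add returns false when the name is already known, so a
            // replayed column is skipped instead of being appended twice.
            if (names.add(column)) {
                result.add(column);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> schema = List.of("id", "valid_date");
        // Replayed event: the schema is returned unchanged.
        System.out.println(addColumnsIdempotently(schema, List.of("valid_date")));
        // Mixed event: only the genuinely new column is appended.
        System.out.println(addColumnsIdempotently(schema, List.of("valid_date", "remark")));
    }
}
```

One consequence of matching by name only is that a replayed event carrying a different type for an existing column would also be skipped silently; whether that case should be skipped, logged, or rejected is a design decision this sketch does not address.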
PR: [https://github.com/apache/flink-cdc/pull/4370]
Priority: Major
Type: Bug
> AddColumnEvent fails with duplicate field names when schema change events are
> replayed after failover
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-39412
> URL: https://issues.apache.org/jira/browse/FLINK-39412
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: kianwee
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
