This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 68ee0c902 [FLINK-39346] Improve error message of invalid AddColumnEvents (#4353)
68ee0c902 is described below
commit 68ee0c902200ae421de12520ddf871a51863f3f2
Author: Pei Yu <[email protected]>
AuthorDate: Mon Mar 30 09:28:09 2026 +0800
[FLINK-39346] Improve error message of invalid AddColumnEvents (#4353)
---
.../apache/flink/cdc/common/utils/SchemaUtils.java | 12 ++++---
.../flink/cdc/common/utils/SchemaUtilsTest.java | 40 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 4 deletions(-)
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index 5a3cfb001..8ef9cd298 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -143,8 +143,10 @@ public class SchemaUtils {
int index =
columnNames.indexOf(columnWithPosition.getExistedColumnName());
if (index < 0) {
throw new IllegalArgumentException(
- columnWithPosition.getExistedColumnName()
- + " of AddColumnEvent is not
existed");
+ String.format(
+ "BEFORE type AddColumnEvent error:
Column %s does not exist in table %s",
+
columnWithPosition.getExistedColumnName(),
+ event.tableId()));
}
columns.add(index, columnWithPosition.getAddColumn());
break;
@@ -159,8 +161,10 @@ public class SchemaUtils {
int index =
columnNames.indexOf(columnWithPosition.getExistedColumnName());
if (index < 0) {
throw new IllegalArgumentException(
- columnWithPosition.getExistedColumnName()
- + " of AddColumnEvent is not
existed");
+ String.format(
+ "AFTER type AddColumnEvent error:
Column %s does not exist in table %s",
+
columnWithPosition.getExistedColumnName(),
+ event.tableId()));
}
columns.add(index + 1,
columnWithPosition.getAddColumn());
break;
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index cdf1532d2..a5f3f86ad 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -120,6 +120,46 @@ class SchemaUtilsTest {
.physicalColumn("col3", DataTypes.STRING())
.build());
+ // wrong add column in before type
+ final Schema finalSchema = schema;
+ Assertions.assertThatThrownBy(
+ () ->
+ SchemaUtils.applySchemaChangeEvent(
+ finalSchema,
+ new AddColumnEvent(
+ tableId,
+ List.of(
+ new
AddColumnEvent.ColumnWithPosition(
+
Column.physicalColumn(
+
"col6", DataTypes.STRING()),
+
AddColumnEvent.ColumnPosition
+
.BEFORE,
+ "col10")))))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ String.format(
+ "BEFORE type AddColumnEvent error: Column %s
does not exist in table %s",
+ "col10", tableId));
+
+ // wrong add column in after type
+ Assertions.assertThatThrownBy(
+ () ->
+ SchemaUtils.applySchemaChangeEvent(
+ finalSchema,
+ new AddColumnEvent(
+ tableId,
+ List.of(
+ new
AddColumnEvent.ColumnWithPosition(
+
Column.physicalColumn(
+
"col6", DataTypes.STRING()),
+
AddColumnEvent.ColumnPosition.AFTER,
+ "col10")))))
+ .isExactlyInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ String.format(
+ "AFTER type AddColumnEvent error: Column %s
does not exist in table %s",
+ "col10", tableId));
+
// drop columns
DropColumnEvent dropColumnEvent =
new DropColumnEvent(tableId, Arrays.asList("col3", "col5"));