This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 1796eb66c [FLINK-38596][pipeline-connector][maxcompute] Fix the column
comment of CreateTableEvent and AddColumnEvent (#4215)
1796eb66c is described below
commit 1796eb66cf03fea53643728de3a6aca2ee097bf4
Author: SkylerLin <[email protected]>
AuthorDate: Fri Jan 9 18:00:06 2026 +0800
[FLINK-38596][pipeline-connector][maxcompute] Fix the column comment of
CreateTableEvent and AddColumnEvent (#4215)
---
.../maxcompute/utils/SchemaEvolutionUtils.java | 13 ++++---
.../maxcompute/utils/TypeConvertUtils.java | 4 +++
.../maxcompute/utils/TypeConvertUtilsTest.java | 42 ++++++++++++++++++++++
3 files changed, 55 insertions(+), 4 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
index fc31d8f95..573fd9890 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
@@ -122,10 +122,15 @@ public class SchemaEvolutionUtils {
sqlBuilder
.append(addColumn.getAddColumn().getName())
.append(" ")
- .append(string(addColumn.getAddColumn().getType()))
- .append(" comment '")
-
.append(addColumn.getAddColumn().getType().asSummaryString())
- .append("',");
+ .append(string(addColumn.getAddColumn().getType()));
+ // Add comment if available
+ if (addColumn.getAddColumn().getComment() != null) {
+ sqlBuilder
+ .append(" comment '")
+ .append(addColumn.getAddColumn().getComment())
+ .append("'");
+ }
+ sqlBuilder.append(",");
} else {
throw new UnsupportedOperationException(
"Not support position: "
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
index 41d303a3b..a76bd8ef2 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
@@ -117,6 +117,10 @@ public class TypeConvertUtils {
if (!type.isNullable() || notNull) {
columnBuilder.notNull();
}
+ // Set column comment if available
+ if (flinkColumn.getComment() != null) {
+ columnBuilder.withComment(flinkColumn.getComment());
+ }
return columnBuilder.build();
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java
index ad6dd684c..26edede0d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java
@@ -184,4 +184,46 @@ class TypeConvertUtilsTest {
"char,varchar,string,false,=01=02=03=04=05,=01=02=03=04=05=06=07=08=09=0A,0.00,1,2,12345,12345,123.456,123456.789,00:20:34.567,2003-10-20,1970-01-01T00:00,1970-01-01T00:00:00Z,1970-01-01T00:00:00Z";
assertThat(arrayRecord).hasToString(expect);
}
+
+ @Test
+ void testColumnCommentConversion() {
+ // Test column with comment
+ org.apache.flink.cdc.common.schema.Column columnWithComment =
+ org.apache.flink.cdc.common.schema.Column.physicalColumn(
+ "user_id", DataTypes.BIGINT(), "Primary key for user
ID");
+ com.aliyun.odps.Column maxComputeColumn =
+ TypeConvertUtils.toMaxCompute(columnWithComment, false);
+
+ assertThat(maxComputeColumn.getName()).isEqualTo("user_id");
+
assertThat(maxComputeColumn.getTypeInfo().getTypeName().toLowerCase()).isEqualTo("bigint");
+ assertThat(maxComputeColumn.getComment()).isEqualTo("Primary key for
user ID");
+
+ // Test column without comment
+ org.apache.flink.cdc.common.schema.Column columnWithoutComment =
+ org.apache.flink.cdc.common.schema.Column.physicalColumn(
+ "name", DataTypes.STRING());
+ com.aliyun.odps.Column maxComputeColumnNoComment =
+ TypeConvertUtils.toMaxCompute(columnWithoutComment, false);
+
+ assertThat(maxComputeColumnNoComment.getName()).isEqualTo("name");
+
assertThat(maxComputeColumnNoComment.getTypeInfo().getTypeName().toLowerCase())
+ .isEqualTo("string");
+ assertThat(maxComputeColumnNoComment.getComment()).isNull();
+
+ // Test schema conversion with comments
+ Schema schemaWithComments =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.BIGINT(), "Primary key
ID")
+ .physicalColumn("username", DataTypes.STRING(), "User
name")
+ .physicalColumn("email", DataTypes.STRING(), "Email
address")
+ .build();
+
+ TableSchema maxComputeSchema =
TypeConvertUtils.toMaxCompute(schemaWithComments);
+ List<com.aliyun.odps.Column> columns =
maxComputeSchema.getAllColumns();
+
+ assertThat(columns).hasSize(3);
+ assertThat(columns.get(0).getComment()).isEqualTo("Primary key ID");
+ assertThat(columns.get(1).getComment()).isEqualTo("User name");
+ assertThat(columns.get(2).getComment()).isEqualTo("Email address");
+ }
}