This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1796eb66c [FLINK-38596][pipeline-connector][maxcompute] Fix the column 
comment of CreateTableEvent and AddColumnEvent (#4215)
1796eb66c is described below

commit 1796eb66cf03fea53643728de3a6aca2ee097bf4
Author: SkylerLin <[email protected]>
AuthorDate: Fri Jan 9 18:00:06 2026 +0800

    [FLINK-38596][pipeline-connector][maxcompute] Fix the column comment of 
CreateTableEvent and AddColumnEvent (#4215)
---
 .../maxcompute/utils/SchemaEvolutionUtils.java     | 13 ++++---
 .../maxcompute/utils/TypeConvertUtils.java         |  4 +++
 .../maxcompute/utils/TypeConvertUtilsTest.java     | 42 ++++++++++++++++++++++
 3 files changed, 55 insertions(+), 4 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
index fc31d8f95..573fd9890 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
@@ -122,10 +122,15 @@ public class SchemaEvolutionUtils {
                 sqlBuilder
                         .append(addColumn.getAddColumn().getName())
                         .append(" ")
-                        .append(string(addColumn.getAddColumn().getType()))
-                        .append(" comment '")
-                        
.append(addColumn.getAddColumn().getType().asSummaryString())
-                        .append("',");
+                        .append(string(addColumn.getAddColumn().getType()));
+                // Add comment if available
+                if (addColumn.getAddColumn().getComment() != null) {
+                    sqlBuilder
+                            .append(" comment '")
+                            .append(addColumn.getAddColumn().getComment())
+                            .append("'");
+                }
+                sqlBuilder.append(",");
             } else {
                 throw new UnsupportedOperationException(
                         "Not support position: "
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
index 41d303a3b..a76bd8ef2 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
@@ -117,6 +117,10 @@ public class TypeConvertUtils {
         if (!type.isNullable() || notNull) {
             columnBuilder.notNull();
         }
+        // Set column comment if available
+        if (flinkColumn.getComment() != null) {
+            columnBuilder.withComment(flinkColumn.getComment());
+        }
         return columnBuilder.build();
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java
index ad6dd684c..26edede0d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/test/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtilsTest.java
@@ -184,4 +184,46 @@ class TypeConvertUtilsTest {
                 
"char,varchar,string,false,=01=02=03=04=05,=01=02=03=04=05=06=07=08=09=0A,0.00,1,2,12345,12345,123.456,123456.789,00:20:34.567,2003-10-20,1970-01-01T00:00,1970-01-01T00:00:00Z,1970-01-01T00:00:00Z";
         assertThat(arrayRecord).hasToString(expect);
     }
+
+    @Test
+    void testColumnCommentConversion() {
+        // Test column with comment
+        org.apache.flink.cdc.common.schema.Column columnWithComment =
+                org.apache.flink.cdc.common.schema.Column.physicalColumn(
+                        "user_id", DataTypes.BIGINT(), "Primary key for user 
ID");
+        com.aliyun.odps.Column maxComputeColumn =
+                TypeConvertUtils.toMaxCompute(columnWithComment, false);
+
+        assertThat(maxComputeColumn.getName()).isEqualTo("user_id");
+        
assertThat(maxComputeColumn.getTypeInfo().getTypeName().toLowerCase()).isEqualTo("bigint");
+        assertThat(maxComputeColumn.getComment()).isEqualTo("Primary key for 
user ID");
+
+        // Test column without comment
+        org.apache.flink.cdc.common.schema.Column columnWithoutComment =
+                org.apache.flink.cdc.common.schema.Column.physicalColumn(
+                        "name", DataTypes.STRING());
+        com.aliyun.odps.Column maxComputeColumnNoComment =
+                TypeConvertUtils.toMaxCompute(columnWithoutComment, false);
+
+        assertThat(maxComputeColumnNoComment.getName()).isEqualTo("name");
+        
assertThat(maxComputeColumnNoComment.getTypeInfo().getTypeName().toLowerCase())
+                .isEqualTo("string");
+        assertThat(maxComputeColumnNoComment.getComment()).isNull();
+
+        // Test schema conversion with comments
+        Schema schemaWithComments =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT(), "Primary key 
ID")
+                        .physicalColumn("username", DataTypes.STRING(), "User 
name")
+                        .physicalColumn("email", DataTypes.STRING(), "Email 
address")
+                        .build();
+
+        TableSchema maxComputeSchema = 
TypeConvertUtils.toMaxCompute(schemaWithComments);
+        List<com.aliyun.odps.Column> columns = 
maxComputeSchema.getAllColumns();
+
+        assertThat(columns).hasSize(3);
+        assertThat(columns.get(0).getComment()).isEqualTo("Primary key ID");
+        assertThat(columns.get(1).getComment()).isEqualTo("User name");
+        assertThat(columns.get(2).getComment()).isEqualTo("Email address");
+    }
 }

Reply via email to