This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 097aa9a6e [FLINK-38834][pipline-connector/starrocks] Support alter column type DDL for starrocks sink connector (#4198)
097aa9a6e is described below

commit 097aa9a6eaa7ed525ab69de01eababdef6ac7a5f
Author: Mingliang Zhu <[email protected]>
AuthorDate: Thu Dec 25 16:57:11 2025 +0800

    [FLINK-38834][pipline-connector/starrocks] Support alter column type DDL for starrocks sink connector (#4198)
---
 .../starrocks/sink/StarRocksEnrichedCatalog.java   | 79 ++++++++++++++++++++++
 .../starrocks/sink/StarRocksMetadataApplier.java   | 27 +++++---
 .../connectors/starrocks/sink/StarRocksUtils.java  |  9 ++-
 .../sink/StarRocksMetadataApplierITCase.java       |  7 +-
 .../sink/utils/StarRocksSinkTestBase.java          | 28 ++++++++
 5 files changed, 133 insertions(+), 17 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java
index 0c6b90aac..70f1f50f7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java
@@ -22,11 +22,13 @@ import org.apache.flink.util.StringUtils;
 
 import com.starrocks.connector.flink.catalog.StarRocksCatalog;
 import com.starrocks.connector.flink.catalog.StarRocksCatalogException;
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Optional;
 
 /** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */
 public class StarRocksEnrichedCatalog extends StarRocksCatalog {
@@ -104,6 +106,37 @@ public class StarRocksEnrichedCatalog extends StarRocksCatalog {
         }
     }
 
+    public void alterColumnType(String databaseName, String tableName, StarRocksColumn column)
+            throws StarRocksCatalogException {
+        checkTableArgument(databaseName, tableName);
+        Preconditions.checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(column.getColumnName()),
+                "column name cannot be null or empty.");
+        String alterSql = buildAlterColumnTypeSql(databaseName, tableName, buildColumnStmt(column));
+        try {
+            long startTimeMillis = System.currentTimeMillis();
+            executeUpdateStatement(alterSql);
+            LOG.info(
+                    "Success to alter table {}.{} modify column type, 
duration: {}ms, sql: {}",
+                    databaseName,
+                    tableName,
+                    System.currentTimeMillis() - startTimeMillis,
+                    alterSql);
+        } catch (Exception e) {
+            LOG.error(
+                    "Failed to alter table {}.{} modify column type, sql: {}",
+                    databaseName,
+                    tableName,
+                    alterSql,
+                    e);
+            throw new StarRocksCatalogException(
+                    String.format(
+                            "Failed to alter table %s.%s modify column type",
+                            databaseName, tableName),
+                    e);
+        }
+    }
+
     private String buildTruncateTableSql(String databaseName, String tableName) {
         return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName);
     }
@@ -119,6 +152,12 @@ public class StarRocksEnrichedCatalog extends StarRocksCatalog {
                 databaseName, tableName, oldColumnName, newColumnName);
     }
 
+    private String buildAlterColumnTypeSql(
+            String databaseName, String tableName, String columnStmt) {
+        return String.format(
+                "ALTER TABLE `%s`.`%s` MODIFY COLUMN %s", databaseName, 
tableName, columnStmt);
+    }
+
     private void executeUpdateStatement(String sql) throws StarRocksCatalogException {
         try {
             Method m =
@@ -140,4 +179,44 @@ public class StarRocksEnrichedCatalog extends StarRocksCatalog {
                 !StringUtils.isNullOrWhitespaceOnly(tableName),
                 "Table name cannot be null or empty.");
     }
+
+    private String buildColumnStmt(StarRocksColumn column) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("`");
+        builder.append(column.getColumnName());
+        builder.append("` ");
+        builder.append(
+                getFullColumnType(
+                        column.getDataType(), column.getColumnSize(), column.getDecimalDigits()));
+        builder.append(" ");
+        builder.append(column.isNullable() ? "NULL" : "NOT NULL");
+        if (column.getDefaultValue().isPresent()) {
+            builder.append(String.format(" DEFAULT \"%s\"", 
column.getDefaultValue().get()));
+        }
+
+        if (column.getColumnComment().isPresent()) {
+            builder.append(String.format(" COMMENT \"%s\"", 
column.getColumnComment().get()));
+        }
+        return builder.toString();
+    }
+
+    private String getFullColumnType(
+            String type, Optional<Integer> columnSize, Optional<Integer> decimalDigits) {
+        String dataType = type.toUpperCase();
+        switch (dataType) {
+            case "DECIMAL":
+                Preconditions.checkArgument(
+                        columnSize.isPresent(), "DECIMAL type must have column 
size");
+                Preconditions.checkArgument(
+                        decimalDigits.isPresent(), "DECIMAL type must have decimal digits");
+                return String.format("DECIMAL(%d, %s)", columnSize.get(), 
decimalDigits.get());
+            case "CHAR":
+            case "VARCHAR":
+                Preconditions.checkArgument(
+                        columnSize.isPresent(), type + " type must have column size");
+                return String.format("%s(%d)", dataType, columnSize.get());
+            default:
+                return dataType;
+        }
+    }
 }
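
For illustration, here is a minimal standalone sketch (not part of the commit; the class name and literal values are hypothetical) of the kind of MODIFY COLUMN statement that buildColumnStmt and buildAlterColumnTypeSql assemble:

    import java.util.Optional;

    /** Hypothetical sketch mirroring getFullColumnType/buildColumnStmt in the diff above. */
    public class AlterColumnTypeSqlSketch {

        // Renders the type portion, sizing DECIMAL/CHAR/VARCHAR like getFullColumnType does.
        static String fullColumnType(String type, Optional<Integer> size, Optional<Integer> digits) {
            String dataType = type.toUpperCase();
            switch (dataType) {
                case "DECIMAL":
                    return String.format("DECIMAL(%d, %d)", size.get(), digits.get());
                case "CHAR":
                case "VARCHAR":
                    return String.format("%s(%d)", dataType, size.get());
                default:
                    return dataType;
            }
        }

        public static void main(String[] args) {
            String columnStmt = String.format(
                    "`%s` %s %s",
                    "name", fullColumnType("VARCHAR", Optional.of(32), Optional.empty()), "NULL");
            // Prints: ALTER TABLE `test_db`.`test_tbl` MODIFY COLUMN `name` VARCHAR(32) NULL
            System.out.println(String.format(
                    "ALTER TABLE `%s`.`%s` MODIFY COLUMN %s", "test_db", "test_tbl", columnStmt));
        }
    }
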
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
index 763b1f906..279710950 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
@@ -29,9 +29,9 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
 import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
-import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
 
@@ -91,6 +91,7 @@ public class StarRocksMetadataApplier implements MetadataApplier {
                 SchemaChangeEventType.ADD_COLUMN,
                 SchemaChangeEventType.DROP_COLUMN,
                 SchemaChangeEventType.RENAME_COLUMN,
+                SchemaChangeEventType.ALTER_COLUMN_TYPE,
                 SchemaChangeEventType.DROP_TABLE,
                 SchemaChangeEventType.TRUNCATE_TABLE);
     }
@@ -325,15 +326,21 @@ public class StarRocksMetadataApplier implements MetadataApplier {
         }
     }
 
-    private void applyAlterColumnType(AlterColumnTypeEvent alterColumnTypeEvent)
-            throws SchemaEvolveException {
-        // TODO There are limitations for data type conversions. We should know the data types
-        // before and after changing so that we can make a validation. But the event only contains
-        // data
-        // types after changing. One way is that the framework delivers the old schema. We can
-        // support
-        // the alter after a discussion.
-        throw new UnsupportedSchemaChangeEventException(alterColumnTypeEvent);
+    private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException {
+        try {
+            TableId tableId = event.tableId();
+            Map<String, DataType> typeMapping = event.getTypeMapping();
+
+            for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
+                StarRocksColumn.Builder builder =
+                        new StarRocksColumn.Builder().setColumnName(entry.getKey());
+                toStarRocksDataType(entry.getValue(), false, builder);
+                catalog.alterColumnType(
+                        tableId.getSchemaName(), tableId.getTableName(), builder.build());
+            }
+        } catch (Exception e) {
+            throw new SchemaEvolveException(event, "fail to apply alter column type event", e);
+        }
     }
 
     private void applyTruncateTable(TruncateTableEvent truncateTableEvent) {
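
The applier fans the event's type mapping out into one DDL statement per changed column. A hedged sketch of that loop using plain Java types (the class and the string-typed mapping are illustrative stand-ins, not the CDC API):

    import java.util.LinkedHashMap;
    import java.util.Map;

    /** Illustrative stand-in for the per-column loop in applyAlterColumnType. */
    public class PerColumnAlterSketch {
        public static void main(String[] args) {
            // Stand-in for event.getTypeMapping(): column name -> new type.
            Map<String, String> typeMapping = new LinkedHashMap<>();
            typeMapping.put("name", "VARCHAR(32)");
            typeMapping.put("number", "DOUBLE");
            for (Map.Entry<String, String> entry : typeMapping.entrySet()) {
                // One ALTER per changed column, as catalog.alterColumnType(...) does above.
                System.out.printf(
                        "ALTER TABLE `test_db`.`test_tbl` MODIFY COLUMN `%s` %s%n",
                        entry.getKey(), entry.getValue());
            }
        }
    }
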
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index e1363fd28..d302f297e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -113,10 +113,15 @@ public class StarRocksUtils {
 
     /** Convert CDC data type to StarRocks data type. */
     public static void toStarRocksDataType(
-            Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
+            DataType cdcDataType, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
         CdcDataTypeTransformer dataTypeTransformer =
                 new CdcDataTypeTransformer(isPrimaryKeys, builder);
-        cdcColumn.getType().accept(dataTypeTransformer);
+        cdcDataType.accept(dataTypeTransformer);
+    }
+
+    public static void toStarRocksDataType(
+            Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
+        toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder);
     }
 
     /** Format DATE type data. */
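
The refactor splits the converter into a DataType-based entry point, with the old Column-based signature delegating to it, so applyAlterColumnType can convert the types it receives without a Column wrapper. A minimal sketch of the same delegation pattern (Column here is a stand-in class, not the CDC one):

    /** Minimal sketch of the overload delegation; types are hypothetical stand-ins. */
    public class OverloadDelegationSketch {
        static class Column {
            final String type;
            Column(String type) { this.type = type; }
        }

        // New entry point: works directly on the type.
        static void toStarRocksDataType(String cdcDataType, StringBuilder builder) {
            builder.append(cdcDataType.toUpperCase()); // stand-in for the transformer visit
        }

        // Old signature kept for existing callers; delegates as in the diff.
        static void toStarRocksDataType(Column cdcColumn, StringBuilder builder) {
            toStarRocksDataType(cdcColumn.type, builder);
        }

        public static void main(String[] args) {
            StringBuilder sb = new StringBuilder();
            toStarRocksDataType(new Column("varchar"), sb);
            System.out.println(sb); // VARCHAR
        }
    }
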
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index d7a940a7e..e2dc50551 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -55,7 +55,6 @@ import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -163,7 +162,7 @@ class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
                 Schema.newBuilder()
                         .column(new PhysicalColumn("id", 
DataTypes.INT().notNull(), null))
                         .column(new PhysicalColumn("number", 
DataTypes.DOUBLE(), null))
-                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17), null))
+                        .column(new PhysicalColumn("name", 
DataTypes.VARCHAR(17).notNull(), null))
                         .primaryKey("id")
                         .build();
 
@@ -327,7 +326,6 @@ class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
     }
 
     @Test
-    @Disabled("Alter column type is not supported currently.")
     void testStarRocksAlterColumnType() throws Exception {
         TableId tableId =
                 TableId.tableId(
@@ -335,7 +333,7 @@ class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
                         StarRocksContainer.STARROCKS_TABLE_NAME);
 
         runJobWithEvents(generateAlterColumnTypeEvents(tableId));
-
+        waitAlterDone(tableId, 60000L);
         List<String> actual = inspectTableSchema(tableId);
 
         List<String> expected =
@@ -348,7 +346,6 @@ class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
     }
 
     @Test
-    @Disabled("Alter column type is not supported currently.")
     void testStarRocksNarrowingAlterColumnType() throws Exception {
         Assertions.assertThatThrownBy(
                 () -> {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
index ab12d902d..471690d09 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
@@ -39,6 +39,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
 import org.testcontainers.lifecycle.Startables;
 
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Duration;
@@ -227,6 +228,33 @@ public class StarRocksSinkTestBase extends TestLogger {
         return results;
     }
 
+    // Starrocks alter column is asynchronous and does not support Light mode.
+    public void waitAlterDone(TableId tableId, long timeout)
+            throws SQLException, InterruptedException {
+        Connection conn = STARROCKS_CONTAINER.createConnection("");
+        conn.createStatement().execute(String.format("USE `%s`", 
tableId.getSchemaName()));
+        long t0 = System.currentTimeMillis();
+        while (System.currentTimeMillis() - t0 < timeout) {
+            ResultSet rs =
+                    conn.createStatement()
+                            .executeQuery(
+                                    String.format(
+                                            "SHOW ALTER TABLE COLUMN WHERE 
TableName = '%s' ORDER BY CreateTime DESC LIMIT 1",
+                                            tableId.getTableName()));
+            if (rs.next()) {
+                String state = rs.getString("State");
+                if ("FINISHED".equals(state)) {
+                    return;
+                }
+                if ("CANCELLED".equals(state)) {
+                    throw new RuntimeException("Alter failed: " + 
rs.getString("Msg"));
+                }
+            }
+            Thread.sleep(1000L);
+        }
+        throw new RuntimeException("Alter job timeout");
+    }
+
     public static <T> void assertEqualsInAnyOrder(List<T> expected, List<T> actual) {
         Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
     }
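
Because StarRocks executes column-type changes as asynchronous background jobs (and, per the comment in the diff, light schema change does not apply here), the test polls SHOW ALTER TABLE COLUMN until the job reaches FINISHED. A hedged usage sketch of the same poll over plain JDBC (URL, credentials, and database/table names are placeholders, and a MySQL-protocol JDBC driver is assumed on the classpath):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    /** Standalone sketch of waitAlterDone's polling loop; connection details are placeholders. */
    public class PollAlterJobSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:9030/test_db", "root", "")) {
                long deadline = System.currentTimeMillis() + 60_000L;
                while (System.currentTimeMillis() < deadline) {
                    try (ResultSet rs = conn.createStatement().executeQuery(
                            "SHOW ALTER TABLE COLUMN WHERE TableName = 'test_tbl' "
                                    + "ORDER BY CreateTime DESC LIMIT 1")) {
                        if (rs.next() && "FINISHED".equals(rs.getString("State"))) {
                            return; // schema change applied; safe to inspect the table
                        }
                    }
                    Thread.sleep(1000L); // poll once per second, as the test base does
                }
                throw new IllegalStateException("Alter job did not finish within the timeout");
            }
        }
    }
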
