This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 097aa9a6e [FLINK-38834][pipline-connector/starrocks] Support alter
column type DDL for starrocks sink connector (#4198)
097aa9a6e is described below
commit 097aa9a6eaa7ed525ab69de01eababdef6ac7a5f
Author: Mingliang Zhu <[email protected]>
AuthorDate: Thu Dec 25 16:57:11 2025 +0800
[FLINK-38834][pipline-connector/starrocks] Support alter column type DDL
for starrocks sink connector (#4198)
---
.../starrocks/sink/StarRocksEnrichedCatalog.java | 79 ++++++++++++++++++++++
.../starrocks/sink/StarRocksMetadataApplier.java | 27 +++++---
.../connectors/starrocks/sink/StarRocksUtils.java | 9 ++-
.../sink/StarRocksMetadataApplierITCase.java | 7 +-
.../sink/utils/StarRocksSinkTestBase.java | 28 ++++++++
5 files changed, 133 insertions(+), 17 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java
index 0c6b90aac..70f1f50f7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java
@@ -22,11 +22,13 @@ import org.apache.flink.util.StringUtils;
import com.starrocks.connector.flink.catalog.StarRocksCatalog;
import com.starrocks.connector.flink.catalog.StarRocksCatalogException;
+import com.starrocks.connector.flink.catalog.StarRocksColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Optional;
/** An enriched {@code StarRocksCatalog} with more schema evolution abilities.
*/
public class StarRocksEnrichedCatalog extends StarRocksCatalog {
@@ -104,6 +106,37 @@ public class StarRocksEnrichedCatalog extends
StarRocksCatalog {
}
}
+ public void alterColumnType(String databaseName, String tableName,
StarRocksColumn column)
+ throws StarRocksCatalogException {
+ checkTableArgument(databaseName, tableName);
+ Preconditions.checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(column.getColumnName()),
+ "column name cannot be null or empty.");
+ String alterSql = buildAlterColumnTypeSql(databaseName, tableName,
buildColumnStmt(column));
+ try {
+ long startTimeMillis = System.currentTimeMillis();
+ executeUpdateStatement(alterSql);
+ LOG.info(
+ "Success to alter table {}.{} modify column type,
duration: {}ms, sql: {}",
+ databaseName,
+ tableName,
+ System.currentTimeMillis() - startTimeMillis,
+ alterSql);
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to alter table {}.{} modify column type, sql: {}",
+ databaseName,
+ tableName,
+ alterSql,
+ e);
+ throw new StarRocksCatalogException(
+ String.format(
+ "Failed to alter table %s.%s modify column type",
+ databaseName, tableName),
+ e);
+ }
+ }
+
private String buildTruncateTableSql(String databaseName, String
tableName) {
return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName,
tableName);
}
@@ -119,6 +152,12 @@ public class StarRocksEnrichedCatalog extends
StarRocksCatalog {
databaseName, tableName, oldColumnName, newColumnName);
}
+ private String buildAlterColumnTypeSql(
+ String databaseName, String tableName, String columnStmt) {
+ return String.format(
+ "ALTER TABLE `%s`.`%s` MODIFY COLUMN %s", databaseName,
tableName, columnStmt);
+ }
+
private void executeUpdateStatement(String sql) throws
StarRocksCatalogException {
try {
Method m =
@@ -140,4 +179,44 @@ public class StarRocksEnrichedCatalog extends
StarRocksCatalog {
!StringUtils.isNullOrWhitespaceOnly(tableName),
"Table name cannot be null or empty.");
}
+
+ private String buildColumnStmt(StarRocksColumn column) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("`");
+ builder.append(column.getColumnName());
+ builder.append("` ");
+ builder.append(
+ getFullColumnType(
+ column.getDataType(), column.getColumnSize(),
column.getDecimalDigits()));
+ builder.append(" ");
+ builder.append(column.isNullable() ? "NULL" : "NOT NULL");
+ if (column.getDefaultValue().isPresent()) {
+ builder.append(String.format(" DEFAULT \"%s\"",
column.getDefaultValue().get()));
+ }
+
+ if (column.getColumnComment().isPresent()) {
+ builder.append(String.format(" COMMENT \"%s\"",
column.getColumnComment().get()));
+ }
+ return builder.toString();
+ }
+
+ private String getFullColumnType(
+ String type, Optional<Integer> columnSize, Optional<Integer>
decimalDigits) {
+ String dataType = type.toUpperCase();
+ switch (dataType) {
+ case "DECIMAL":
+ Preconditions.checkArgument(
+ columnSize.isPresent(), "DECIMAL type must have column
size");
+ Preconditions.checkArgument(
+ decimalDigits.isPresent(), "DECIMAL type must have
decimal digits");
+ return String.format("DECIMAL(%d, %s)", columnSize.get(),
decimalDigits.get());
+ case "CHAR":
+ case "VARCHAR":
+ Preconditions.checkArgument(
+ columnSize.isPresent(), type + " type must have column
size");
+ return String.format("%s(%d)", dataType, columnSize.get());
+ default:
+ return dataType;
+ }
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
index 763b1f906..279710950 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java
@@ -29,9 +29,9 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
-import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -91,6 +91,7 @@ public class StarRocksMetadataApplier implements
MetadataApplier {
SchemaChangeEventType.ADD_COLUMN,
SchemaChangeEventType.DROP_COLUMN,
SchemaChangeEventType.RENAME_COLUMN,
+ SchemaChangeEventType.ALTER_COLUMN_TYPE,
SchemaChangeEventType.DROP_TABLE,
SchemaChangeEventType.TRUNCATE_TABLE);
}
@@ -325,15 +326,21 @@ public class StarRocksMetadataApplier implements
MetadataApplier {
}
}
- private void applyAlterColumnType(AlterColumnTypeEvent
alterColumnTypeEvent)
- throws SchemaEvolveException {
- // TODO There are limitations for data type conversions. We should
know the data types
- // before and after changing so that we can make a validation. But the
event only contains
- // data
- // types after changing. One way is that the framework delivers the
old schema. We can
- // support
- // the alter after a discussion.
- throw new UnsupportedSchemaChangeEventException(alterColumnTypeEvent);
+ private void applyAlterColumnType(AlterColumnTypeEvent event) throws
SchemaEvolveException {
+ try {
+ TableId tableId = event.tableId();
+ Map<String, DataType> typeMapping = event.getTypeMapping();
+
+ for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
+ StarRocksColumn.Builder builder =
+ new
StarRocksColumn.Builder().setColumnName(entry.getKey());
+ toStarRocksDataType(entry.getValue(), false, builder);
+ catalog.alterColumnType(
+ tableId.getSchemaName(), tableId.getTableName(),
builder.build());
+ }
+ } catch (Exception e) {
+ throw new SchemaEvolveException(event, "fail to apply alter column
type event", e);
+ }
}
private void applyTruncateTable(TruncateTableEvent truncateTableEvent) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
index e1363fd28..d302f297e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java
@@ -113,10 +113,15 @@ public class StarRocksUtils {
/** Convert CDC data type to StarRocks data type. */
public static void toStarRocksDataType(
- Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder
builder) {
+ DataType cdcDataType, boolean isPrimaryKeys,
StarRocksColumn.Builder builder) {
CdcDataTypeTransformer dataTypeTransformer =
new CdcDataTypeTransformer(isPrimaryKeys, builder);
- cdcColumn.getType().accept(dataTypeTransformer);
+ cdcDataType.accept(dataTypeTransformer);
+ }
+
+ public static void toStarRocksDataType(
+ Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder
builder) {
+ toStarRocksDataType(cdcColumn.getType(), isPrimaryKeys, builder);
}
/** Format DATE type data. */
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index d7a940a7e..e2dc50551 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -55,7 +55,6 @@ import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -163,7 +162,7 @@ class StarRocksMetadataApplierITCase extends
StarRocksSinkTestBase {
Schema.newBuilder()
.column(new PhysicalColumn("id",
DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number",
DataTypes.DOUBLE(), null))
- .column(new PhysicalColumn("name",
DataTypes.VARCHAR(17), null))
+ .column(new PhysicalColumn("name",
DataTypes.VARCHAR(17).notNull(), null))
.primaryKey("id")
.build();
@@ -327,7 +326,6 @@ class StarRocksMetadataApplierITCase extends
StarRocksSinkTestBase {
}
@Test
- @Disabled("Alter column type is not supported currently.")
void testStarRocksAlterColumnType() throws Exception {
TableId tableId =
TableId.tableId(
@@ -335,7 +333,7 @@ class StarRocksMetadataApplierITCase extends
StarRocksSinkTestBase {
StarRocksContainer.STARROCKS_TABLE_NAME);
runJobWithEvents(generateAlterColumnTypeEvents(tableId));
-
+ waitAlterDone(tableId, 60000L);
List<String> actual = inspectTableSchema(tableId);
List<String> expected =
@@ -348,7 +346,6 @@ class StarRocksMetadataApplierITCase extends
StarRocksSinkTestBase {
}
@Test
- @Disabled("Alter column type is not supported currently.")
void testStarRocksNarrowingAlterColumnType() throws Exception {
Assertions.assertThatThrownBy(
() -> {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
index ab12d902d..471690d09 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java
@@ -39,6 +39,7 @@ import org.testcontainers.containers.Container;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.lifecycle.Startables;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
@@ -227,6 +228,33 @@ public class StarRocksSinkTestBase extends TestLogger {
return results;
}
+ // Starrocks alter column is asynchronous and does not support Light mode.
+ public void waitAlterDone(TableId tableId, long timeout)
+ throws SQLException, InterruptedException {
+ Connection conn = STARROCKS_CONTAINER.createConnection("");
+ conn.createStatement().execute(String.format("USE `%s`",
tableId.getSchemaName()));
+ long t0 = System.currentTimeMillis();
+ while (System.currentTimeMillis() - t0 < timeout) {
+ ResultSet rs =
+ conn.createStatement()
+ .executeQuery(
+ String.format(
+ "SHOW ALTER TABLE COLUMN WHERE
TableName = '%s' ORDER BY CreateTime DESC LIMIT 1",
+ tableId.getTableName()));
+ if (rs.next()) {
+ String state = rs.getString("State");
+ if ("FINISHED".equals(state)) {
+ return;
+ }
+ if ("CANCELLED".equals(state)) {
+ throw new RuntimeException("Alter failed: " +
rs.getString("Msg"));
+ }
+ }
+ Thread.sleep(1000L);
+ }
+ throw new RuntimeException("Alter job timeout");
+ }
+
public static <T> void assertEqualsInAnyOrder(List<T> expected, List<T>
actual) {
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
}