This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 88263cd69f [Fix][Connector-v2] Fix the sql statement error of create table for doris and starrocks (#6679)
88263cd69f is described below

commit 88263cd69fb50a5eff1fbdaa57030f2b58208bee
Author: dailai <[email protected]>
AuthorDate: Tue Apr 16 10:03:43 2024 +0800

    [Fix][Connector-v2] Fix the sql statement error of create table for doris and starrocks (#6679)
---
 .../seatunnel/api/sink/SaveModeConstants.java      | 29 ----------
 .../seatunnel/api/sink/SaveModePlaceHolder.java    | 66 ++++++++++++++++++++++
 .../seatunnel/common/exception/CommonError.java    | 16 ++++++
 .../common/exception/CommonErrorCode.java          |  5 +-
 .../seatunnel/common/sql/template/SqlTemplate.java | 41 ++++++++++++++
 .../connectors/doris/config/DorisOptions.java      | 18 ++++--
 .../connectors/doris/util/DorisCatalogUtil.java    | 28 ++++++---
 .../doris/catalog/DorisCreateTableTest.java        | 38 +++++++++++++
 .../doris/catalog/PreviewActionTest.java           | 33 +++++++----
 .../starrocks/config/StarRocksSinkOptions.java     | 21 +++++--
 .../starrocks/sink/StarRocksSaveModeUtil.java      | 30 +++++++---
 .../catalog/StarRocksCreateTableTest.java          | 44 +++++++++++++++
 12 files changed, 303 insertions(+), 66 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
deleted file mode 100644
index 04a57db785..0000000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.sink;
-
-public class SaveModeConstants {
-
-    public static final String ROWTYPE_PRIMARY_KEY = "rowtype_primary_key";
-    public static final String ROWTYPE_FIELDS = "rowtype_fields";
-    public static final String ROWTYPE_UNIQUE_KEY = "rowtype_unique_key";
-
-    public static final String TABLE_NAME = "table_name";
-
-    public static final String DATABASE = "database";
-}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java
new file mode 100644
index 0000000000..9db13bdae3
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+public enum SaveModePlaceHolder {
+    ROWTYPE_PRIMARY_KEY("rowtype_primary_key", "primary keys"),
+    ROWTYPE_UNIQUE_KEY("rowtype_unique_key", "unique keys"),
+    ROWTYPE_FIELDS("rowtype_fields", "fields"),
+    TABLE_NAME("table_name", "table name"),
+    DATABASE("database", "database");
+
+    private String keyValue;
+    private String display;
+
+    private static final String REPLACE_PLACE_HOLDER = "\\$\\{%s\\}";
+    private static final String PLACE_HOLDER = "${%s}";
+
+    SaveModePlaceHolder(String keyValue, String display) {
+        this.keyValue = keyValue;
+        this.display = display;
+    }
+
+    public static String getDisplay(String placeholder) {
+        Optional<SaveModePlaceHolder> saveModePlaceHolderEnumOptional =
+                Arrays.stream(SaveModePlaceHolder.values())
+                        .filter(
+                                saveModePlaceHolderEnum ->
+                                        placeholder.equals(
+                                                
saveModePlaceHolderEnum.getPlaceHolder()))
+                        .findFirst();
+        if (saveModePlaceHolderEnumOptional.isPresent()) {
+            return saveModePlaceHolderEnumOptional.get().display;
+        }
+        throw new RuntimeException(String.format("Not support the placeholder: 
%s", placeholder));
+    }
+
+    public String getPlaceHolderKey() {
+        return this.keyValue;
+    }
+
+    public String getPlaceHolder() {
+        return String.format(PLACE_HOLDER, getPlaceHolderKey());
+    }
+
+    public String getReplacePlaceHolder() {
+        return String.format(REPLACE_PLACE_HOLDER, getPlaceHolderKey());
+    }
+}
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index c491666a35..71a0e14676 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -36,6 +36,7 @@ import static 
org.apache.seatunnel.common.exception.CommonErrorCode.FILE_OPERATI
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
 import static 
org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
@@ -183,4 +184,19 @@ public class CommonError {
             return new SeaTunnelRuntimeException(code, params);
         }
     }
+
+    public static SeaTunnelRuntimeException sqlTemplateHandledError(
+            String tableName,
+            String keyName,
+            String template,
+            String placeholder,
+            String optionName) {
+        Map<String, String> params = new HashMap<>();
+        params.put("tableName", tableName);
+        params.put("keyName", keyName);
+        params.put("template", template);
+        params.put("placeholder", placeholder);
+        params.put("optionName", optionName);
+        return new SeaTunnelRuntimeException(SQL_TEMPLATE_HANDLED_ERROR, 
params);
+    }
 }
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 326187e697..4175050705 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -48,7 +48,10 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
             "<identifier> <operation> file '<fileName>' failed, because it not 
existed."),
     WRITE_SEATUNNEL_ROW_ERROR(
             "COMMON-23",
-            "<connector> write SeaTunnelRow failed, the SeaTunnelRow value is 
'<seaTunnelRow>'.");
+            "<connector> write SeaTunnelRow failed, the SeaTunnelRow value is 
'<seaTunnelRow>'."),
+    SQL_TEMPLATE_HANDLED_ERROR(
+            "COMMON-24",
+            "The table of <tableName> has no <keyName>, but the template \n 
<template> \n which has the place holder named <placeholder>. Please use the 
option named <optionName> to specify sql template");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sql/template/SqlTemplate.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sql/template/SqlTemplate.java
new file mode 100644
index 0000000000..dfe6dbc8f0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sql/template/SqlTemplate.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.sql.template;
+
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class SqlTemplate {
+    public static void canHandledByTemplateWithPlaceholder(
+            String createTemplate,
+            String placeholder,
+            String actualPlaceHolderValue,
+            String tableName,
+            String optionsKey) {
+        if (createTemplate.contains(placeholder) && 
StringUtils.isBlank(actualPlaceHolderValue)) {
+            throw CommonError.sqlTemplateHandledError(
+                    tableName,
+                    SaveModePlaceHolder.getDisplay(placeholder),
+                    createTemplate,
+                    placeholder,
+                    optionsKey);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
index 87f9ddcff8..fa66489eee 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
 import java.util.Map;
@@ -227,11 +228,20 @@ public interface DorisOptions {
             Options.key("save_mode_create_template")
                     .stringType()
                     .defaultValue(
-                            "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n"
-                                    + "${rowtype_fields}\n"
+                            "CREATE TABLE IF NOT EXISTS `"
+                                    + 
SaveModePlaceHolder.DATABASE.getPlaceHolder()
+                                    + "`.`"
+                                    + 
SaveModePlaceHolder.TABLE_NAME.getPlaceHolder()
+                                    + "` (\n"
+                                    + 
SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+                                    + "\n"
                                     + ") ENGINE=OLAP\n"
-                                    + " UNIQUE KEY (${rowtype_primary_key})\n"
-                                    + "DISTRIBUTED BY HASH 
(${rowtype_primary_key})\n "
+                                    + " UNIQUE KEY ("
+                                    + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+                                    + ")\n"
+                                    + "DISTRIBUTED BY HASH ("
+                                    + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+                                    + ")\n "
                                     + "PROPERTIES (\n"
                                     + "\"replication_allocation\" = 
\"tag.location.default: 1\",\n"
                                     + "\"in_memory\" = \"false\",\n"
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index f03d488de9..3d3489cae0 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.doris.util;
 
-import org.apache.seatunnel.api.sink.SaveModeConstants;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -28,6 +28,8 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -125,14 +127,25 @@ public class DorisCatalogUtil {
                             .map(r -> "`" + r.getColumnName() + "`")
                             .collect(Collectors.joining(","));
         }
+        SqlTemplate.canHandledByTemplateWithPlaceholder(
+                template,
+                SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
+                primaryKey,
+                tablePath.getFullName(),
+                DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
         template =
                 template.replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_PRIMARY_KEY),
+                        
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
                         primaryKey);
+        SqlTemplate.canHandledByTemplateWithPlaceholder(
+                template,
+                SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
+                uniqueKey,
+                tablePath.getFullName(),
+                DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
         template =
                 template.replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_UNIQUE_KEY),
-                        uniqueKey);
+                        
SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
         Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
                 CreateTableParser.getColumnList(template);
         template = mergeColumnInTemplate(columnInTemplate, tableSchema, 
template);
@@ -143,14 +156,13 @@ public class DorisCatalogUtil {
                         .map(DorisCatalogUtil::columnToDorisType)
                         .collect(Collectors.joining(",\n"));
         return template.replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.DATABASE),
+                        SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(),
                         tablePath.getDatabaseName())
                 .replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.TABLE_NAME),
+                        SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(),
                         tablePath.getTableName())
                 .replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_FIELDS),
-                        rowTypeFields);
+                        
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
     }
 
     private static String mergeColumnInTemplate(
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
index d33e1747a1..dd338d4ff8 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.doris.catalog;
 
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
@@ -28,8 +29,13 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.doris.config.DorisOptions;
 import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -130,6 +136,38 @@ public class DorisCreateTableTest {
                         + "\"storage_format\" = \"V2\",\n"
                         + "\"disable_auto_compaction\" = \"false\"\n"
                         + ")");
+
+        String createTemplate = 
DorisOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("test", "test1", "test2"),
+                        TableSchema.builder()
+                                .primaryKey(
+                                        PrimaryKey.of(StringUtils.EMPTY, 
Collections.emptyList()))
+                                .constraintKey(Collections.emptyList())
+                                .columns(columns)
+                                .build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        "");
+        TablePath tablePath = TablePath.of("test1.test2");
+        SeaTunnelRuntimeException actualSeaTunnelRuntimeException =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () ->
+                                DorisCatalogUtil.getCreateTableStatement(
+                                        createTemplate, tablePath, 
catalogTable));
+        String primaryKeyHolder = 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
+        SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
+                CommonError.sqlTemplateHandledError(
+                        tablePath.getFullName(),
+                        SaveModePlaceHolder.getDisplay(primaryKeyHolder),
+                        createTemplate,
+                        primaryKeyHolder,
+                        DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+        Assertions.assertEquals(
+                exceptSeaTunnelRuntimeException.getMessage(),
+                actualSeaTunnelRuntimeException.getMessage());
     }
 
     @Test
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
index ffca59c5e1..8e1d2ef1bd 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -31,6 +32,8 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.collect.Lists;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Optional;
@@ -41,14 +44,23 @@ public class PreviewActionTest {
             CatalogTable.of(
                     TableIdentifier.of("catalog", "database", "table"),
                     TableSchema.builder()
-                            .column(
-                                    PhysicalColumn.of(
-                                            "test",
-                                            BasicType.STRING_TYPE,
-                                            (Long) null,
-                                            true,
-                                            null,
-                                            ""))
+                            .primaryKey(PrimaryKey.of("", 
Lists.newArrayList("id")))
+                            .columns(
+                                    Lists.newArrayList(
+                                            PhysicalColumn.of(
+                                                    "id",
+                                                    BasicType.LONG_TYPE,
+                                                    (Long) null,
+                                                    false,
+                                                    null,
+                                                    ""),
+                                            PhysicalColumn.of(
+                                                    "test",
+                                                    BasicType.STRING_TYPE,
+                                                    (Long) null,
+                                                    true,
+                                                    null,
+                                                    "")))
                             .build(),
                     Collections.emptyMap(),
                     Collections.emptyList(),
@@ -92,10 +104,11 @@ public class PreviewActionTest {
                 catalog,
                 Catalog.ActionType.CREATE_TABLE,
                 "CREATE TABLE IF NOT EXISTS `testddatabase`.`testtable` (\n"
+                        + "`id` BIGINT(1) NOT NULL ,\n"
                         + "`test` STRING NULL \n"
                         + ") ENGINE=OLAP\n"
-                        + " UNIQUE KEY ()\n"
-                        + "DISTRIBUTED BY HASH ()\n"
+                        + " UNIQUE KEY (`id`)\n"
+                        + "DISTRIBUTED BY HASH (`id`)\n"
                         + " PROPERTIES (\n"
                         + "\"replication_allocation\" = 
\"tag.location.default: 1\",\n"
                         + "\"in_memory\" = \"false\",\n"
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index c6cd605063..937284cd66 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
 
@@ -57,12 +58,22 @@ public interface StarRocksSinkOptions {
             Options.key("save_mode_create_template")
                     .stringType()
                     .defaultValue(
-                            "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n"
-                                    + "${rowtype_primary_key},\n"
-                                    + "${rowtype_fields}\n"
+                            "CREATE TABLE IF NOT EXISTS `"
+                                    + 
SaveModePlaceHolder.DATABASE.getPlaceHolder()
+                                    + "`.`"
+                                    + 
SaveModePlaceHolder.TABLE_NAME.getPlaceHolder()
+                                    + "` (\n"
+                                    + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+                                    + ",\n"
+                                    + 
SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+                                    + "\n"
                                     + ") ENGINE=OLAP\n"
-                                    + " PRIMARY KEY (${rowtype_primary_key})\n"
-                                    + "DISTRIBUTED BY HASH 
(${rowtype_primary_key})"
+                                    + " PRIMARY KEY ("
+                                    + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+                                    + ")\n"
+                                    + "DISTRIBUTED BY HASH ("
+                                    + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+                                    + ")"
                                     + "PROPERTIES (\n"
                                     + "    \"replication_num\" = \"1\" \n"
                                     + ")")
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index 3b65d65a5b..0c2718d0b8 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -17,13 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
-import org.apache.seatunnel.api.sink.SaveModeConstants;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.util.CreateTableParser;
 
 import org.apache.commons.lang3.StringUtils;
@@ -55,14 +57,26 @@ public class StarRocksSaveModeUtil {
                             .map(r -> "`" + r.getColumnName() + "`")
                             .collect(Collectors.joining(","));
         }
+        SqlTemplate.canHandledByTemplateWithPlaceholder(
+                template,
+                SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
+                primaryKey,
+                TablePath.of(database, table).getFullName(),
+                StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
         template =
                 template.replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_PRIMARY_KEY),
+                        
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
                         primaryKey);
+        SqlTemplate.canHandledByTemplateWithPlaceholder(
+                template,
+                SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
+                uniqueKey,
+                TablePath.of(database, table).getFullName(),
+                StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+
         template =
                 template.replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_UNIQUE_KEY),
-                        uniqueKey);
+                        
SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
         Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
                 CreateTableParser.getColumnList(template);
         template = mergeColumnInTemplate(columnInTemplate, tableSchema, 
template);
@@ -72,12 +86,10 @@ public class StarRocksSaveModeUtil {
                         .filter(column -> 
!columnInTemplate.containsKey(column.getName()))
                         .map(StarRocksSaveModeUtil::columnToStarrocksType)
                         .collect(Collectors.joining(",\n"));
-        return template.replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.DATABASE), database)
-                .replaceAll(String.format("\\$\\{%s\\}", 
SaveModeConstants.TABLE_NAME), table)
+        return 
template.replaceAll(SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), 
database)
+                
.replaceAll(SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), table)
                 .replaceAll(
-                        String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_FIELDS),
-                        rowTypeFields);
+                        
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
     }
 
     private static String columnToStarrocksType(Column column) {
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index 2e15ea3911..d7f759de2a 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -17,16 +17,25 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -119,6 +128,41 @@ public class StarRocksCreateTableTest {
                         + "    \"dynamic_partition.prefix\" = \"p\"            
                                                                                
                                                                               
\n"
                         + ");",
                 result);
+
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("test", "test1", "test2"),
+                        TableSchema.builder()
+                                .primaryKey(
+                                        PrimaryKey.of(StringUtils.EMPTY, 
Collections.emptyList()))
+                                .constraintKey(Collections.emptyList())
+                                .columns(columns)
+                                .build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        "");
+        TablePath tablePath = TablePath.of("test1.test2");
+        String createTemplate = 
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+        RuntimeException actualSeaTunnelRuntimeException =
+                Assertions.assertThrows(
+                        RuntimeException.class,
+                        () ->
+                                StarRocksSaveModeUtil.getCreateTableSql(
+                                        createTemplate,
+                                        tablePath.getDatabaseName(),
+                                        tablePath.getTableName(),
+                                        catalogTable.getTableSchema()));
+        String primaryKeyHolder = 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
+        SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
+                CommonError.sqlTemplateHandledError(
+                        tablePath.getFullName(),
+                        SaveModePlaceHolder.getDisplay(primaryKeyHolder),
+                        createTemplate,
+                        primaryKeyHolder,
+                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+        Assertions.assertEquals(
+                exceptSeaTunnelRuntimeException.getMessage(),
+                actualSeaTunnelRuntimeException.getMessage());
     }
 
     @Test


Reply via email to