This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 88263cd69f [Fix][Connector-v2] Fix the sql statement error of create
table for doris and starrocks (#6679)
88263cd69f is described below
commit 88263cd69fb50a5eff1fbdaa57030f2b58208bee
Author: dailai <[email protected]>
AuthorDate: Tue Apr 16 10:03:43 2024 +0800
[Fix][Connector-v2] Fix the sql statement error of create table for doris
and starrocks (#6679)
---
.../seatunnel/api/sink/SaveModeConstants.java | 29 ----------
.../seatunnel/api/sink/SaveModePlaceHolder.java | 66 ++++++++++++++++++++++
.../seatunnel/common/exception/CommonError.java | 16 ++++++
.../common/exception/CommonErrorCode.java | 5 +-
.../seatunnel/common/sql/template/SqlTemplate.java | 41 ++++++++++++++
.../connectors/doris/config/DorisOptions.java | 18 ++++--
.../connectors/doris/util/DorisCatalogUtil.java | 28 ++++++---
.../doris/catalog/DorisCreateTableTest.java | 38 +++++++++++++
.../doris/catalog/PreviewActionTest.java | 33 +++++++----
.../starrocks/config/StarRocksSinkOptions.java | 21 +++++--
.../starrocks/sink/StarRocksSaveModeUtil.java | 30 +++++++---
.../catalog/StarRocksCreateTableTest.java | 44 +++++++++++++++
12 files changed, 303 insertions(+), 66 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
deleted file mode 100644
index 04a57db785..0000000000
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.sink;
-
-public class SaveModeConstants {
-
- public static final String ROWTYPE_PRIMARY_KEY = "rowtype_primary_key";
- public static final String ROWTYPE_FIELDS = "rowtype_fields";
- public static final String ROWTYPE_UNIQUE_KEY = "rowtype_unique_key";
-
- public static final String TABLE_NAME = "table_name";
-
- public static final String DATABASE = "database";
-}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java
new file mode 100644
index 0000000000..9db13bdae3
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+public enum SaveModePlaceHolder {
+ ROWTYPE_PRIMARY_KEY("rowtype_primary_key", "primary keys"),
+ ROWTYPE_UNIQUE_KEY("rowtype_unique_key", "unique keys"),
+ ROWTYPE_FIELDS("rowtype_fields", "fields"),
+ TABLE_NAME("table_name", "table name"),
+ DATABASE("database", "database");
+
+ private String keyValue;
+ private String display;
+
+ private static final String REPLACE_PLACE_HOLDER = "\\$\\{%s\\}";
+ private static final String PLACE_HOLDER = "${%s}";
+
+ SaveModePlaceHolder(String keyValue, String display) {
+ this.keyValue = keyValue;
+ this.display = display;
+ }
+
+ public static String getDisplay(String placeholder) {
+ Optional<SaveModePlaceHolder> saveModePlaceHolderEnumOptional =
+ Arrays.stream(SaveModePlaceHolder.values())
+ .filter(
+ saveModePlaceHolderEnum ->
+ placeholder.equals(
+
saveModePlaceHolderEnum.getPlaceHolder()))
+ .findFirst();
+ if (saveModePlaceHolderEnumOptional.isPresent()) {
+ return saveModePlaceHolderEnumOptional.get().display;
+ }
+ throw new RuntimeException(String.format("Not support the placeholder:
%s", placeholder));
+ }
+
+ public String getPlaceHolderKey() {
+ return this.keyValue;
+ }
+
+ public String getPlaceHolder() {
+ return String.format(PLACE_HOLDER, getPlaceHolderKey());
+ }
+
+ public String getReplacePlaceHolder() {
+ return String.format(REPLACE_PLACE_HOLDER, getPlaceHolderKey());
+ }
+}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index c491666a35..71a0e14676 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -36,6 +36,7 @@ import static
org.apache.seatunnel.common.exception.CommonErrorCode.FILE_OPERATI
import static
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.JSON_OPERATION_FAILED;
+import static
org.apache.seatunnel.common.exception.CommonErrorCode.SQL_TEMPLATE_HANDLED_ERROR;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_ENCODING;
import static
org.apache.seatunnel.common.exception.CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR;
@@ -183,4 +184,19 @@ public class CommonError {
return new SeaTunnelRuntimeException(code, params);
}
}
+
+ public static SeaTunnelRuntimeException sqlTemplateHandledError(
+ String tableName,
+ String keyName,
+ String template,
+ String placeholder,
+ String optionName) {
+ Map<String, String> params = new HashMap<>();
+ params.put("tableName", tableName);
+ params.put("keyName", keyName);
+ params.put("template", template);
+ params.put("placeholder", placeholder);
+ params.put("optionName", optionName);
+ return new SeaTunnelRuntimeException(SQL_TEMPLATE_HANDLED_ERROR,
params);
+ }
}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 326187e697..4175050705 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -48,7 +48,10 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"<identifier> <operation> file '<fileName>' failed, because it not
existed."),
WRITE_SEATUNNEL_ROW_ERROR(
"COMMON-23",
- "<connector> write SeaTunnelRow failed, the SeaTunnelRow value is
'<seaTunnelRow>'.");
+ "<connector> write SeaTunnelRow failed, the SeaTunnelRow value is
'<seaTunnelRow>'."),
+ SQL_TEMPLATE_HANDLED_ERROR(
+ "COMMON-24",
+ "The table of <tableName> has no <keyName>, but the template \n
<template> \n which has the place holder named <placeholder>. Please use the
option named <optionName> to specify sql template");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sql/template/SqlTemplate.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sql/template/SqlTemplate.java
new file mode 100644
index 0000000000..dfe6dbc8f0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sql/template/SqlTemplate.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.sql.template;
+
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class SqlTemplate {
+ public static void canHandledByTemplateWithPlaceholder(
+ String createTemplate,
+ String placeholder,
+ String actualPlaceHolderValue,
+ String tableName,
+ String optionsKey) {
+ if (createTemplate.contains(placeholder) &&
StringUtils.isBlank(actualPlaceHolderValue)) {
+ throw CommonError.sqlTemplateHandledError(
+ tableName,
+ SaveModePlaceHolder.getDisplay(placeholder),
+ createTemplate,
+ placeholder,
+ optionsKey);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
index 87f9ddcff8..fa66489eee 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import java.util.Map;
@@ -227,11 +228,20 @@ public interface DorisOptions {
Options.key("save_mode_create_template")
.stringType()
.defaultValue(
- "CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n"
- + "${rowtype_fields}\n"
+ "CREATE TABLE IF NOT EXISTS `"
+ +
SaveModePlaceHolder.DATABASE.getPlaceHolder()
+ + "`.`"
+ +
SaveModePlaceHolder.TABLE_NAME.getPlaceHolder()
+ + "` (\n"
+ +
SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+ + "\n"
+ ") ENGINE=OLAP\n"
- + " UNIQUE KEY (${rowtype_primary_key})\n"
- + "DISTRIBUTED BY HASH
(${rowtype_primary_key})\n "
+ + " UNIQUE KEY ("
+ +
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ + ")\n"
+ + "DISTRIBUTED BY HASH ("
+ +
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ + ")\n "
+ "PROPERTIES (\n"
+ "\"replication_allocation\" =
\"tag.location.default: 1\",\n"
+ "\"in_memory\" = \"false\",\n"
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index f03d488de9..3d3489cae0 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.doris.util;
-import org.apache.seatunnel.api.sink.SaveModeConstants;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -28,6 +28,8 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import
org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
import org.apache.commons.lang3.StringUtils;
@@ -125,14 +127,25 @@ public class DorisCatalogUtil {
.map(r -> "`" + r.getColumnName() + "`")
.collect(Collectors.joining(","));
}
+ SqlTemplate.canHandledByTemplateWithPlaceholder(
+ template,
+ SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
+ primaryKey,
+ tablePath.getFullName(),
+ DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
template =
template.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.ROWTYPE_PRIMARY_KEY),
+
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
primaryKey);
+ SqlTemplate.canHandledByTemplateWithPlaceholder(
+ template,
+ SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
+ uniqueKey,
+ tablePath.getFullName(),
+ DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
template =
template.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.ROWTYPE_UNIQUE_KEY),
- uniqueKey);
+
SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
CreateTableParser.getColumnList(template);
template = mergeColumnInTemplate(columnInTemplate, tableSchema,
template);
@@ -143,14 +156,13 @@ public class DorisCatalogUtil {
.map(DorisCatalogUtil::columnToDorisType)
.collect(Collectors.joining(",\n"));
return template.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.DATABASE),
+ SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(),
tablePath.getDatabaseName())
.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.TABLE_NAME),
+ SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(),
tablePath.getTableName())
.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.ROWTYPE_FIELDS),
- rowTypeFields);
+
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
}
private static String mergeColumnInTemplate(
diff --git
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
index d33e1747a1..dd338d4ff8 100644
---
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
+++
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.doris.catalog;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
@@ -28,8 +29,13 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.doris.config.DorisOptions;
import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil;
+import org.apache.commons.lang3.StringUtils;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -130,6 +136,38 @@ public class DorisCreateTableTest {
+ "\"storage_format\" = \"V2\",\n"
+ "\"disable_auto_compaction\" = \"false\"\n"
+ ")");
+
+ String createTemplate =
DorisOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("test", "test1", "test2"),
+ TableSchema.builder()
+ .primaryKey(
+ PrimaryKey.of(StringUtils.EMPTY,
Collections.emptyList()))
+ .constraintKey(Collections.emptyList())
+ .columns(columns)
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "");
+ TablePath tablePath = TablePath.of("test1.test2");
+ SeaTunnelRuntimeException actualSeaTunnelRuntimeException =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
+ DorisCatalogUtil.getCreateTableStatement(
+ createTemplate, tablePath,
catalogTable));
+ String primaryKeyHolder =
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
+ SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
+ CommonError.sqlTemplateHandledError(
+ tablePath.getFullName(),
+ SaveModePlaceHolder.getDisplay(primaryKeyHolder),
+ createTemplate,
+ primaryKeyHolder,
+ DorisOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+ Assertions.assertEquals(
+ exceptSeaTunnelRuntimeException.getMessage(),
+ actualSeaTunnelRuntimeException.getMessage());
}
@Test
diff --git
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
index ffca59c5e1..8e1d2ef1bd 100644
---
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
+++
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -31,6 +32,8 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import com.google.common.collect.Lists;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
@@ -41,14 +44,23 @@ public class PreviewActionTest {
CatalogTable.of(
TableIdentifier.of("catalog", "database", "table"),
TableSchema.builder()
- .column(
- PhysicalColumn.of(
- "test",
- BasicType.STRING_TYPE,
- (Long) null,
- true,
- null,
- ""))
+ .primaryKey(PrimaryKey.of("",
Lists.newArrayList("id")))
+ .columns(
+ Lists.newArrayList(
+ PhysicalColumn.of(
+ "id",
+ BasicType.LONG_TYPE,
+ (Long) null,
+ false,
+ null,
+ ""),
+ PhysicalColumn.of(
+ "test",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ "")))
.build(),
Collections.emptyMap(),
Collections.emptyList(),
@@ -92,10 +104,11 @@ public class PreviewActionTest {
catalog,
Catalog.ActionType.CREATE_TABLE,
"CREATE TABLE IF NOT EXISTS `testddatabase`.`testtable` (\n"
+ + "`id` BIGINT(1) NOT NULL ,\n"
+ "`test` STRING NULL \n"
+ ") ENGINE=OLAP\n"
- + " UNIQUE KEY ()\n"
- + "DISTRIBUTED BY HASH ()\n"
+ + " UNIQUE KEY (`id`)\n"
+ + "DISTRIBUTED BY HASH (`id`)\n"
+ " PROPERTIES (\n"
+ "\"replication_allocation\" =
\"tag.location.default: 1\",\n"
+ "\"in_memory\" = \"false\",\n"
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index c6cd605063..937284cd66 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.StreamLoadFormat;
@@ -57,12 +58,22 @@ public interface StarRocksSinkOptions {
Options.key("save_mode_create_template")
.stringType()
.defaultValue(
- "CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n"
- + "${rowtype_primary_key},\n"
- + "${rowtype_fields}\n"
+ "CREATE TABLE IF NOT EXISTS `"
+ +
SaveModePlaceHolder.DATABASE.getPlaceHolder()
+ + "`.`"
+ +
SaveModePlaceHolder.TABLE_NAME.getPlaceHolder()
+ + "` (\n"
+ +
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ + ",\n"
+ +
SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+ + "\n"
+ ") ENGINE=OLAP\n"
- + " PRIMARY KEY (${rowtype_primary_key})\n"
- + "DISTRIBUTED BY HASH
(${rowtype_primary_key})"
+ + " PRIMARY KEY ("
+ +
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ + ")\n"
+ + "DISTRIBUTED BY HASH ("
+ +
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+ + ")"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\" \n"
+ ")")
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index 3b65d65a5b..0c2718d0b8 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -17,13 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
-import org.apache.seatunnel.api.sink.SaveModeConstants;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.util.CreateTableParser;
import org.apache.commons.lang3.StringUtils;
@@ -55,14 +57,26 @@ public class StarRocksSaveModeUtil {
.map(r -> "`" + r.getColumnName() + "`")
.collect(Collectors.joining(","));
}
+ SqlTemplate.canHandledByTemplateWithPlaceholder(
+ template,
+ SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
+ primaryKey,
+ TablePath.of(database, table).getFullName(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
template =
template.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.ROWTYPE_PRIMARY_KEY),
+
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
primaryKey);
+ SqlTemplate.canHandledByTemplateWithPlaceholder(
+ template,
+ SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
+ uniqueKey,
+ TablePath.of(database, table).getFullName(),
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+
template =
template.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.ROWTYPE_UNIQUE_KEY),
- uniqueKey);
+
SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
CreateTableParser.getColumnList(template);
template = mergeColumnInTemplate(columnInTemplate, tableSchema,
template);
@@ -72,12 +86,10 @@ public class StarRocksSaveModeUtil {
.filter(column ->
!columnInTemplate.containsKey(column.getName()))
.map(StarRocksSaveModeUtil::columnToStarrocksType)
.collect(Collectors.joining(",\n"));
- return template.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.DATABASE), database)
- .replaceAll(String.format("\\$\\{%s\\}",
SaveModeConstants.TABLE_NAME), table)
+ return
template.replaceAll(SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(),
database)
+
.replaceAll(SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), table)
.replaceAll(
- String.format("\\$\\{%s\\}",
SaveModeConstants.ROWTYPE_FIELDS),
- rowTypeFields);
+
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
}
private static String columnToStarrocksType(Column column) {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index 2e15ea3911..d7f759de2a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -17,16 +17,25 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;
+import org.apache.commons.lang3.StringUtils;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -119,6 +128,41 @@ public class StarRocksCreateTableTest {
+ " \"dynamic_partition.prefix\" = \"p\"
\n"
+ ");",
result);
+
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("test", "test1", "test2"),
+ TableSchema.builder()
+ .primaryKey(
+ PrimaryKey.of(StringUtils.EMPTY,
Collections.emptyList()))
+ .constraintKey(Collections.emptyList())
+ .columns(columns)
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "");
+ TablePath tablePath = TablePath.of("test1.test2");
+ String createTemplate =
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+ RuntimeException actualSeaTunnelRuntimeException =
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () ->
+ StarRocksSaveModeUtil.getCreateTableSql(
+ createTemplate,
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ catalogTable.getTableSchema()));
+ String primaryKeyHolder =
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
+ SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
+ CommonError.sqlTemplateHandledError(
+ tablePath.getFullName(),
+ SaveModePlaceHolder.getDisplay(primaryKeyHolder),
+ createTemplate,
+ primaryKeyHolder,
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+ Assertions.assertEquals(
+ exceptSeaTunnelRuntimeException.getMessage(),
+ actualSeaTunnelRuntimeException.getMessage());
}
@Test