This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b0d70ce224 [Improve] Add SaveMode log of process detail (#6375)
b0d70ce224 is described below
commit b0d70ce2240ffca4b8b8a3ee3cdc9cdb4b2ef23a
Author: Jia Fan <[email protected]>
AuthorDate: Wed Mar 20 12:08:28 2024 +0800
[Improve] Add SaveMode log of process detail (#6375)
---
.../seatunnel/api/sink/DefaultSaveModeHandler.java | 83 +++-
...odeHandler.java => SaveModeExecuteWrapper.java} | 22 +-
.../apache/seatunnel/api/sink/SaveModeHandler.java | 11 +
.../seatunnel/api/table/catalog/Catalog.java | 13 +
.../catalog/InfoPreviewResult.java} | 20 +-
.../catalog/PreviewResult.java} | 22 +-
.../catalog/SQLPreviewResult.java} | 21 +-
.../connectors/doris/catalog/DorisCatalog.java | 31 +-
.../connectors/doris/util/DorisCatalogUtil.java | 4 +
.../doris/catalog/PreviewActionTest.java | 119 ++++++
.../catalog/ElasticSearchCatalog.java | 21 +
.../elasticsearch/catalog/PreviewActionTest.java | 96 +++++
.../seatunnel/iceberg/catalog/IcebergCatalog.java | 28 +-
.../iceberg/catalog/PreviewActionTest.java | 111 ++++++
.../jdbc/catalog/AbstractJdbcCatalog.java | 21 +
.../jdbc/catalog/psql/PostgresCatalog.java | 7 +
.../seatunnel/jdbc/catalog/PreviewActionTest.java | 441 +++++++++++++++++++++
.../starrocks/catalog/StarRocksCatalog.java | 66 +--
.../starrocks/sink/StarRocksSaveModeUtil.java | 31 +-
.../starrocks/catalog/PreviewActionTest.java | 126 ++++++
.../catalog/StarRocksCreateTableTest.java | 6 +-
.../flink/execution/SinkExecuteProcessor.java | 3 +-
.../flink/execution/SinkExecuteProcessor.java | 3 +-
.../spark/execution/SinkExecuteProcessor.java | 3 +-
.../spark/execution/SinkExecuteProcessor.java | 3 +-
.../core/parse/MultipleTableJobConfigParser.java | 3 +-
26 files changed, 1238 insertions(+), 77 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index 9566658979..bbbe99281b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -23,19 +23,26 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Optional;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST;
import static
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA;
@AllArgsConstructor
+@Slf4j
public class DefaultSaveModeHandler implements SaveModeHandler {
- public SchemaSaveMode schemaSaveMode;
- public DataSaveMode dataSaveMode;
- public Catalog catalog;
- public TablePath tablePath;
- public CatalogTable catalogTable;
- public String customSql;
+ @Nonnull public SchemaSaveMode schemaSaveMode;
+ @Nonnull public DataSaveMode dataSaveMode;
+ @Nonnull public Catalog catalog;
+ @Nonnull public TablePath tablePath;
+ @Nullable public CatalogTable catalogTable;
+ @Nullable public String customSql;
public DefaultSaveModeHandler(
SchemaSaveMode schemaSaveMode,
@@ -132,17 +139,58 @@ public class DefaultSaveModeHandler implements
SaveModeHandler {
}
protected void dropTable() {
+ try {
+ log.info(
+ "Dropping table {} with action {}",
+ tablePath,
+ catalog.previewAction(
+ Catalog.ActionType.DROP_TABLE, tablePath,
Optional.empty()));
+ } catch (UnsupportedOperationException ignore) {
+ log.info("Dropping table {}", tablePath);
+ }
catalog.dropTable(tablePath, true);
}
protected void createTable() {
if (!catalog.databaseExists(tablePath.getDatabaseName())) {
- catalog.createDatabase(TablePath.of(tablePath.getDatabaseName(),
""), true);
+ TablePath databasePath = TablePath.of(tablePath.getDatabaseName(),
"");
+ try {
+ log.info(
+ "Creating database {} with action {}",
+ tablePath.getDatabaseName(),
+ catalog.previewAction(
+ Catalog.ActionType.CREATE_DATABASE,
+ databasePath,
+ Optional.empty()));
+ } catch (UnsupportedOperationException ignore) {
+ log.info("Creating database {}", tablePath.getDatabaseName());
+ }
+ catalog.createDatabase(databasePath, true);
+ }
+ try {
+ log.info(
+ "Creating table {} with action {}",
+ tablePath,
+ catalog.previewAction(
+ Catalog.ActionType.CREATE_TABLE,
+ tablePath,
+ Optional.ofNullable(catalogTable)));
+ } catch (UnsupportedOperationException ignore) {
+ log.info("Creating table {}", tablePath);
}
catalog.createTable(tablePath, catalogTable, true);
}
protected void truncateTable() {
+ try {
+ log.info(
+ "Truncating table {} with action {}",
+ tablePath,
+ catalog.previewAction(
+ Catalog.ActionType.TRUNCATE_TABLE, tablePath,
Optional.empty()));
+ } catch (UnsupportedOperationException ignore) {
+ log.info("Truncating table {}", tablePath);
+ }
catalog.truncateTable(tablePath, true);
}
@@ -151,9 +199,30 @@ public class DefaultSaveModeHandler implements
SaveModeHandler {
}
protected void executeCustomSql() {
+ log.info("Executing custom SQL for table {} with SQL: {}", tablePath,
customSql);
catalog.executeSql(tablePath, customSql);
}
+ @Override
+ public TablePath getHandleTablePath() {
+ return tablePath;
+ }
+
+ @Override
+ public Catalog getHandleCatalog() {
+ return catalog;
+ }
+
+ @Override
+ public SchemaSaveMode getSchemaSaveMode() {
+ return schemaSaveMode;
+ }
+
+ @Override
+ public DataSaveMode getDataSaveMode() {
+ return dataSaveMode;
+ }
+
@Override
public void close() throws Exception {
catalog.close();
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteWrapper.java
similarity index 58%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteWrapper.java
index 6fe3d8b3c2..5da173a9a9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteWrapper.java
@@ -17,14 +17,24 @@
package org.apache.seatunnel.api.sink;
-public interface SaveModeHandler extends AutoCloseable {
+import lombok.extern.slf4j.Slf4j;
- void handleSchemaSaveMode();
+@Slf4j
+public class SaveModeExecuteWrapper {
- void handleDataSaveMode();
+ public SaveModeExecuteWrapper(SaveModeHandler handler) {
+ this.handler = handler;
+ }
- default void handleSaveMode() {
- handleSchemaSaveMode();
- handleDataSaveMode();
+ public void execute() {
+ log.info(
+ "Executing save mode for table: {}, with SchemaSaveMode: {},
DataSaveMode: {} using Catalog: {}",
+ handler.getHandleTablePath(),
+ handler.getSchemaSaveMode(),
+ handler.getDataSaveMode(),
+ handler.getHandleCatalog().name());
+ handler.handleSaveMode();
}
+
+ private final SaveModeHandler handler;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
index 6fe3d8b3c2..e75c2215dd 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
@@ -17,12 +17,23 @@
package org.apache.seatunnel.api.sink;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
public interface SaveModeHandler extends AutoCloseable {
void handleSchemaSaveMode();
void handleDataSaveMode();
+ SchemaSaveMode getSchemaSaveMode();
+
+ DataSaveMode getDataSaveMode();
+
+ TablePath getHandleTablePath();
+
+ Catalog getHandleCatalog();
+
default void handleSaveMode() {
handleSchemaSaveMode();
handleDataSaveMode();
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 560fa98d3b..842ec98782 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -262,6 +262,19 @@ public interface Catalog extends AutoCloseable {
default void executeSql(TablePath tablePath, String sql) {}
+ default PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional<CatalogTable>
catalogTable) {
+ throw new UnsupportedOperationException("Preview action is not
supported");
+ }
+
+ enum ActionType {
+ CREATE_TABLE,
+ CREATE_DATABASE,
+ DROP_TABLE,
+ DROP_DATABASE,
+ TRUNCATE_TABLE
+ }
+
// todo: Support for update table metadata
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/InfoPreviewResult.java
similarity index 69%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/InfoPreviewResult.java
index 6fe3d8b3c2..9c6e06fce9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/InfoPreviewResult.java
@@ -15,16 +15,22 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.api.table.catalog;
-public interface SaveModeHandler extends AutoCloseable {
+public class InfoPreviewResult extends PreviewResult {
+ private final String info;
- void handleSchemaSaveMode();
+ public String getInfo() {
+ return info;
+ }
- void handleDataSaveMode();
+ public InfoPreviewResult(String info) {
+ super(Type.INFO);
+ this.info = info;
+ }
- default void handleSaveMode() {
- handleSchemaSaveMode();
- handleDataSaveMode();
+ @Override
+ public String toString() {
+ return info;
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PreviewResult.java
similarity index 68%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PreviewResult.java
index 6fe3d8b3c2..ad4b3cc102 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PreviewResult.java
@@ -15,16 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.api.table.catalog;
-public interface SaveModeHandler extends AutoCloseable {
+/** The result of a SQL preview action in {@link Catalog#previewAction}. */
+public abstract class PreviewResult {
- void handleSchemaSaveMode();
+ private final Type type;
- void handleDataSaveMode();
+ public PreviewResult(Type type) {
+ this.type = type;
+ }
+
+ public Type getType() {
+ return type;
+ }
- default void handleSaveMode() {
- handleSchemaSaveMode();
- handleDataSaveMode();
+ public enum Type {
+ SQL,
+ INFO,
+ OTHER
}
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SQLPreviewResult.java
similarity index 69%
copy from
seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
copy to
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SQLPreviewResult.java
index 6fe3d8b3c2..6cdcc33fc2 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SQLPreviewResult.java
@@ -15,16 +15,23 @@
* limitations under the License.
*/
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.api.table.catalog;
-public interface SaveModeHandler extends AutoCloseable {
+public class SQLPreviewResult extends PreviewResult {
- void handleSchemaSaveMode();
+ private final String sql;
- void handleDataSaveMode();
+ public String getSql() {
+ return sql;
+ }
+
+ public SQLPreviewResult(String sql) {
+ super(Type.SQL);
+ this.sql = sql;
+ }
- default void handleSaveMode() {
- handleSchemaSaveMode();
- handleDataSaveMode();
+ @Override
+ public String toString() {
+ return sql;
}
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index 0e5faef550..1816585731 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -20,7 +20,9 @@ package org.apache.seatunnel.connectors.doris.catalog;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -49,6 +51,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
public class DorisCatalog implements Catalog {
@@ -339,8 +344,7 @@ public class DorisCatalog implements Catalog {
throws TableNotExistException, CatalogException {
try {
if (ignoreIfNotExists) {
- conn.createStatement()
- .execute(String.format("TRUNCATE TABLE %s",
tablePath.getFullName()));
+
conn.createStatement().execute(DorisCatalogUtil.getTruncateTableQuery(tablePath));
}
} catch (Exception e) {
throw new CatalogException(
@@ -359,4 +363,27 @@ public class DorisCatalog implements Catalog {
throw new CatalogException(String.format("Failed executeSql error
%s", sql), e);
}
}
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional<CatalogTable>
catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ checkArgument(catalogTable.isPresent(), "CatalogTable cannot be
null");
+ return new SQLPreviewResult(
+ DorisCatalogUtil.getCreateTableStatement(
+ dorisConfig.getCreateTableTemplate(), tablePath,
catalogTable.get()));
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new
SQLPreviewResult(DorisCatalogUtil.getDropTableQuery(tablePath, true));
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new
SQLPreviewResult(DorisCatalogUtil.getTruncateTableQuery(tablePath));
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new SQLPreviewResult(
+
DorisCatalogUtil.getCreateDatabaseQuery(tablePath.getDatabaseName(), true));
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new SQLPreviewResult(
+
DorisCatalogUtil.getDropDatabaseQuery(tablePath.getDatabaseName(), true));
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type:
" + actionType);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index 133bcffd33..f03d488de9 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -95,6 +95,10 @@ public class DorisCatalogUtil {
return "DROP TABLE " + (ignoreIfNotExists ? "IF EXISTS " : "") +
tablePath.getFullName();
}
+ public static String getTruncateTableQuery(TablePath tablePath) {
+ return "TRUNCATE TABLE " + tablePath.getFullName();
+ }
+
/**
* @param createTableTemplate create table template
* @param catalogTable catalog table
diff --git
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..ffca59c5e1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class PreviewActionTest {
+
+ private static final CatalogTable CATALOG_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "database", "table"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "test",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ ""))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "comment");
+
+ @Test
+ public void testDorisPreviewAction() {
+ DorisCatalogFactory factory = new DorisCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("fenodes", "localhost:9300");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE IF NOT EXISTS testddatabase",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE IF EXISTS testddatabase",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE testddatabase.testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE IF EXISTS testddatabase.testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE IF NOT EXISTS `testddatabase`.`testtable` (\n"
+ + "`test` STRING NULL \n"
+ + ") ENGINE=OLAP\n"
+ + " UNIQUE KEY ()\n"
+ + "DISTRIBUTED BY HASH ()\n"
+ + " PROPERTIES (\n"
+ + "\"replication_allocation\" =
\"tag.location.default: 1\",\n"
+ + "\"in_memory\" = \"false\",\n"
+ + "\"storage_format\" = \"V2\",\n"
+ + "\"disable_auto_compaction\" = \"false\"\n"
+ + ")",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ private void assertPreviewResult(
+ Catalog catalog,
+ Catalog.ActionType actionType,
+ String expectedSql,
+ Optional<CatalogTable> catalogTable) {
+ PreviewResult previewResult =
+ catalog.previewAction(
+ actionType, TablePath.of("testddatabase.testtable"),
catalogTable);
+ Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult);
+ Assertions.assertEquals(expectedSql, ((SQLPreviewResult)
previewResult).getSql());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 23265fa182..066a69c2dc 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -21,7 +21,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigUtil;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -44,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -232,4 +235,22 @@ public class ElasticSearchCatalog implements Catalog {
options.put("config", ConfigUtil.convertToJsonString(tablePath));
return options;
}
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional<CatalogTable>
catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ return new InfoPreviewResult("create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new InfoPreviewResult("delete index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new InfoPreviewResult("delete and create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new InfoPreviewResult("create index " +
tablePath.getTableName());
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new InfoPreviewResult("delete index " +
tablePath.getTableName());
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type:
" + actionType);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/PreviewActionTest.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..a81ce8f19a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/PreviewActionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class PreviewActionTest {
+
+ private static final CatalogTable CATALOG_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "database", "table"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "test",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ ""))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "comment");
+
+ @Test
+ public void testElasticSearchPreviewAction() {
+ ElasticSearchCatalogFactory factory = new
ElasticSearchCatalogFactory();
+ Catalog catalog = factory.createCatalog("test",
ReadonlyConfig.fromMap(new HashMap<>()));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "create index testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "delete index testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "delete and create index testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog, Catalog.ActionType.DROP_TABLE, "delete index
testtable", Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "create index testtable",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ private void assertPreviewResult(
+ Catalog catalog,
+ Catalog.ActionType actionType,
+ String expectedSql,
+ Optional<CatalogTable> catalogTable) {
+ PreviewResult previewResult =
+ catalog.previewAction(
+ actionType, TablePath.of("testddatabase.testtable"),
catalogTable);
+ Assertions.assertInstanceOf(InfoPreviewResult.class, previewResult);
+ Assertions.assertEquals(expectedSql, ((InfoPreviewResult)
previewResult).getInfo());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index 32beff121b..520f9bdbac 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -20,7 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.iceberg.catalog;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -29,13 +31,11 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
@@ -49,8 +49,10 @@ import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils.toIcebergTableIdentifier;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils.toTablePath;
@@ -254,10 +256,22 @@ public class IcebergCatalog implements Catalog {
catalogName);
}
- public Schema toIcebergSchema(TableSchema tableSchema) {
- // Generate struct type
- SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
- Types.StructType structType =
SchemaUtils.toIcebergType(rowType).asStructType();
- return new Schema(structType.fields());
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional<CatalogTable>
catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ checkArgument(catalogTable.isPresent(), "CatalogTable cannot be
null");
+ return new InfoPreviewResult("create table " +
toIcebergTableIdentifier(tablePath));
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new InfoPreviewResult("drop table " +
toIcebergTableIdentifier(tablePath));
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new InfoPreviewResult("truncate table " +
toIcebergTableIdentifier(tablePath));
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new InfoPreviewResult("do nothing");
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new InfoPreviewResult("do nothing");
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type:
" + actionType);
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/PreviewActionTest.java
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..ca88c39a0c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/PreviewActionTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class PreviewActionTest {
+
+ private static final CatalogTable CATALOG_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "database", "table"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "test",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ ""))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "comment");
+
+ @Test
+ public void testElasticSearchPreviewAction() {
+ IcebergCatalogFactory factory = new IcebergCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("catalog_name", "seatunnel_test");
+ put(
+ "iceberg.catalog.config",
+ new HashMap<String, Object>() {
+ {
+ put("type", "hadoop");
+ put(
+ "warehouse",
+
"file:///tmp/seatunnel/iceberg/hadoop-sink/");
+ }
+ });
+ }
+ }));
+ assertPreviewResult(
+ catalog, Catalog.ActionType.CREATE_DATABASE, "do nothing",
Optional.empty());
+ assertPreviewResult(
+ catalog, Catalog.ActionType.DROP_DATABASE, "do nothing",
Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "truncate table testddatabase.testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "drop table testddatabase.testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "create table testddatabase.testtable",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ private void assertPreviewResult(
+ Catalog catalog,
+ Catalog.ActionType actionType,
+ String expectedSql,
+ Optional<CatalogTable> catalogTable) {
+ PreviewResult previewResult =
+ catalog.previewAction(
+ actionType, TablePath.of("testddatabase.testtable"),
catalogTable);
+ Assertions.assertInstanceOf(InfoPreviewResult.class, previewResult);
+ Assertions.assertEquals(expectedSql, ((InfoPreviewResult)
previewResult).getInfo());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index f695cc30cb..3775f9f47d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -22,7 +22,9 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -577,4 +579,23 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
throw new CatalogException(String.format("Failed executeSql error
%s", sql), e);
}
}
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional<CatalogTable>
catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ checkArgument(catalogTable.isPresent(), "CatalogTable cannot be
null");
+ return new SQLPreviewResult(getCreateTableSql(tablePath,
catalogTable.get()));
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new SQLPreviewResult(getDropTableSql(tablePath));
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new SQLPreviewResult(getTruncateTableSql(tablePath));
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new
SQLPreviewResult(getCreateDatabaseSql(tablePath.getDatabaseName()));
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new
SQLPreviewResult(getDropDatabaseSql(tablePath.getDatabaseName()));
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type:
" + actionType);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index 30e140c68a..4697d1999e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -186,6 +186,13 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
}
}
+ @Override
+ protected String getCreateTableSql(TablePath tablePath, CatalogTable
table) {
+ PostgresCreateTableSqlBuilder postgresCreateTableSqlBuilder =
+ new PostgresCreateTableSqlBuilder(table);
+ return postgresCreateTableSqlBuilder.build(tablePath);
+ }
+
@Override
protected String getDropTableSql(TablePath tablePath) {
return "DROP TABLE \""
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..a0cdf7d8a8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm.DamengCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class PreviewActionTest {
+
+ private static final CatalogTable CATALOG_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "database", "table"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "test",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ ""))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "comment");
+
+ @Test
+ public void testMySQLPreviewAction() {
+ MySqlCatalogFactory factory = new MySqlCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("base-url",
"jdbc:mysql://localhost:3306/test");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE `testddatabase`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE `testddatabase`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE `testddatabase`.`testtable`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE `testddatabase`.`testtable`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE `testtable` (\n"
+ + "\t`test` LONGTEXT NULL COMMENT ''\n"
+ + ") COMMENT = 'comment';",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ @Test
+ public void testDMPreviewAction() {
+ DamengCatalogFactory factory = new DamengCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("base-url",
"jdbc:mysql://localhost:3306/test");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE `testddatabase`;",
+ Optional.empty()));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE `testddatabase`;",
+ Optional.empty()));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE `testddatabase`.`testtable`;",
+ Optional.empty()));
+ assertPreviewResult(
+ catalog, Catalog.ActionType.DROP_TABLE, "DROP TABLE
TESTTABLE", Optional.empty());
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE `testtable` (\n"
+ + "\t`test` LONGTEXT NULL COMMENT ''\n"
+ + ") COMMENT = 'comment';",
+ Optional.of(CATALOG_TABLE)));
+ }
+
+ @Test
+ public void testOceanBasePreviewAction() {
+ OceanBaseCatalogFactory factory = new OceanBaseCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("base-url",
"jdbc:mysql://localhost:3306/test");
+ put("compatibleMode", "oracle");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE `testddatabase`;",
+ Optional.empty()));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE `testddatabase`;",
+ Optional.empty()));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE \"null\".\"testtable\"",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE \"testtable\"",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE \"testtable\" (\n" + "\"test\" VARCHAR2(4000)\n"
+ ")",
+ Optional.of(CATALOG_TABLE));
+
+ Catalog catalog2 =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("base-url",
"jdbc:mysql://localhost:3306/test");
+ put("compatibleMode", "mysql");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ assertPreviewResult(
+ catalog2,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE `testddatabase`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog2,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE `testddatabase`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog2,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE `testddatabase`.`testtable`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog2,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE `testddatabase`.`testtable`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog2,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE `testtable` (\n"
+ + "\t`test` LONGTEXT NULL COMMENT ''\n"
+ + ") COMMENT = 'comment';",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ @Test
+ public void testOraclePreviewAction() {
+ OracleCatalogFactory factory = new OracleCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("base-url",
"jdbc:mysql://localhost:3306/test");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE `testddatabase`;",
+ Optional.empty()));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE `testddatabase`;",
+ Optional.empty()));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE \"null\".\"testtable\"",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE \"testtable\"",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE \"testtable\" (\n" + "\"test\" VARCHAR2(4000)\n"
+ ")",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ @Test
+ public void testPostgresPreviewAction() {
+ PostgresCatalogFactory factory = new PostgresCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("base-url",
"jdbc:mysql://localhost:3306/test");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE \"testddatabase\"",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE \"testddatabase\"",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE \"null\".\"testtable\"",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE \"null\".\"testtable\"",
+ Optional.empty());
+
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE \"testtable\" (\n" + "\"test\" text\n" + ");",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ @Test
+ public void testSqlServerPreviewAction() {
+ SqlServerCatalogFactory factory = new SqlServerCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ "base-url",
+
"jdbc:sqlserver://localhost:1433;databaseName=column_type_test");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE testddatabase",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE testddatabase;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE [testddatabase].[testtable]",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE testddatabase.testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "IF OBJECT_ID('[testddatabase].[testtable]', 'U') IS NULL \n"
+ + "BEGIN \n"
+ + "CREATE TABLE [testddatabase].[testtable] ( \n"
+ + "\t[test] TEXT NULL\n"
+ + ");\n"
+ + "EXEC testddatabase.sys.sp_addextendedproperty
'MS_Description', N'comment', 'schema', N'null', 'table', N'testtable';\n"
+ + "EXEC testddatabase.sys.sp_addextendedproperty
'MS_Description', N'', 'schema', N'null', 'table', N'testtable', 'column',
N'test';\n"
+ + "\n"
+ + "END",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ @Test
+ public void testTiDBPreviewAction() {
+ TiDBCatalogFactory factory = new TiDBCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("base-url",
"jdbc:mysql://localhost:3306/test");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE `testddatabase`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE `testddatabase`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE `testddatabase`.`testtable`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE `testddatabase`.`testtable`;",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE `testtable` (\n"
+ + "\t`test` LONGTEXT NULL COMMENT ''\n"
+ + ") COMMENT = 'comment';",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ private void assertPreviewResult(
+ Catalog catalog,
+ Catalog.ActionType actionType,
+ String expectedSql,
+ Optional<CatalogTable> catalogTable) {
+ PreviewResult previewResult =
+ catalog.previewAction(
+ actionType, TablePath.of("testddatabase.testtable"),
catalogTable);
+ Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult);
+ Assertions.assertEquals(expectedSql, ((SQLPreviewResult)
previewResult).getSql());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 3dc7eebfa6..8a14b08efe 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -20,7 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -44,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.mysql.cj.MysqlType;
import lombok.extern.slf4j.Slf4j;
@@ -214,7 +217,7 @@ public class StarRocksCatalog implements Catalog {
public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
this.createTable(
- StarRocksSaveModeUtil.fillingCreateSql(
+ StarRocksSaveModeUtil.getCreateTableSql(
template,
tablePath.getDatabaseName(),
tablePath.getTableName(),
@@ -225,12 +228,8 @@ public class StarRocksCatalog implements Catalog {
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
- if (ignoreIfNotExists) {
- conn.createStatement().execute("DROP TABLE IF EXISTS " +
tablePath.getFullName());
- } else {
- conn.createStatement()
- .execute(String.format("DROP TABLE %s",
tablePath.getFullName()));
- }
+ conn.createStatement()
+ .execute(StarRocksSaveModeUtil.getDropTableSql(tablePath,
ignoreIfNotExists));
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s",
catalogName), e);
@@ -242,7 +241,7 @@ public class StarRocksCatalog implements Catalog {
try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
if (ignoreIfNotExists) {
conn.createStatement()
- .execute(String.format("TRUNCATE TABLE %s",
tablePath.getFullName()));
+
.execute(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
}
} catch (Exception e) {
throw new CatalogException(
@@ -277,16 +276,10 @@ public class StarRocksCatalog implements Catalog {
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
- if (ignoreIfExists) {
- conn.createStatement()
- .execute(
- "CREATE DATABASE IF NOT EXISTS `"
- + tablePath.getDatabaseName()
- + "`");
- } else {
- conn.createStatement()
- .execute("CREATE DATABASE `" +
tablePath.getDatabaseName() + "`");
- }
+ conn.createStatement()
+ .execute(
+ StarRocksSaveModeUtil.getCreateDatabaseSql(
+ tablePath.getDatabaseName(),
ignoreIfExists));
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s",
catalogName), e);
@@ -297,13 +290,10 @@ public class StarRocksCatalog implements Catalog {
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
- if (ignoreIfNotExists) {
- conn.createStatement()
- .execute("DROP DATABASE IF EXISTS `" +
tablePath.getDatabaseName() + "`");
- } else {
- conn.createStatement()
- .execute(String.format("DROP DATABASE `%s`",
tablePath.getDatabaseName()));
- }
+ conn.createStatement()
+ .execute(
+ StarRocksSaveModeUtil.getDropDatabaseSql(
+ tablePath.getDatabaseName(),
ignoreIfNotExists));
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s",
catalogName), e);
@@ -501,4 +491,30 @@ public class StarRocksCatalog implements Catalog {
return false;
}
}
+
+ @Override
+ public PreviewResult previewAction(
+ ActionType actionType, TablePath tablePath, Optional<CatalogTable>
catalogTable) {
+ if (actionType == ActionType.CREATE_TABLE) {
+ Preconditions.checkArgument(catalogTable.isPresent(),
"CatalogTable cannot be null");
+ return new SQLPreviewResult(
+ StarRocksSaveModeUtil.getCreateTableSql(
+ template,
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ catalogTable.get().getTableSchema()));
+ } else if (actionType == ActionType.DROP_TABLE) {
+ return new
SQLPreviewResult(StarRocksSaveModeUtil.getDropTableSql(tablePath, true));
+ } else if (actionType == ActionType.TRUNCATE_TABLE) {
+ return new
SQLPreviewResult(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
+ } else if (actionType == ActionType.CREATE_DATABASE) {
+ return new SQLPreviewResult(
+
StarRocksSaveModeUtil.getCreateDatabaseSql(tablePath.getDatabaseName(), true));
+ } else if (actionType == ActionType.DROP_DATABASE) {
+ return new SQLPreviewResult(
+ "DROP DATABASE IF EXISTS `" + tablePath.getDatabaseName()
+ "`");
+ } else {
+ throw new UnsupportedOperationException("Unsupported action type:
" + actionType);
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index 0ca7774a7c..3b65d65a5b 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
import org.apache.seatunnel.api.sink.SaveModeConstants;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -37,7 +38,7 @@ import static
com.google.common.base.Preconditions.checkNotNull;
public class StarRocksSaveModeUtil {
- public static String fillingCreateSql(
+ public static String getCreateTableSql(
String template, String database, String table, TableSchema
tableSchema) {
String primaryKey = "";
if (tableSchema.getPrimaryKey() != null) {
@@ -178,4 +179,32 @@ public class StarRocksSaveModeUtil {
}
throw new IllegalArgumentException("Unsupported SeaTunnel's data type:
" + dataType);
}
+
+ public static String getCreateDatabaseSql(String database, boolean
ignoreIfExists) {
+ if (ignoreIfExists) {
+ return "CREATE DATABASE IF NOT EXISTS `" + database + "`";
+ } else {
+ return "CREATE DATABASE `" + database + "`";
+ }
+ }
+
+ public static String getDropDatabaseSql(String database, boolean
ignoreIfNotExists) {
+ if (ignoreIfNotExists) {
+ return "DROP DATABASE IF EXISTS `" + database + "`";
+ } else {
+ return "DROP DATABASE `" + database + "`";
+ }
+ }
+
+ public static String getDropTableSql(TablePath tablePath, boolean
ignoreIfNotExists) {
+ if (ignoreIfNotExists) {
+ return "DROP TABLE IF EXISTS " + tablePath.getFullName();
+ } else {
+ return "DROP TABLE " + tablePath.getFullName();
+ }
+ }
+
+ public static String getTruncateTableSql(TablePath tablePath) {
+ return "TRUNCATE TABLE " + tablePath.getFullName();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..37c06345fd
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class PreviewActionTest {
+
+ private static final CatalogTable CATALOG_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("catalog", "database", "table"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "test",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ ""))
+ .column(
+ PhysicalColumn.of(
+ "test2",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ true,
+ null,
+ ""))
+ .primaryKey(PrimaryKey.of("test",
Collections.singletonList("test")))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "comment");
+
+ @Test
+ public void testDorisPreviewAction() {
+ StarRocksCatalogFactory factory = new StarRocksCatalogFactory();
+ Catalog catalog =
+ factory.createCatalog(
+ "test",
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put("base-url",
"jdbc:mysql://localhost:9030");
+ put("username", "root");
+ put("password", "root");
+ }
+ }));
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_DATABASE,
+ "CREATE DATABASE IF NOT EXISTS `testddatabase`",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_DATABASE,
+ "DROP DATABASE IF EXISTS `testddatabase`",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE testddatabase.testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE IF EXISTS testddatabase.testtable",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE IF NOT EXISTS `testddatabase`.`testtable` (\n"
+ + "`test` STRING NULL ,\n"
+ + "`test2` STRING NULL \n"
+ + ") ENGINE=OLAP\n"
+ + " PRIMARY KEY (`test`)\n"
+ + "DISTRIBUTED BY HASH (`test`)PROPERTIES (\n"
+ + " \"replication_num\" = \"1\" \n"
+ + ")",
+ Optional.of(CATALOG_TABLE));
+ }
+
+ private void assertPreviewResult(
+ Catalog catalog,
+ Catalog.ActionType actionType,
+ String expectedSql,
+ Optional<CatalogTable> catalogTable) {
+ PreviewResult previewResult =
+ catalog.previewAction(
+ actionType, TablePath.of("testddatabase.testtable"),
catalogTable);
+ Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult);
+ Assertions.assertEquals(expectedSql, ((SQLPreviewResult)
previewResult).getSql());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index e3471b6bf2..0a3f36196a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -54,7 +54,7 @@ public class StarRocksCreateTableTest {
PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long)
null, true, null, ""));
String result =
- StarRocksSaveModeUtil.fillingCreateSql(
+ StarRocksSaveModeUtil.getCreateTableSql(
"CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (
\n"
+ "${rowtype_primary_key} , \n"
+ "${rowtype_unique_key} , \n"
@@ -187,7 +187,7 @@ public class StarRocksCreateTableTest {
"L_COMMENT", BasicType.STRING_TYPE, (Long) null,
false, null, ""));
String result =
- StarRocksSaveModeUtil.fillingCreateSql(
+ StarRocksSaveModeUtil.getCreateTableSql(
"CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n"
+ "`L_COMMITDATE`,\n"
+ "${rowtype_primary_key},\n"
@@ -244,7 +244,7 @@ public class StarRocksCreateTableTest {
columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE,
70000, true, null, ""));
String result =
- StarRocksSaveModeUtil.fillingCreateSql(
+ StarRocksSaveModeUtil.getCreateTableSql(
"CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (
\n"
+ "${rowtype_primary_key} , \n"
+ "`create_time` DATETIME NOT NULL , \n"
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 05fde53336..f775bfb46f 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -123,7 +124,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
- handler.handleSaveMode();
+ new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index f4af78f81c..6257a94dde 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -124,7 +125,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
- handler.handleSaveMode();
+ new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 48baa7f746..886f6d6a15 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -142,7 +143,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
- handler.handleSaveMode();
+ new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index fc5eade3f4..654cfaa181 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -143,7 +144,7 @@ public class SinkExecuteProcessor
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
- handler.handleSaveMode();
+ new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 50e35d9117..f988f293a5 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
@@ -677,7 +678,7 @@ public class MultipleTableJobConfigParser {
Optional<SaveModeHandler> saveModeHandler =
saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
- handler.handleSaveMode();
+ new SaveModeExecuteWrapper(handler).execute();
} catch (Exception e) {
throw new
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}