This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b0d70ce224 [Improve] Add SaveMode log of process detail (#6375)
b0d70ce224 is described below

commit b0d70ce2240ffca4b8b8a3ee3cdc9cdb4b2ef23a
Author: Jia Fan <[email protected]>
AuthorDate: Wed Mar 20 12:08:28 2024 +0800

    [Improve] Add SaveMode log of process detail (#6375)
---
 .../seatunnel/api/sink/DefaultSaveModeHandler.java |  83 +++-
 ...odeHandler.java => SaveModeExecuteWrapper.java} |  22 +-
 .../apache/seatunnel/api/sink/SaveModeHandler.java |  11 +
 .../seatunnel/api/table/catalog/Catalog.java       |  13 +
 .../catalog/InfoPreviewResult.java}                |  20 +-
 .../catalog/PreviewResult.java}                    |  22 +-
 .../catalog/SQLPreviewResult.java}                 |  21 +-
 .../connectors/doris/catalog/DorisCatalog.java     |  31 +-
 .../connectors/doris/util/DorisCatalogUtil.java    |   4 +
 .../doris/catalog/PreviewActionTest.java           | 119 ++++++
 .../catalog/ElasticSearchCatalog.java              |  21 +
 .../elasticsearch/catalog/PreviewActionTest.java   |  96 +++++
 .../seatunnel/iceberg/catalog/IcebergCatalog.java  |  28 +-
 .../iceberg/catalog/PreviewActionTest.java         | 111 ++++++
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  21 +
 .../jdbc/catalog/psql/PostgresCatalog.java         |   7 +
 .../seatunnel/jdbc/catalog/PreviewActionTest.java  | 441 +++++++++++++++++++++
 .../starrocks/catalog/StarRocksCatalog.java        |  66 +--
 .../starrocks/sink/StarRocksSaveModeUtil.java      |  31 +-
 .../starrocks/catalog/PreviewActionTest.java       | 126 ++++++
 .../catalog/StarRocksCreateTableTest.java          |   6 +-
 .../flink/execution/SinkExecuteProcessor.java      |   3 +-
 .../flink/execution/SinkExecuteProcessor.java      |   3 +-
 .../spark/execution/SinkExecuteProcessor.java      |   3 +-
 .../spark/execution/SinkExecuteProcessor.java      |   3 +-
 .../core/parse/MultipleTableJobConfigParser.java   |   3 +-
 26 files changed, 1238 insertions(+), 77 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index 9566658979..bbbe99281b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -23,19 +23,26 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
 import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SINK_TABLE_NOT_EXIST;
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.SOURCE_ALREADY_HAS_DATA;
 
 @AllArgsConstructor
+@Slf4j
 public class DefaultSaveModeHandler implements SaveModeHandler {
 
-    public SchemaSaveMode schemaSaveMode;
-    public DataSaveMode dataSaveMode;
-    public Catalog catalog;
-    public TablePath tablePath;
-    public CatalogTable catalogTable;
-    public String customSql;
+    @Nonnull public SchemaSaveMode schemaSaveMode;
+    @Nonnull public DataSaveMode dataSaveMode;
+    @Nonnull public Catalog catalog;
+    @Nonnull public TablePath tablePath;
+    @Nullable public CatalogTable catalogTable;
+    @Nullable public String customSql;
 
     public DefaultSaveModeHandler(
             SchemaSaveMode schemaSaveMode,
@@ -132,17 +139,58 @@ public class DefaultSaveModeHandler implements SaveModeHandler {
     }
 
     protected void dropTable() {
+        try {
+            log.info(
+                    "Dropping table {} with action {}",
+                    tablePath,
+                    catalog.previewAction(
+                            Catalog.ActionType.DROP_TABLE, tablePath, 
Optional.empty()));
+        } catch (UnsupportedOperationException ignore) {
+            log.info("Dropping table {}", tablePath);
+        }
         catalog.dropTable(tablePath, true);
     }
 
     protected void createTable() {
         if (!catalog.databaseExists(tablePath.getDatabaseName())) {
-            catalog.createDatabase(TablePath.of(tablePath.getDatabaseName(), 
""), true);
+            TablePath databasePath = TablePath.of(tablePath.getDatabaseName(), 
"");
+            try {
+                log.info(
+                        "Creating database {} with action {}",
+                        tablePath.getDatabaseName(),
+                        catalog.previewAction(
+                                Catalog.ActionType.CREATE_DATABASE,
+                                databasePath,
+                                Optional.empty()));
+            } catch (UnsupportedOperationException ignore) {
+                log.info("Creating database {}", tablePath.getDatabaseName());
+            }
+            catalog.createDatabase(databasePath, true);
+        }
+        try {
+            log.info(
+                    "Creating table {} with action {}",
+                    tablePath,
+                    catalog.previewAction(
+                            Catalog.ActionType.CREATE_TABLE,
+                            tablePath,
+                            Optional.ofNullable(catalogTable)));
+        } catch (UnsupportedOperationException ignore) {
+            log.info("Creating table {}", tablePath);
         }
         catalog.createTable(tablePath, catalogTable, true);
     }
 
     protected void truncateTable() {
+        try {
+            log.info(
+                    "Truncating table {} with action {}",
+                    tablePath,
+                    catalog.previewAction(
+                            Catalog.ActionType.TRUNCATE_TABLE, tablePath, 
Optional.empty()));
+        } catch (UnsupportedOperationException ignore) {
+            log.info("Truncating table {}", tablePath);
+        }
         catalog.truncateTable(tablePath, true);
     }
 
@@ -151,9 +199,30 @@ public class DefaultSaveModeHandler implements SaveModeHandler {
     }
 
     protected void executeCustomSql() {
+        log.info("Executing custom SQL for table {} with SQL: {}", tablePath, 
customSql);
         catalog.executeSql(tablePath, customSql);
     }
 
+    @Override
+    public TablePath getHandleTablePath() {
+        return tablePath;
+    }
+
+    @Override
+    public Catalog getHandleCatalog() {
+        return catalog;
+    }
+
+    @Override
+    public SchemaSaveMode getSchemaSaveMode() {
+        return schemaSaveMode;
+    }
+
+    @Override
+    public DataSaveMode getDataSaveMode() {
+        return dataSaveMode;
+    }
+
     @Override
     public void close() throws Exception {
         catalog.close();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteWrapper.java
similarity index 58%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteWrapper.java
index 6fe3d8b3c2..5da173a9a9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeExecuteWrapper.java
@@ -17,14 +17,24 @@
 
 package org.apache.seatunnel.api.sink;
 
-public interface SaveModeHandler extends AutoCloseable {
+import lombok.extern.slf4j.Slf4j;
 
-    void handleSchemaSaveMode();
+@Slf4j
+public class SaveModeExecuteWrapper {
 
-    void handleDataSaveMode();
+    public SaveModeExecuteWrapper(SaveModeHandler handler) {
+        this.handler = handler;
+    }
 
-    default void handleSaveMode() {
-        handleSchemaSaveMode();
-        handleDataSaveMode();
+    public void execute() {
+        log.info(
+                "Executing save mode for table: {}, with SchemaSaveMode: {}, 
DataSaveMode: {} using Catalog: {}",
+                handler.getHandleTablePath(),
+                handler.getSchemaSaveMode(),
+                handler.getDataSaveMode(),
+                handler.getHandleCatalog().name());
+        handler.handleSaveMode();
     }
+
+    private final SaveModeHandler handler;
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
index 6fe3d8b3c2..e75c2215dd 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
@@ -17,12 +17,23 @@
 
 package org.apache.seatunnel.api.sink;
 
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
 public interface SaveModeHandler extends AutoCloseable {
 
     void handleSchemaSaveMode();
 
     void handleDataSaveMode();
 
+    SchemaSaveMode getSchemaSaveMode();
+
+    DataSaveMode getDataSaveMode();
+
+    TablePath getHandleTablePath();
+
+    Catalog getHandleCatalog();
+
     default void handleSaveMode() {
         handleSchemaSaveMode();
         handleDataSaveMode();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 560fa98d3b..842ec98782 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -262,6 +262,19 @@ public interface Catalog extends AutoCloseable {
 
     default void executeSql(TablePath tablePath, String sql) {}
 
+    default PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> 
catalogTable) {
+        throw new UnsupportedOperationException("Preview action is not 
supported");
+    }
+
+    enum ActionType {
+        CREATE_TABLE,
+        CREATE_DATABASE,
+        DROP_TABLE,
+        DROP_DATABASE,
+        TRUNCATE_TABLE
+    }
+
     // todo: Support for update table metadata
 
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/InfoPreviewResult.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/InfoPreviewResult.java
index 6fe3d8b3c2..9c6e06fce9 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/InfoPreviewResult.java
@@ -15,16 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.api.table.catalog;
 
-public interface SaveModeHandler extends AutoCloseable {
+public class InfoPreviewResult extends PreviewResult {
+    private final String info;
 
-    void handleSchemaSaveMode();
+    public String getInfo() {
+        return info;
+    }
 
-    void handleDataSaveMode();
+    public InfoPreviewResult(String info) {
+        super(Type.INFO);
+        this.info = info;
+    }
 
-    default void handleSaveMode() {
-        handleSchemaSaveMode();
-        handleDataSaveMode();
+    @Override
+    public String toString() {
+        return info;
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PreviewResult.java
similarity index 68%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PreviewResult.java
index 6fe3d8b3c2..ad4b3cc102 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PreviewResult.java
@@ -15,16 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.api.table.catalog;
 
-public interface SaveModeHandler extends AutoCloseable {
+/** The result of a SQL preview action in {@link Catalog#previewAction}. */
+public abstract class PreviewResult {
 
-    void handleSchemaSaveMode();
+    private final Type type;
 
-    void handleDataSaveMode();
+    public PreviewResult(Type type) {
+        this.type = type;
+    }
+
+    public Type getType() {
+        return type;
+    }
 
-    default void handleSaveMode() {
-        handleSchemaSaveMode();
-        handleDataSaveMode();
+    public enum Type {
+        SQL,
+        INFO,
+        OTHER
     }
 }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SQLPreviewResult.java
similarity index 69%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SQLPreviewResult.java
index 6fe3d8b3c2..6cdcc33fc2 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeHandler.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SQLPreviewResult.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.api.sink;
+package org.apache.seatunnel.api.table.catalog;
 
-public interface SaveModeHandler extends AutoCloseable {
+public class SQLPreviewResult extends PreviewResult {
 
-    void handleSchemaSaveMode();
+    private final String sql;
 
-    void handleDataSaveMode();
+    public String getSql() {
+        return sql;
+    }
+
+    public SQLPreviewResult(String sql) {
+        super(Type.SQL);
+        this.sql = sql;
+    }
 
-    default void handleSaveMode() {
-        handleSchemaSaveMode();
-        handleDataSaveMode();
+    @Override
+    public String toString() {
+        return sql;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index 0e5faef550..1816585731 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -20,7 +20,9 @@ package org.apache.seatunnel.connectors.doris.catalog;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -49,6 +51,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
 
 public class DorisCatalog implements Catalog {
 
@@ -339,8 +344,7 @@ public class DorisCatalog implements Catalog {
             throws TableNotExistException, CatalogException {
         try {
             if (ignoreIfNotExists) {
-                conn.createStatement()
-                        .execute(String.format("TRUNCATE TABLE  %s", 
tablePath.getFullName()));
+                
conn.createStatement().execute(DorisCatalogUtil.getTruncateTableQuery(tablePath));
             }
         } catch (Exception e) {
             throw new CatalogException(
@@ -359,4 +363,27 @@ public class DorisCatalog implements Catalog {
             throw new CatalogException(String.format("Failed executeSql error 
%s", sql), e);
         }
     }
+
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> 
catalogTable) {
+        if (actionType == ActionType.CREATE_TABLE) {
+            checkArgument(catalogTable.isPresent(), "CatalogTable cannot be 
null");
+            return new SQLPreviewResult(
+                    DorisCatalogUtil.getCreateTableStatement(
+                            dorisConfig.getCreateTableTemplate(), tablePath, 
catalogTable.get()));
+        } else if (actionType == ActionType.DROP_TABLE) {
+            return new 
SQLPreviewResult(DorisCatalogUtil.getDropTableQuery(tablePath, true));
+        } else if (actionType == ActionType.TRUNCATE_TABLE) {
+            return new 
SQLPreviewResult(DorisCatalogUtil.getTruncateTableQuery(tablePath));
+        } else if (actionType == ActionType.CREATE_DATABASE) {
+            return new SQLPreviewResult(
+                    
DorisCatalogUtil.getCreateDatabaseQuery(tablePath.getDatabaseName(), true));
+        } else if (actionType == ActionType.DROP_DATABASE) {
+            return new SQLPreviewResult(
+                    
DorisCatalogUtil.getDropDatabaseQuery(tablePath.getDatabaseName(), true));
+        } else {
+            throw new UnsupportedOperationException("Unsupported action type: 
" + actionType);
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index 133bcffd33..f03d488de9 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -95,6 +95,10 @@ public class DorisCatalogUtil {
         return "DROP TABLE " + (ignoreIfNotExists ? "IF EXISTS " : "") + 
tablePath.getFullName();
     }
 
+    public static String getTruncateTableQuery(TablePath tablePath) {
+        return "TRUNCATE TABLE " + tablePath.getFullName();
+    }
+
     /**
      * @param createTableTemplate create table template
      * @param catalogTable catalog table
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..ffca59c5e1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class PreviewActionTest {
+
+    private static final CatalogTable CATALOG_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "test",
+                                            BasicType.STRING_TYPE,
+                                            (Long) null,
+                                            true,
+                                            null,
+                                            ""))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    @Test
+    public void testDorisPreviewAction() {
+        DorisCatalogFactory factory = new DorisCatalogFactory();
+        Catalog catalog =
+                factory.createCatalog(
+                        "test",
+                        ReadonlyConfig.fromMap(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("fenodes", "localhost:9300");
+                                        put("username", "root");
+                                        put("password", "root");
+                                    }
+                                }));
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_DATABASE,
+                "CREATE DATABASE IF NOT EXISTS testddatabase",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_DATABASE,
+                "DROP DATABASE IF EXISTS testddatabase",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "TRUNCATE TABLE testddatabase.testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_TABLE,
+                "DROP TABLE IF EXISTS testddatabase.testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "CREATE TABLE IF NOT EXISTS `testddatabase`.`testtable` (\n"
+                        + "`test` STRING NULL \n"
+                        + ") ENGINE=OLAP\n"
+                        + " UNIQUE KEY ()\n"
+                        + "DISTRIBUTED BY HASH ()\n"
+                        + " PROPERTIES (\n"
+                        + "\"replication_allocation\" = 
\"tag.location.default: 1\",\n"
+                        + "\"in_memory\" = \"false\",\n"
+                        + "\"storage_format\" = \"V2\",\n"
+                        + "\"disable_auto_compaction\" = \"false\"\n"
+                        + ")",
+                Optional.of(CATALOG_TABLE));
+    }
+
+    private void assertPreviewResult(
+            Catalog catalog,
+            Catalog.ActionType actionType,
+            String expectedSql,
+            Optional<CatalogTable> catalogTable) {
+        PreviewResult previewResult =
+                catalog.previewAction(
+                        actionType, TablePath.of("testddatabase.testtable"), 
catalogTable);
+        Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult);
+        Assertions.assertEquals(expectedSql, ((SQLPreviewResult) 
previewResult).getSql());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index 23265fa182..066a69c2dc 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -21,7 +21,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigUtil;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -44,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -232,4 +235,22 @@ public class ElasticSearchCatalog implements Catalog {
         options.put("config", ConfigUtil.convertToJsonString(tablePath));
         return options;
     }
+
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> 
catalogTable) {
+        if (actionType == ActionType.CREATE_TABLE) {
+            return new InfoPreviewResult("create index " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.DROP_TABLE) {
+            return new InfoPreviewResult("delete index " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.TRUNCATE_TABLE) {
+            return new InfoPreviewResult("delete and create index " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.CREATE_DATABASE) {
+            return new InfoPreviewResult("create index " + 
tablePath.getTableName());
+        } else if (actionType == ActionType.DROP_DATABASE) {
+            return new InfoPreviewResult("delete index " + 
tablePath.getTableName());
+        } else {
+            throw new UnsupportedOperationException("Unsupported action type: 
" + actionType);
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/PreviewActionTest.java b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..a81ce8f19a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/PreviewActionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+public class PreviewActionTest {
+
+    private static final CatalogTable CATALOG_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "test",
+                                            BasicType.STRING_TYPE,
+                                            (Long) null,
+                                            true,
+                                            null,
+                                            ""))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    @Test
+    public void testElasticSearchPreviewAction() {
+        ElasticSearchCatalogFactory factory = new 
ElasticSearchCatalogFactory();
+        Catalog catalog = factory.createCatalog("test", 
ReadonlyConfig.fromMap(new HashMap<>()));
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_DATABASE,
+                "create index testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_DATABASE,
+                "delete index testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "delete and create index testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog, Catalog.ActionType.DROP_TABLE, "delete index 
testtable", Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "create index testtable",
+                Optional.of(CATALOG_TABLE));
+    }
+
+    private void assertPreviewResult(
+            Catalog catalog,
+            Catalog.ActionType actionType,
+            String expectedSql,
+            Optional<CatalogTable> catalogTable) {
+        PreviewResult previewResult =
+                catalog.previewAction(
+                        actionType, TablePath.of("testddatabase.testtable"), 
catalogTable);
+        Assertions.assertInstanceOf(InfoPreviewResult.class, previewResult);
+        Assertions.assertEquals(expectedSql, ((InfoPreviewResult) 
previewResult).getInfo());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index 32beff121b..520f9bdbac 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -20,7 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.iceberg.catalog;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -29,13 +31,11 @@ import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
 import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
 
 import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
@@ -49,8 +49,10 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils.toIcebergTableIdentifier;
 import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils.toTablePath;
 
@@ -254,10 +256,22 @@ public class IcebergCatalog implements Catalog {
                 catalogName);
     }
 
-    public Schema toIcebergSchema(TableSchema tableSchema) {
-        // Generate struct type
-        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
-        Types.StructType structType = 
SchemaUtils.toIcebergType(rowType).asStructType();
-        return new Schema(structType.fields());
+    /**
+     * Builds a short textual description of {@code actionType} for {@code tablePath}.
+     *
+     * <p>Table-level actions report the Iceberg identifier derived from {@code tablePath};
+     * both database-level actions report {@code "do nothing"}.
+     *
+     * @param actionType action to describe
+     * @param tablePath path of the table the action targets
+     * @param catalogTable table definition; must be present for {@code CREATE_TABLE}
+     * @return an {@link InfoPreviewResult} wrapping the description
+     * @throws IllegalArgumentException if {@code CREATE_TABLE} is requested without a table
+     * @throws UnsupportedOperationException if {@code actionType} is none of the handled values
+     */
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
+        // Identity comparisons (not a switch) so that a null action type still reaches the
+        // UnsupportedOperationException at the bottom.
+        if (actionType == ActionType.CREATE_DATABASE || actionType == ActionType.DROP_DATABASE) {
+            return new InfoPreviewResult("do nothing");
+        }
+        if (actionType == ActionType.CREATE_TABLE) {
+            checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
+            return new InfoPreviewResult("create table " + toIcebergTableIdentifier(tablePath));
+        }
+        if (actionType == ActionType.DROP_TABLE) {
+            return new InfoPreviewResult("drop table " + toIcebergTableIdentifier(tablePath));
+        }
+        if (actionType == ActionType.TRUNCATE_TABLE) {
+            return new InfoPreviewResult("truncate table " + toIcebergTableIdentifier(tablePath));
+        }
+        throw new UnsupportedOperationException("Unsupported action type: " + actionType);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/PreviewActionTest.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..ca88c39a0c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/PreviewActionTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+/**
+ * Checks the text returned by {@code previewAction} of the catalog created through {@link
+ * IcebergCatalogFactory} for every {@link Catalog.ActionType}.
+ */
+public class PreviewActionTest {
+
+    /** Single-column table definition passed along with the {@code CREATE_TABLE} preview. */
+    private static final CatalogTable CATALOG_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "test",
+                                            BasicType.STRING_TYPE,
+                                            (Long) null,
+                                            true,
+                                            null,
+                                            ""))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    /** Previews each action type on a Hadoop-backed Iceberg catalog and checks the reported text. */
+    @Test
+    public void testIcebergPreviewAction() {
+        // Plain maps rather than double-brace initialisation, which would create an anonymous
+        // HashMap subclass for each literal.
+        HashMap<String, Object> icebergConfig = new HashMap<>();
+        icebergConfig.put("type", "hadoop");
+        icebergConfig.put("warehouse", "file:///tmp/seatunnel/iceberg/hadoop-sink/");
+
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("catalog_name", "seatunnel_test");
+        options.put("iceberg.catalog.config", icebergConfig);
+
+        Catalog catalog =
+                new IcebergCatalogFactory().createCatalog("test", ReadonlyConfig.fromMap(options));
+
+        assertPreviewResult(
+                catalog, Catalog.ActionType.CREATE_DATABASE, "do nothing", Optional.empty());
+        assertPreviewResult(
+                catalog, Catalog.ActionType.DROP_DATABASE, "do nothing", Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "truncate table testddatabase.testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_TABLE,
+                "drop table testddatabase.testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "create table testddatabase.testtable",
+                Optional.of(CATALOG_TABLE));
+    }
+
+    /**
+     * Previews {@code actionType} for the fixed path {@code testddatabase.testtable} and checks
+     * that the result is an {@link InfoPreviewResult} carrying {@code expectedInfo}.
+     *
+     * @param catalog catalog whose {@code previewAction} is exercised
+     * @param actionType action to preview
+     * @param expectedInfo text the result must report through {@code getInfo()}
+     * @param catalogTable table definition handed to the catalog; may be empty
+     */
+    private void assertPreviewResult(
+            Catalog catalog,
+            Catalog.ActionType actionType,
+            String expectedInfo,
+            Optional<CatalogTable> catalogTable) {
+        PreviewResult previewResult =
+                catalog.previewAction(
+                        actionType, TablePath.of("testddatabase.testtable"), catalogTable);
+        Assertions.assertInstanceOf(InfoPreviewResult.class, previewResult);
+        Assertions.assertEquals(expectedInfo, ((InfoPreviewResult) previewResult).getInfo());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index f695cc30cb..3775f9f47d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -22,7 +22,9 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -577,4 +579,23 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
             throw new CatalogException(String.format("Failed executeSql error 
%s", sql), e);
         }
     }
+
+    /**
+     * Builds the SQL text for {@code actionType} through the matching {@code get...Sql} method
+     * and wraps it in a {@link SQLPreviewResult}.
+     *
+     * @param actionType action to preview
+     * @param tablePath target table; only its database name is used for the database actions
+     * @param catalogTable table definition; must be present for {@code CREATE_TABLE}
+     * @return the preview holding the generated statement
+     * @throws IllegalArgumentException if {@code CREATE_TABLE} is requested without a table
+     * @throws UnsupportedOperationException if {@code actionType} is none of the handled values
+     */
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
+        final String sql;
+        if (actionType == ActionType.CREATE_TABLE) {
+            checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
+            CatalogTable table = catalogTable.get();
+            sql = getCreateTableSql(tablePath, table);
+        } else if (actionType == ActionType.DROP_TABLE) {
+            sql = getDropTableSql(tablePath);
+        } else if (actionType == ActionType.TRUNCATE_TABLE) {
+            sql = getTruncateTableSql(tablePath);
+        } else if (actionType == ActionType.CREATE_DATABASE) {
+            sql = getCreateDatabaseSql(tablePath.getDatabaseName());
+        } else if (actionType == ActionType.DROP_DATABASE) {
+            sql = getDropDatabaseSql(tablePath.getDatabaseName());
+        } else {
+            throw new UnsupportedOperationException("Unsupported action type: " + actionType);
+        }
+        return new SQLPreviewResult(sql);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index 30e140c68a..4697d1999e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -186,6 +186,13 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
         }
     }
 
+    /**
+     * Produces the create-table statement for {@code table} by delegating to {@link
+     * PostgresCreateTableSqlBuilder}.
+     *
+     * @param tablePath path passed to the builder's {@code build} call
+     * @param table table definition the builder is constructed from
+     * @return the statement returned by the builder
+     */
+    @Override
+    protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
+        return new PostgresCreateTableSqlBuilder(table).build(tablePath);
+    }
+
     @Override
     protected String getDropTableSql(TablePath tablePath) {
         return "DROP TABLE \""
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..a0cdf7d8a8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm.DamengCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase.OceanBaseCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+/**
+ * Checks the SQL each JDBC catalog reports from {@code previewAction} for every {@link
+ * Catalog.ActionType}, including the actions a dialect rejects with {@link
+ * UnsupportedOperationException}.
+ */
+public class PreviewActionTest {
+
+    /** Path previewed by every assertion in this class. */
+    private static final String TABLE_PATH = "testddatabase.testtable";
+
+    /**
+     * Base URL used for every catalog except SQL Server. The non-MySQL catalogs were created with
+     * this MySQL URL in the original test as well; it is kept unchanged.
+     */
+    private static final String DEFAULT_BASE_URL = "jdbc:mysql://localhost:3306/test";
+
+    /** Single-column table definition passed along with the {@code CREATE_TABLE} previews. */
+    private static final CatalogTable CATALOG_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "test",
+                                            BasicType.STRING_TYPE,
+                                            (Long) null,
+                                            true,
+                                            null,
+                                            ""))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    @Test
+    public void testMySQLPreviewAction() {
+        Catalog catalog =
+                new MySqlCatalogFactory()
+                        .createCatalog(
+                                "test", ReadonlyConfig.fromMap(jdbcOptions(DEFAULT_BASE_URL)));
+        assertMySqlStylePreviews(catalog);
+    }
+
+    @Test
+    public void testDMPreviewAction() {
+        Catalog catalog =
+                new DamengCatalogFactory()
+                        .createCatalog(
+                                "test", ReadonlyConfig.fromMap(jdbcOptions(DEFAULT_BASE_URL)));
+        assertUnsupported(catalog, Catalog.ActionType.CREATE_DATABASE, Optional.empty());
+        assertUnsupported(catalog, Catalog.ActionType.DROP_DATABASE, Optional.empty());
+        assertUnsupported(catalog, Catalog.ActionType.TRUNCATE_TABLE, Optional.empty());
+        assertPreviewResult(
+                catalog, Catalog.ActionType.DROP_TABLE, "DROP TABLE TESTTABLE", Optional.empty());
+        assertUnsupported(catalog, Catalog.ActionType.CREATE_TABLE, Optional.of(CATALOG_TABLE));
+    }
+
+    @Test
+    public void testOceanBasePreviewAction() {
+        OceanBaseCatalogFactory factory = new OceanBaseCatalogFactory();
+
+        HashMap<String, Object> oracleModeOptions = jdbcOptions(DEFAULT_BASE_URL);
+        oracleModeOptions.put("compatibleMode", "oracle");
+        Catalog catalog = factory.createCatalog("test", ReadonlyConfig.fromMap(oracleModeOptions));
+        assertOracleStylePreviews(catalog);
+
+        HashMap<String, Object> mysqlModeOptions = jdbcOptions(DEFAULT_BASE_URL);
+        mysqlModeOptions.put("compatibleMode", "mysql");
+        Catalog catalog2 = factory.createCatalog("test", ReadonlyConfig.fromMap(mysqlModeOptions));
+        assertMySqlStylePreviews(catalog2);
+    }
+
+    @Test
+    public void testOraclePreviewAction() {
+        Catalog catalog =
+                new OracleCatalogFactory()
+                        .createCatalog(
+                                "test", ReadonlyConfig.fromMap(jdbcOptions(DEFAULT_BASE_URL)));
+        assertOracleStylePreviews(catalog);
+    }
+
+    @Test
+    public void testPostgresPreviewAction() {
+        Catalog catalog =
+                new PostgresCatalogFactory()
+                        .createCatalog(
+                                "test", ReadonlyConfig.fromMap(jdbcOptions(DEFAULT_BASE_URL)));
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_DATABASE,
+                "CREATE DATABASE \"testddatabase\"",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_DATABASE,
+                "DROP DATABASE \"testddatabase\"",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "TRUNCATE TABLE  \"null\".\"testtable\"",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_TABLE,
+                "DROP TABLE \"null\".\"testtable\"",
+                Optional.empty());
+
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "CREATE TABLE \"testtable\" (\n" + "\"test\" text\n" + ");",
+                Optional.of(CATALOG_TABLE));
+    }
+
+    @Test
+    public void testSqlServerPreviewAction() {
+        Catalog catalog =
+                new SqlServerCatalogFactory()
+                        .createCatalog(
+                                "test",
+                                ReadonlyConfig.fromMap(
+                                        jdbcOptions(
+                                                "jdbc:sqlserver://localhost:1433;databaseName=column_type_test")));
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_DATABASE,
+                "CREATE DATABASE testddatabase",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_DATABASE,
+                "DROP DATABASE testddatabase;",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "TRUNCATE TABLE  [testddatabase].[testtable]",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_TABLE,
+                "DROP TABLE testddatabase.testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "IF OBJECT_ID('[testddatabase].[testtable]', 'U') IS NULL \n"
+                        + "BEGIN \n"
+                        + "CREATE TABLE [testddatabase].[testtable] ( \n"
+                        + "\t[test] TEXT NULL\n"
+                        + ");\n"
+                        + "EXEC testddatabase.sys.sp_addextendedproperty 'MS_Description', N'comment', 'schema', N'null', 'table', N'testtable';\n"
+                        + "EXEC testddatabase.sys.sp_addextendedproperty 'MS_Description', N'', 'schema', N'null', 'table', N'testtable', 'column', N'test';\n"
+                        + "\n"
+                        + "END",
+                Optional.of(CATALOG_TABLE));
+    }
+
+    @Test
+    public void testTiDBPreviewAction() {
+        Catalog catalog =
+                new TiDBCatalogFactory()
+                        .createCatalog(
+                                "test", ReadonlyConfig.fromMap(jdbcOptions(DEFAULT_BASE_URL)));
+        assertMySqlStylePreviews(catalog);
+    }
+
+    /**
+     * Returns a fresh, mutable option map holding {@code base-url} plus the {@code root}/{@code
+     * root} credentials shared by every catalog in this test.
+     */
+    private static HashMap<String, Object> jdbcOptions(String baseUrl) {
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("base-url", baseUrl);
+        options.put("username", "root");
+        options.put("password", "root");
+        return options;
+    }
+
+    /** Asserts the backtick-quoted statements shared by MySQL, TiDB and OceanBase in mysql mode. */
+    private void assertMySqlStylePreviews(Catalog catalog) {
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_DATABASE,
+                "CREATE DATABASE `testddatabase`;",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_DATABASE,
+                "DROP DATABASE `testddatabase`;",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "TRUNCATE TABLE `testddatabase`.`testtable`;",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_TABLE,
+                "DROP TABLE `testddatabase`.`testtable`;",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "CREATE TABLE `testtable` (\n"
+                        + "\t`test` LONGTEXT NULL COMMENT ''\n"
+                        + ") COMMENT = 'comment';",
+                Optional.of(CATALOG_TABLE));
+    }
+
+    /**
+     * Asserts the expectations shared by Oracle and OceanBase in oracle mode: database actions
+     * are rejected, table actions yield double-quoted statements.
+     */
+    private void assertOracleStylePreviews(Catalog catalog) {
+        assertUnsupported(catalog, Catalog.ActionType.CREATE_DATABASE, Optional.empty());
+        assertUnsupported(catalog, Catalog.ActionType.DROP_DATABASE, Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "TRUNCATE TABLE \"null\".\"testtable\"",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_TABLE,
+                "DROP TABLE \"testtable\"",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "CREATE TABLE \"testtable\" (\n" + "\"test\" VARCHAR2(4000)\n" + ")",
+                Optional.of(CATALOG_TABLE));
+    }
+
+    /**
+     * Asserts that previewing {@code actionType} throws {@link UnsupportedOperationException}.
+     * No expected SQL is taken because the call never returns a result to compare.
+     */
+    private void assertUnsupported(
+            Catalog catalog, Catalog.ActionType actionType, Optional<CatalogTable> catalogTable) {
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> catalog.previewAction(actionType, TablePath.of(TABLE_PATH), catalogTable));
+    }
+
+    /**
+     * Previews {@code actionType} for {@code testddatabase.testtable} and checks that the result
+     * is a {@link SQLPreviewResult} whose SQL equals {@code expectedSql}.
+     *
+     * @param catalog catalog whose {@code previewAction} is exercised
+     * @param actionType action to preview
+     * @param expectedSql exact statement the result must report through {@code getSql()}
+     * @param catalogTable table definition handed to the catalog; may be empty
+     */
+    private void assertPreviewResult(
+            Catalog catalog,
+            Catalog.ActionType actionType,
+            String expectedSql,
+            Optional<CatalogTable> catalogTable) {
+        PreviewResult previewResult =
+                catalog.previewAction(actionType, TablePath.of(TABLE_PATH), catalogTable);
+        Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult);
+        Assertions.assertEquals(expectedSql, ((SQLPreviewResult) previewResult).getSql());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 3dc7eebfa6..8a14b08efe 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -20,7 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -44,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.mysql.cj.MysqlType;
 import lombok.extern.slf4j.Slf4j;
 
@@ -214,7 +217,7 @@ public class StarRocksCatalog implements Catalog {
     public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
         this.createTable(
-                StarRocksSaveModeUtil.fillingCreateSql(
+                StarRocksSaveModeUtil.getCreateTableSql(
                         template,
                         tablePath.getDatabaseName(),
                         tablePath.getTableName(),
@@ -225,12 +228,8 @@ public class StarRocksCatalog implements Catalog {
     public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException, CatalogException {
         try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
-            if (ignoreIfNotExists) {
-                conn.createStatement().execute("DROP TABLE IF EXISTS " + 
tablePath.getFullName());
-            } else {
-                conn.createStatement()
-                        .execute(String.format("DROP TABLE %s", 
tablePath.getFullName()));
-            }
+            conn.createStatement()
+                    .execute(StarRocksSaveModeUtil.getDropTableSql(tablePath, 
ignoreIfNotExists));
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed listing database in catalog %s", 
catalogName), e);
@@ -242,7 +241,7 @@ public class StarRocksCatalog implements Catalog {
         try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
             if (ignoreIfNotExists) {
                 conn.createStatement()
-                        .execute(String.format("TRUNCATE TABLE  %s", 
tablePath.getFullName()));
+                        
.execute(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
             }
         } catch (Exception e) {
             throw new CatalogException(
@@ -277,16 +276,10 @@ public class StarRocksCatalog implements Catalog {
     public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
             throws DatabaseAlreadyExistException, CatalogException {
         try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
-            if (ignoreIfExists) {
-                conn.createStatement()
-                        .execute(
-                                "CREATE DATABASE IF NOT EXISTS `"
-                                        + tablePath.getDatabaseName()
-                                        + "`");
-            } else {
-                conn.createStatement()
-                        .execute("CREATE DATABASE `" + 
tablePath.getDatabaseName() + "`");
-            }
+            conn.createStatement()
+                    .execute(
+                            StarRocksSaveModeUtil.getCreateDatabaseSql(
+                                    tablePath.getDatabaseName(), 
ignoreIfExists));
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed listing database in catalog %s", 
catalogName), e);
@@ -297,13 +290,10 @@ public class StarRocksCatalog implements Catalog {
     public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
         try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
-            if (ignoreIfNotExists) {
-                conn.createStatement()
-                        .execute("DROP DATABASE IF EXISTS `" + 
tablePath.getDatabaseName() + "`");
-            } else {
-                conn.createStatement()
-                        .execute(String.format("DROP DATABASE `%s`", 
tablePath.getDatabaseName()));
-            }
+            conn.createStatement()
+                    .execute(
+                            StarRocksSaveModeUtil.getDropDatabaseSql(
+                                    tablePath.getDatabaseName(), 
ignoreIfNotExists));
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed listing database in catalog %s", 
catalogName), e);
@@ -501,4 +491,45 @@ public class StarRocksCatalog implements Catalog {
             return false;
         }
     }
+
+    /**
+     * Builds the SQL this catalog would run for {@code actionType} and returns it as a {@link
+     * SQLPreviewResult}. The statement is only assembled; this method opens no connection.
+     *
+     * <p>Drop-table, create-database and drop-database previews are generated with the ignore flag
+     * set, i.e. in their {@code IF EXISTS} / {@code IF NOT EXISTS} form.
+     *
+     * @param actionType the action to preview
+     * @param tablePath database and table the action targets
+     * @param catalogTable table definition; must be present for {@code CREATE_TABLE}
+     * @return a preview holding the generated SQL
+     * @throws IllegalArgumentException if {@code CREATE_TABLE} is requested without a table
+     * @throws UnsupportedOperationException for any other action type
+     */
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
+        if (actionType == ActionType.CREATE_TABLE) {
+            Preconditions.checkArgument(catalogTable.isPresent(), "CatalogTable cannot be null");
+            return new SQLPreviewResult(
+                    StarRocksSaveModeUtil.getCreateTableSql(
+                            template,
+                            tablePath.getDatabaseName(),
+                            tablePath.getTableName(),
+                            catalogTable.get().getTableSchema()));
+        } else if (actionType == ActionType.DROP_TABLE) {
+            return new SQLPreviewResult(StarRocksSaveModeUtil.getDropTableSql(tablePath, true));
+        } else if (actionType == ActionType.TRUNCATE_TABLE) {
+            return new SQLPreviewResult(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
+        } else if (actionType == ActionType.CREATE_DATABASE) {
+            return new SQLPreviewResult(
+                    StarRocksSaveModeUtil.getCreateDatabaseSql(tablePath.getDatabaseName(), true));
+        } else if (actionType == ActionType.DROP_DATABASE) {
+            // Use the same builder as dropDatabase so the preview cannot drift from what is run.
+            return new SQLPreviewResult(
+                    StarRocksSaveModeUtil.getDropDatabaseSql(tablePath.getDatabaseName(), true));
+        } else {
+            throw new UnsupportedOperationException("Unsupported action type: " + actionType);
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index 0ca7774a7c..3b65d65a5b 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
 import org.apache.seatunnel.api.sink.SaveModeConstants;
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
@@ -37,7 +38,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 public class StarRocksSaveModeUtil {
 
-    public static String fillingCreateSql(
+    public static String getCreateTableSql(
             String template, String database, String table, TableSchema 
tableSchema) {
         String primaryKey = "";
         if (tableSchema.getPrimaryKey() != null) {
@@ -178,4 +179,53 @@ public class StarRocksSaveModeUtil {
         }
         throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType);
     }
+
+    /**
+     * Renders the {@code CREATE DATABASE} statement for a back-quoted database name.
+     *
+     * @param database database name, placed between back quotes as-is
+     * @param ignoreIfExists whether to add {@code IF NOT EXISTS}
+     * @return the statement text, without a trailing semicolon
+     */
+    public static String getCreateDatabaseSql(String database, boolean ignoreIfExists) {
+        String prefix = ignoreIfExists ? "CREATE DATABASE IF NOT EXISTS `" : "CREATE DATABASE `";
+        return prefix + database + "`";
+    }
+
+    /**
+     * Renders the {@code DROP DATABASE} statement for a back-quoted database name.
+     *
+     * @param database database name, placed between back quotes as-is
+     * @param ignoreIfNotExists whether to add {@code IF EXISTS}
+     * @return the statement text, without a trailing semicolon
+     */
+    public static String getDropDatabaseSql(String database, boolean ignoreIfNotExists) {
+        String prefix = ignoreIfNotExists ? "DROP DATABASE IF EXISTS `" : "DROP DATABASE `";
+        return prefix + database + "`";
+    }
+
+    /**
+     * Renders the {@code DROP TABLE} statement for {@code tablePath}.
+     *
+     * <p>The name is taken from {@link TablePath#getFullName()}; no quoting is added here.
+     *
+     * @param tablePath table to drop
+     * @param ignoreIfNotExists whether to add {@code IF EXISTS}
+     * @return the statement text, without a trailing semicolon
+     */
+    public static String getDropTableSql(TablePath tablePath, boolean ignoreIfNotExists) {
+        String prefix = ignoreIfNotExists ? "DROP TABLE IF EXISTS " : "DROP TABLE ";
+        return prefix + tablePath.getFullName();
+    }
+
+    /**
+     * Renders the {@code TRUNCATE TABLE} statement for {@code tablePath}.
+     *
+     * @param tablePath table to truncate
+     * @return the statement text, without a trailing semicolon
+     */
+    public static String getTruncateTableSql(TablePath tablePath) {
+        String fullName = tablePath.getFullName();
+        return "TRUNCATE TABLE " + fullName;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java
new file mode 100644
index 0000000000..37c06345fd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+
+/**
+ * Checks the SQL text that the catalog created by {@link StarRocksCatalogFactory} returns from
+ * {@code previewAction} for each action type exercised below.
+ */
+public class PreviewActionTest {
+
+    private static final CatalogTable CATALOG_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "test",
+                                            BasicType.STRING_TYPE,
+                                            (Long) null,
+                                            true,
+                                            null,
+                                            ""))
+                            .column(
+                                    PhysicalColumn.of(
+                                            "test2",
+                                            BasicType.STRING_TYPE,
+                                            (Long) null,
+                                            true,
+                                            null,
+                                            ""))
+                            .primaryKey(PrimaryKey.of("test", Collections.singletonList("test")))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    @Test
+    public void testStarRocksPreviewAction() {
+        StarRocksCatalogFactory factory = new StarRocksCatalogFactory();
+        Catalog catalog =
+                factory.createCatalog(
+                        "test",
+                        ReadonlyConfig.fromMap(
+                                new HashMap<String, Object>() {
+                                    {
+                                        put("base-url", "jdbc:mysql://localhost:9030");
+                                        put("username", "root");
+                                        put("password", "root");
+                                    }
+                                }));
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_DATABASE,
+                "CREATE DATABASE IF NOT EXISTS `testddatabase`",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_DATABASE,
+                "DROP DATABASE IF EXISTS `testddatabase`",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "TRUNCATE TABLE testddatabase.testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_TABLE,
+                "DROP TABLE IF EXISTS testddatabase.testtable",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "CREATE TABLE IF NOT EXISTS `testddatabase`.`testtable` (\n"
+                        + "`test` STRING NULL ,\n"
+                        + "`test2` STRING NULL \n"
+                        + ") ENGINE=OLAP\n"
+                        + " PRIMARY KEY (`test`)\n"
+                        + "DISTRIBUTED BY HASH (`test`)PROPERTIES (\n"
+                        + "    \"replication_num\" = \"1\" \n"
+                        + ")",
+                Optional.of(CATALOG_TABLE));
+    }
+
+    /**
+     * Previews {@code actionType} against {@code testddatabase.testtable} and asserts the result
+     * is a {@link SQLPreviewResult} carrying exactly {@code expectedSql}.
+     */
+    private void assertPreviewResult(
+            Catalog catalog,
+            Catalog.ActionType actionType,
+            String expectedSql,
+            Optional<CatalogTable> catalogTable) {
+        PreviewResult previewResult =
+                catalog.previewAction(
+                        actionType, TablePath.of("testddatabase.testtable"), catalogTable);
+        Assertions.assertInstanceOf(SQLPreviewResult.class, previewResult);
+        Assertions.assertEquals(expectedSql, ((SQLPreviewResult) previewResult).getSql());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index e3471b6bf2..0a3f36196a 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -54,7 +54,7 @@ public class StarRocksCreateTableTest {
                 PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) 
null, true, null, ""));
 
         String result =
-                StarRocksSaveModeUtil.fillingCreateSql(
+                StarRocksSaveModeUtil.getCreateTableSql(
                         "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (                                                 
                                                                                
                  \n"
                                 + "${rowtype_primary_key}  ,       \n"
                                 + "${rowtype_unique_key} , \n"
@@ -187,7 +187,7 @@ public class StarRocksCreateTableTest {
                         "L_COMMENT", BasicType.STRING_TYPE, (Long) null, 
false, null, ""));
 
         String result =
-                StarRocksSaveModeUtil.fillingCreateSql(
+                StarRocksSaveModeUtil.getCreateTableSql(
                         "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n"
                                 + "`L_COMMITDATE`,\n"
                                 + "${rowtype_primary_key},\n"
@@ -244,7 +244,7 @@ public class StarRocksCreateTableTest {
         columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE, 
70000, true, null, ""));
 
         String result =
-                StarRocksSaveModeUtil.fillingCreateSql(
+                StarRocksSaveModeUtil.getCreateTableSql(
                         "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (                                                 
                                                                                
                  \n"
                                 + "${rowtype_primary_key}  ,       \n"
                                 + "`create_time` DATETIME NOT NULL ,  \n"
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 05fde53336..f775bfb46f 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -123,7 +124,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
-                        handler.handleSaveMode();
+                        new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                     }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index f4af78f81c..6257a94dde 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -124,7 +125,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
-                        handler.handleSaveMode();
+                        new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                     }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 48baa7f746..886f6d6a15 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -142,7 +143,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
-                        handler.handleSaveMode();
+                        new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                     }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index fc5eade3f4..654cfaa181 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -143,7 +144,7 @@ public class SinkExecuteProcessor
                 Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
                 if (saveModeHandler.isPresent()) {
                     try (SaveModeHandler handler = saveModeHandler.get()) {
-                        handler.handleSaveMode();
+                        new SaveModeExecuteWrapper(handler).execute();
                     } catch (Exception e) {
                         throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                     }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 50e35d9117..f988f293a5 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
@@ -677,7 +678,7 @@ public class MultipleTableJobConfigParser {
             Optional<SaveModeHandler> saveModeHandler = 
saveModeSink.getSaveModeHandler();
             if (saveModeHandler.isPresent()) {
                 try (SaveModeHandler handler = saveModeHandler.get()) {
-                    handler.handleSaveMode();
+                    new SaveModeExecuteWrapper(handler).execute();
                 } catch (Exception e) {
                     throw new 
SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
                 }


Reply via email to