This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6b7c53d03c [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)
6b7c53d03c is described below

commit 6b7c53d03c82cce55ca8cfa5324830e476390997
Author: Guangdong Liu <[email protected]>
AuthorDate: Fri Oct 11 14:31:24 2024 +0800

    [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)
---
 docs/en/seatunnel-engine/rest-api-v1.md            | 43 ++++++++++++++---
 docs/en/seatunnel-engine/rest-api-v2.md            | 43 ++++++++++++++---
 docs/zh/seatunnel-engine/rest-api-v1.md            | 44 +++++++++++++----
 docs/zh/seatunnel-engine/rest-api-v2.md            | 43 ++++++++++++++---
 .../apache/seatunnel/api/sink/SeaTunnelSink.java   | 10 ++++
 .../api/sink/multitablesink/MultiTableSink.java    | 20 +++++++-
 .../seatunnel/activemq/sink/ActivemqSink.java      | 13 ++++-
 .../activemq/sink/ActivemqSinkFactory.java         |  5 +-
 .../amazondynamodb/sink/AmazonDynamoDBSink.java    |  7 +++
 .../seatunnel/amazonsqs/sink/AmazonSqsSink.java    | 17 +++++--
 .../amazonsqs/sink/AmazonSqsSinkFactory.java       |  3 +-
 .../seatunnel/assertion/sink/AssertSink.java       |  8 ++++
 .../seatunnel/cassandra/sink/CassandraSink.java    |  7 +++
 .../clickhouse/sink/file/ClickhouseFileSink.java   |  6 +++
 .../seatunnel/console/sink/ConsoleSink.java        | 14 +++++-
 .../seatunnel/console/sink/ConsoleSinkFactory.java |  5 +-
 .../seatunnel/datahub/sink/DataHubSink.java        |  7 +++
 .../connectors/seatunnel/sink/DingTalkSink.java    |  7 +++
 .../seatunnel/connectors/doris/sink/DorisSink.java |  5 ++
 .../seatunnel/connectors/druid/sink/DruidSink.java |  6 +++
 .../seatunnel/easysearch/sink/EasysearchSink.java  |  8 ++++
 .../elasticsearch/sink/ElasticsearchSink.java      |  5 ++
 .../connectors/seatunnel/email/sink/EmailSink.java | 17 +++++--
 .../seatunnel/file/cos/sink/CosFileSink.java       |  8 ++++
 .../seatunnel/file/ftp/sink/FtpFileSink.java       | 11 +++++
 .../seatunnel/file/hdfs/sink/HdfsFileSink.java     |  8 ++++
 .../seatunnel/file/oss/jindo/sink/OssFileSink.java |  8 ++++
 .../seatunnel/file/local/sink/LocalFileSink.java   | 10 ++++
 .../seatunnel/file/obs/sink/ObsFileSink.java       |  8 ++++
 .../seatunnel/file/oss/sink/OssFileSink.java       | 11 +++++
 .../seatunnel/file/s3/sink/S3FileSink.java         |  9 +++-
 .../seatunnel/file/sftp/sink/SftpFileSink.java     | 11 +++++
 .../google/firestore/sink/FirestoreSink.java       |  7 +++
 .../connectors/seatunnel/hbase/sink/HbaseSink.java | 13 +++--
 .../connectors/seatunnel/hive/sink/HiveSink.java   |  5 ++
 .../connectors/seatunnel/http/sink/HttpSink.java   | 13 ++++-
 .../seatunnel/http/sink/HttpSinkFactory.java       |  3 +-
 .../seatunnel/feishu/sink/FeishuSink.java          | 13 +++--
 .../seatunnel/feishu/sink/FeishuSinkFactory.java   |  3 +-
 .../seatunnel/wechat/sink/WeChatSink.java          | 13 +++--
 .../connectors/seatunnel/hudi/sink/HudiSink.java   |  5 ++
 .../seatunnel/iceberg/sink/IcebergSink.java        |  7 ++-
 .../seatunnel/influxdb/sink/InfluxDBSink.java      | 12 ++++-
 .../connectors/seatunnel/iotdb/sink/IoTDBSink.java |  8 ++++
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |  5 ++
 .../connectors/seatunnel/kafka/sink/KafkaSink.java | 12 ++++-
 .../seatunnel/kafka/sink/KafkaSinkFactory.java     |  5 +-
 .../connectors/seatunnel/kudu/sink/KuduSink.java   | 13 ++++-
 .../seatunnel/maxcompute/sink/MaxcomputeSink.java  |  8 ++++
 .../seatunnel/milvus/sink/MilvusSink.java          |  5 ++
 .../connectors/seatunnel/neo4j/sink/Neo4jSink.java |  7 +++
 .../seatunnel/paimon/sink/PaimonSink.java          |  5 ++
 .../seatunnel/pulsar/sink/PulsarSink.java          | 18 +++++--
 .../seatunnel/pulsar/sink/PulsarSinkFactory.java   |  5 +-
 .../seatunnel/qdrant/sink/QdrantSink.java          |  6 +++
 .../seatunnel/rabbitmq/sink/RabbitmqSink.java      |  7 +++
 .../connectors/seatunnel/redis/sink/RedisSink.java | 12 +++--
 .../seatunnel/rocketmq/sink/RocketMqSink.java      |  7 +++
 .../connectors/selectdb/sink/SelectDBSink.java     |  6 +++
 .../seatunnel/sentry/sink/SentrySink.java          |  7 +++
 .../connectors/seatunnel/slack/sink/SlackSink.java |  7 +++
 .../seatunnel/socket/sink/SocketSink.java          |  7 +++
 .../seatunnel/starrocks/sink/StarRocksSink.java    | 11 +++--
 .../seatunnel/tablestore/sink/TablestoreSink.java  |  7 +++
 .../seatunnel/tdengine/sink/TDengineSink.java      |  7 +++
 .../seatunnel/typesense/sink/TypesenseSink.java    |  9 +++-
 .../connector/ConnectorSpecificationCheckTest.java | 12 +++++
 .../seatunnel/e2e/sink/inmemory/InMemorySink.java  |  5 ++
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 42 +++++++++++++++++
 .../seatunnel/engine/core/job/JobDAGInfo.java      | 39 +++++++++++++++
 .../seatunnel/engine/core/job/VertexInfo.java      |  4 ++
 .../seatunnel/engine/server/dag/DAGUtils.java      | 55 +++++++++++++++++++++-
 .../server/rest/RestHttpGetCommandProcessor.java   |  2 +-
 .../engine/server/rest/servlet/BaseServlet.java    |  3 +-
 .../apache/seatunnel/engine/server/TestUtils.java  | 26 +++++++---
 .../server/checkpoint/CheckpointPlanTest.java      | 27 ++++++++---
 .../seatunnel/engine/server/dag/TaskTest.java      | 27 ++++++++---
 .../spark/sink/SeaTunnelSinkWithBuffer.java        |  7 +++
 78 files changed, 830 insertions(+), 124 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api-v1.md 
b/docs/en/seatunnel-engine/rest-api-v1.md
index 674542c23e..ec9d8f13b9 100644
--- a/docs/en/seatunnel-engine/rest-api-v1.md
+++ b/docs/en/seatunnel-engine/rest-api-v1.md
@@ -161,10 +161,18 @@ network:
   "jobStatus": "",
   "createTime": "",
   "jobDag": {
-    "vertices": [
+    "jobId": "",
+    "vertexInfoMap": [
+      {
+        "vertexId": 1,
+        "type": "",
+        "vertexName": "",
+        "tablePaths": [
+          ""
+        ]
+      }
     ],
-    "edges": [
-    ]
+    "pipelineEdges": {}
   },
   "metrics": {
     "sourceReceivedCount": "",
@@ -218,10 +226,18 @@ This API has been deprecated, please use 
/hazelcast/rest/maps/job-info/:jobId in
   "jobStatus": "",
   "createTime": "",
   "jobDag": {
-    "vertices": [
+    "jobId": "",
+    "vertexInfoMap": [
+      {
+        "vertexId": 1,
+        "type": "",
+        "vertexName": "",
+        "tablePaths": [
+          ""
+        ]
+      }
     ],
-    "edges": [
-    ]
+    "pipelineEdges": {}
   },
   "metrics": {
     "SourceReceivedCount": "",
@@ -289,7 +305,20 @@ When we can't get the job info, the response will be:
     "errorMsg": null,
     "createTime": "",
     "finishTime": "",
-    "jobDag": "",
+    "jobDag": {
+      "jobId": "",
+      "vertexInfoMap": [
+        {
+          "vertexId": 1,
+          "type": "",
+          "vertexName": "",
+          "tablePaths": [
+            ""
+          ]
+        }
+      ],
+      "pipelineEdges": {}
+    },
     "metrics": ""
   }
 ]
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md 
b/docs/en/seatunnel-engine/rest-api-v2.md
index 643b4e51d8..e5b9d5d718 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -128,10 +128,18 @@ seatunnel:
   "jobStatus": "",
   "createTime": "",
   "jobDag": {
-    "vertices": [
+    "jobId": "",
+    "vertexInfoMap": [
+      {
+        "vertexId": 1,
+        "type": "",
+        "vertexName": "",
+        "tablePaths": [
+          ""
+        ]
+      }
     ],
-    "edges": [
-    ]
+    "pipelineEdges": {}
   },
   "metrics": {
     "sourceReceivedCount": "",
@@ -185,10 +193,18 @@ This API has been deprecated, please use /job-info/:jobId 
instead
   "jobStatus": "",
   "createTime": "",
   "jobDag": {
-    "vertices": [
+    "jobId": "",
+    "vertexInfoMap": [
+      {
+        "vertexId": 1,
+        "type": "",
+        "vertexName": "",
+        "tablePaths": [
+          ""
+        ]
+      }
     ],
-    "edges": [
-    ]
+    "pipelineEdges": {}
   },
   "metrics": {
     "SourceReceivedCount": "",
@@ -256,7 +272,20 @@ When we can't get the job info, the response will be:
     "errorMsg": null,
     "createTime": "",
     "finishTime": "",
-    "jobDag": "",
+    "jobDag": {
+      "jobId": "",
+      "vertexInfoMap": [
+        {
+          "vertexId": 1,
+          "type": "",
+          "vertexName": "",
+          "tablePaths": [
+            ""
+          ]
+        }
+      ],
+      "pipelineEdges": {}
+    },
     "metrics": ""
   }
 ]
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md 
b/docs/zh/seatunnel-engine/rest-api-v1.md
index 639a7318cd..5154922ec0 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -159,10 +159,18 @@ network:
   "jobStatus": "",
   "createTime": "",
   "jobDag": {
-    "vertices": [
+    "jobId": "",
+    "vertexInfoMap": [
+      {
+        "vertexId": 1,
+        "type": "",
+        "vertexName": "",
+        "tablePaths": [
+          ""
+        ]
+      }
     ],
-    "edges": [
-    ]
+    "pipelineEdges": {}
   },
   "metrics": {
     "SourceReceivedCount": "",
@@ -230,10 +238,18 @@ network:
   "jobStatus": "",
   "createTime": "",
   "jobDag": {
-    "vertices": [
+    "jobId": "",
+    "vertexInfoMap": [
+      {
+        "vertexId": 1,
+        "type": "",
+        "vertexName": "",
+        "tablePaths": [
+          ""
+        ]
+      }
     ],
-    "edges": [
-    ]
+    "pipelineEdges": {}
   },
   "metrics": {
     "sourceReceivedCount": "",
@@ -287,8 +303,20 @@ network:
     "errorMsg": null,
     "createTime": "",
     "finishTime": "",
-    "jobDag": "",
-    "metrics": ""
+    "jobDag": {
+      "jobId": "",
+      "vertexInfoMap": [
+        {
+          "vertexId": 1,
+          "type": "",
+          "vertexName": "",
+          "tablePaths": [
+            ""
+          ]
+        }
+      ],
+      "pipelineEdges": {}
+    },    "metrics": ""
   }
 ]
 ```
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md 
b/docs/zh/seatunnel-engine/rest-api-v2.md
index cdd27595bf..df884fa18e 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -124,10 +124,18 @@ seatunnel:
   "jobStatus": "",
   "createTime": "",
   "jobDag": {
-    "vertices": [
+    "jobId": "",
+    "vertexInfoMap": [
+      {
+        "vertexId": 1,
+        "type": "",
+        "vertexName": "",
+        "tablePaths": [
+          ""
+        ]
+      }
     ],
-    "edges": [
-    ]
+    "pipelineEdges": {}
   },
   "metrics": {
     "SourceReceivedCount": "",
@@ -195,10 +203,18 @@ seatunnel:
   "jobStatus": "",
   "createTime": "",
   "jobDag": {
-    "vertices": [
+    "jobId": "",
+    "vertexInfoMap": [
+      {
+        "vertexId": 1,
+        "type": "",
+        "vertexName": "",
+        "tablePaths": [
+          ""
+        ]
+      }
     ],
-    "edges": [
-    ]
+    "pipelineEdges": {}
   },
   "metrics": {
     "sourceReceivedCount": "",
@@ -252,7 +268,20 @@ seatunnel:
     "errorMsg": null,
     "createTime": "",
     "finishTime": "",
-    "jobDag": "",
+    "jobDag": {
+      "jobId": "",
+      "vertexInfoMap": [
+        {
+          "vertexId": 1,
+          "type": "",
+          "vertexName": "",
+          "tablePaths": [
+            ""
+          ]
+        }
+      ],
+      "pipelineEdges": {}
+    },
     "metrics": ""
   }
 ]
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
index cd869a3ca8..954bec748c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SeaTunnelJobAware;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
@@ -135,4 +136,13 @@ public interface SeaTunnelSink<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT>
     default Optional<Serializer<AggregatedCommitInfoT>> 
getAggregatedCommitInfoSerializer() {
         return Optional.empty();
     }
+
+    /**
+     * Get the catalog table of the sink.
+     *
+     * @return Optional of catalog table.
+     */
+    default Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.empty();
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index 3db7a8b7d2..2c3e9c6582 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkCommonOptions;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -32,6 +33,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import lombok.Getter;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -157,7 +159,18 @@ public class MultiTableSink
     }
 
     public List<TablePath> getSinkTables() {
-        return 
sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList());
+
+        List<TablePath> tablePaths = new ArrayList<>();
+        List<SeaTunnelSink> values = new ArrayList<>(sinks.values());
+        for (int i = 0; i < values.size(); i++) {
+            if (values.get(i).getWriteCatalogTable().isPresent()) {
+                tablePaths.add(
+                        ((CatalogTable) 
values.get(i).getWriteCatalogTable().get()).getTablePath());
+            } else {
+                tablePaths.add(TablePath.of(sinks.keySet().toArray(new 
String[0])[i]));
+            }
+        }
+        return tablePaths;
     }
 
     @Override
@@ -170,4 +183,9 @@ public class MultiTableSink
     public void setJobContext(JobContext jobContext) {
         sinks.values().forEach(sink -> sink.setJobContext(jobContext));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return SeaTunnelSink.super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java
 
b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java
index d1d3701795..85ecb347a7 100644
--- 
a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java
+++ 
b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSink.java
@@ -19,25 +19,29 @@ package 
org.apache.seatunnel.connectors.seatunnel.activemq.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import java.io.IOException;
+import java.util.Optional;
 
 public class ActivemqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     private final SeaTunnelRowType seaTunnelRowType;
     private final ReadonlyConfig pluginConfig;
+    private final CatalogTable catalogTable;
 
     @Override
     public String getPluginName() {
         return "ActiveMQ";
     }
 
-    public ActivemqSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) 
{
+    public ActivemqSink(ReadonlyConfig pluginConfig, CatalogTable 
catalogTable) {
         this.pluginConfig = pluginConfig;
-        this.seaTunnelRowType = rowType;
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
     @Override
@@ -45,4 +49,9 @@ public class ActivemqSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new ActivemqSinkWriter(pluginConfig, seaTunnelRowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
 
b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
index 7f0dca38f6..ec40d648ae 100644
--- 
a/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-activemq/src/main/java/org/apache/seatunnel/connectors/seatunnel/activemq/sink/ActivemqSinkFactory.java
@@ -75,9 +75,6 @@ public class ActivemqSinkFactory implements TableSinkFactory {
 
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
-        return () ->
-                new ActivemqSink(
-                        context.getOptions(),
-                        
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+        return () -> new ActivemqSink(context.getOptions(), 
context.getCatalogTable());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index f26b388cd7..68dcc84a42 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
@@ -85,4 +87,9 @@ public class AmazonDynamoDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new AmazonDynamoDBWriter(amazondynamodbSourceOptions, rowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
index 04cbdba2a5..9952217f07 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSink.java
@@ -19,25 +19,29 @@ package 
org.apache.seatunnel.connectors.seatunnel.amazonsqs.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import java.io.IOException;
+import java.util.Optional;
 
 public class AmazonSqsSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-    private SeaTunnelRowType typeInfo;
-    private ReadonlyConfig pluginConfig;
+    private final SeaTunnelRowType typeInfo;
+    private final ReadonlyConfig pluginConfig;
+    private final CatalogTable catalogTable;
 
     @Override
     public String getPluginName() {
         return "AmazonSqs";
     }
 
-    public AmazonSqsSink(ReadonlyConfig pluginConfig, SeaTunnelRowType 
typeInfo) {
-        this.typeInfo = typeInfo;
+    public AmazonSqsSink(ReadonlyConfig pluginConfig, CatalogTable 
catalogTable) {
+        this.typeInfo = catalogTable.getTableSchema().toPhysicalRowDataType();
         this.pluginConfig = pluginConfig;
+        this.catalogTable = catalogTable;
     }
 
     @Override
@@ -45,4 +49,9 @@ public class AmazonSqsSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new AmazonSqsSinkWriter(typeInfo, pluginConfig);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
index 81ccf9a37b..030e9d221a 100644
--- 
a/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-amazonsqs/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazonsqs/sink/AmazonSqsSinkFactory.java
@@ -41,8 +41,7 @@ public class AmazonSqsSinkFactory implements TableSinkFactory 
{
     public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig config = context.getOptions();
         CatalogTable catalogTable = context.getCatalogTable();
-        return () ->
-                new AmazonSqsSink(config, 
catalogTable.getTableSchema().toPhysicalRowDataType());
+        return () -> new AmazonSqsSink(config, catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index 927943adc2..e84b6fbcb2 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -39,6 +39,7 @@ import com.google.common.base.Throwables;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
@@ -56,6 +57,7 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     private final AssertTableRule assertTableRule;
     private final Map<String, AssertCatalogTableRule> assertCatalogTableRule;
     private final String catalogTableName;
+    private final CatalogTable catalogTable;
 
     public AssertSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
@@ -93,6 +95,7 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
                     new ConfigException.BadValue(
                             RULES.key(), "Assert rule config is empty, please 
add rule config."));
         }
+        this.catalogTable = catalogTable;
     }
 
     private void initTableRule(CatalogTable catalogTable, Config tableConfig, 
String tableName) {
@@ -130,4 +133,9 @@ public class AssertSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     public String getPluginName() {
         return "Assert";
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
index 09a50b8c95..9b37d94266 100644
--- 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -43,6 +44,7 @@ import com.google.auto.service.AutoService;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
@@ -122,4 +124,9 @@ public class CassandraSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType, 
tableSchema);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index cc63179f16..bb445d4282 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -212,4 +213,9 @@ public class ClickhouseFileSink
     public Optional<Serializer<CKFileAggCommitInfo>> 
getAggregatedCommitInfoSerializer() {
         return Optional.of(new DefaultSerializer<>());
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return SeaTunnelSink.super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 62ebab6a9f..6e7b78e195 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -20,10 +20,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.console.sink;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 
+import java.util.Optional;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
 import static 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;
 
@@ -32,11 +35,13 @@ public class ConsoleSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     private final SeaTunnelRowType seaTunnelRowType;
     private final boolean isPrintData;
     private final int delayMs;
+    private final CatalogTable catalogTable;
 
-    public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig 
options) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public ConsoleSink(CatalogTable catalogTable, ReadonlyConfig options) {
+        this.catalogTable = catalogTable;
         this.isPrintData = options.get(LOG_PRINT_DATA);
         this.delayMs = options.get(LOG_PRINT_DELAY);
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
     @Override
@@ -48,4 +53,9 @@ public class ConsoleSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     public String getPluginName() {
         return "Console";
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index fa5c7deae9..72987032bc 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -62,9 +62,6 @@ public class ConsoleSinkFactory implements TableSinkFactory {
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig options = context.getOptions();
-        return () ->
-                new ConsoleSink(
-                        
context.getCatalogTable().getTableSchema().toPhysicalRowDataType(),
-                        options);
+        return () -> new ConsoleSink(context.getCatalogTable(), options);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
 
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
index 8eeffb3aca..b22c236e7a 100644
--- 
a/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
+++ 
b/seatunnel-connectors-v2/connector-datahub/src/main/java/org/apache/seatunnel/connectors/seatunnel/datahub/sink/DataHubSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -35,6 +36,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.datahub.exception.DataHubConnec
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubConfig.ACCESS_ID;
 import static 
org.apache.seatunnel.connectors.seatunnel.datahub.config.DataHubConfig.ACCESS_KEY;
@@ -93,4 +95,9 @@ public class DataHubSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
                 pluginConfig.getInt(TIMEOUT.key()),
                 pluginConfig.getInt(RETRY_TIMES.key()));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
 
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
index 8d9d4b973e..42ec1e688a 100644
--- 
a/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
+++ 
b/seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter.Context;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -32,6 +33,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.exception.DingTalkConnectorExce
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.SECRET;
 import static 
org.apache.seatunnel.connectors.seatunnel.config.DingTalkConfig.URL;
@@ -73,4 +75,9 @@ public class DingTalkSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         return new DingTalkWriter(
                 pluginConfig.getString(URL.key()), 
pluginConfig.getString(SECRET.key()));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index c746fea00c..c0a9a2a5a1 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -133,4 +133,9 @@ public class DorisSink
                         catalogTable,
                         config.get(DorisOptions.CUSTOM_SQL)));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
index ad515aeeb7..786ab56e2c 100644
--- 
a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
+++ 
b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSink.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.druid.config.DruidConfig.BATCH_SIZE;
 import static 
org.apache.seatunnel.connectors.druid.config.DruidConfig.COORDINATOR_URL;
@@ -58,4 +59,9 @@ public class DruidSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
                 config.get(DATASOURCE),
                 config.get(BATCH_SIZE));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
index d96f7eb6b5..0eb4fbcd48 100644
--- 
a/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-easysearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/easysearch/sink/EasysearchSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchAggregatedCommitInfo;
@@ -30,6 +31,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.easysearch.state.EasysearchSink
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_BATCH_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.easysearch.config.SinkConfig.MAX_RETRY_COUNT;
 
@@ -75,4 +78,9 @@ public class EasysearchSink
         return new EasysearchSinkWriter(
                 context, seaTunnelRowType, pluginConfig, maxBatchSize, 
maxRetryCount);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return SeaTunnelSink.super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
index fed65733e0..ffe2b0520b 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
@@ -95,4 +95,9 @@ public class ElasticsearchSink
                 new DefaultSaveModeHandler(
                         schemaSaveMode, dataSaveMode, catalog, tablePath, 
null, null));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
index 0a3df90a12..24f9c2295f 100644
--- 
a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
+++ 
b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSink.java
@@ -27,13 +27,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.email.config.EmailConfig;
 import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
 
+import lombok.Getter;
+
+import java.util.Optional;
+
 public class EmailSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportMultiTableSink {
 
-    private SeaTunnelRowType seaTunnelRowType;
-    private ReadonlyConfig readonlyConfig;
-    private CatalogTable catalogTable;
-    private EmailSinkConfig pluginConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
+    @Getter private ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final EmailSinkConfig pluginConfig;
 
     public EmailSink(ReadonlyConfig config, CatalogTable table) {
         this.readonlyConfig = config;
@@ -51,4 +55,9 @@ public class EmailSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     public String getPluginName() {
         return EmailConfig.CONNECTOR_IDENTITY;
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
index 9a1885da4f..8783c268ba 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -33,6 +34,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
 @AutoService(SeaTunnelSink.class)
 public class CosFileSink extends BaseFileSink {
     @Override
@@ -60,4 +63,9 @@ public class CosFileSink extends BaseFileSink {
         }
         hadoopConf = CosConf.buildWithConfig(pluginConfig);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
index f4b271e035..ac481be25b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
@@ -23,7 +23,12 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
+import java.util.Optional;
+
 public class FtpFileSink extends BaseMultipleTableFileSink {
+
+    private final CatalogTable catalogTable;
+
     @Override
     public String getPluginName() {
         return FileSystemType.FTP.getFileSystemPluginName();
@@ -31,5 +36,11 @@ public class FtpFileSink extends BaseMultipleTableFileSink {
 
     public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
         super(FtpConf.buildWithConfig(readonlyConfig), readonlyConfig, 
catalogTable);
+        this.catalogTable = catalogTable;
+    }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
index 26c0f4f049..5e098ea2d2 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
@@ -21,10 +21,13 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
 @AutoService(SeaTunnelSink.class)
 public class HdfsFileSink extends BaseHdfsFileSink {
 
@@ -37,4 +40,9 @@ public class HdfsFileSink extends BaseHdfsFileSink {
     public void prepare(Config pluginConfig) throws PrepareFailException {
         super.prepare(pluginConfig);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
index ac6ee94992..03663d5c76 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -33,6 +34,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
 @AutoService(SeaTunnelSink.class)
 public class OssFileSink extends BaseFileSink {
     @Override
@@ -60,4 +63,9 @@ public class OssFileSink extends BaseFileSink {
         }
         hadoopConf = OssConf.buildWithConfig(pluginConfig);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
index 94741941bf..4042843d50 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
@@ -23,14 +23,24 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
+import java.util.Optional;
+
 public class LocalFileSink extends BaseMultipleTableFileSink {
 
+    private final CatalogTable catalogTable;
+
     public LocalFileSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
         super(new LocalFileHadoopConf(), readonlyConfig, catalogTable);
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public String getPluginName() {
         return FileSystemType.LOCAL.getFileSystemPluginName();
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
index 8f303b6a45..67a17fc954 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -33,6 +34,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
 @AutoService(SeaTunnelSink.class)
 public class ObsFileSink extends BaseFileSink {
     @Override
@@ -60,4 +63,9 @@ public class ObsFileSink extends BaseFileSink {
         }
         hadoopConf = ObsConf.buildWithConfig(pluginConfig);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
index de4726fd5c..11a3df2942 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
@@ -23,13 +23,24 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssHadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
+import java.util.Optional;
+
 public class OssFileSink extends BaseMultipleTableFileSink {
+
+    private final CatalogTable catalogTable;
+
     public OssFileSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
         super(OssHadoopConf.buildWithConfig(readonlyConfig), readonlyConfig, 
catalogTable);
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public String getPluginName() {
         return FileSystemType.OSS.getFileSystemPluginName();
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
index 2a636bcbcc..b0b6d9fbbb 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
@@ -44,8 +44,8 @@ import static 
org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory
 
 public class S3FileSink extends BaseMultipleTableFileSink implements 
SupportSaveMode {
 
-    private CatalogTable catalogTable;
-    private ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
+    private final ReadonlyConfig readonlyConfig;
 
     private static final String S3 = "S3";
 
@@ -89,4 +89,9 @@ public class S3FileSink extends BaseMultipleTableFileSink 
implements SupportSave
                 new DefaultSaveModeHandler(
                         schemaSaveMode, dataSaveMode, catalog, catalogTable, 
null));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
index dd3f1080b8..415ae3a5d6 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
@@ -23,13 +23,24 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
+import java.util.Optional;
+
 public class SftpFileSink extends BaseMultipleTableFileSink {
+
+    private final CatalogTable catalogTable;
+
     public SftpFileSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
         super(SftpConf.buildWithConfig(readonlyConfig), readonlyConfig, 
catalogTable);
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public String getPluginName() {
         return FileSystemType.SFTP.getFileSystemPluginName();
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
index ab7c02057d..6149ba9358 100644
--- 
a/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
+++ 
b/seatunnel-connectors-v2/connector-google-firestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/firestore/sink/FirestoreSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.google.firestore.exception.Fire
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.COLLECTION;
 import static 
org.apache.seatunnel.connectors.seatunnel.google.firestore.config.FirestoreConfig.PROJECT_ID;
@@ -76,4 +78,9 @@ public class FirestoreSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new FirestoreSinkWriter(rowType, firestoreParameters);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 0a46b1baef..e8d7b8b205 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -52,15 +52,15 @@ public class HbaseSink
                 SupportMultiTableSink,
                 SupportSaveMode {
 
-    private ReadonlyConfig config;
+    private final ReadonlyConfig config;
 
-    private CatalogTable catalogTable;
+    private final CatalogTable catalogTable;
 
     private final HbaseParameters hbaseParameters;
 
-    private SeaTunnelRowType seaTunnelRowType;
+    private final SeaTunnelRowType seaTunnelRowType;
 
-    private List<Integer> rowkeyColumnIndexes = new ArrayList<>();
+    private final List<Integer> rowkeyColumnIndexes = new ArrayList<>();
 
     private int versionColumnIndex = -1;
 
@@ -110,4 +110,9 @@ public class HbaseSink
                 new DefaultSaveModeHandler(
                         schemaSaveMode, dataSaveMode, catalog, tablePath, 
null, null));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index b5602c13f8..997c42f9fa 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -244,4 +244,9 @@ public class HiveSink
         }
         return writeStrategy;
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 9dfe688c11..5ac6b927ca 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -34,15 +35,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorExc
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class HttpSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportMultiTableSink {
     protected final HttpParameter httpParameter = new HttpParameter();
+    protected CatalogTable catalogTable;
     protected SeaTunnelRowType seaTunnelRowType;
     protected Config pluginConfig;
 
-    public HttpSink(Config pluginConfig, SeaTunnelRowType rowType) {
+    public HttpSink(Config pluginConfig, CatalogTable catalogTable) {
         this.pluginConfig = pluginConfig;
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HttpConfig.URL.key());
         if (!result.isSuccess()) {
@@ -71,7 +74,8 @@ public class HttpSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
                                             entry -> 
String.valueOf(entry.getValue().unwrapped()),
                                             (v1, v2) -> v2)));
         }
-        this.seaTunnelRowType = rowType;
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
 
     @Override
@@ -83,4 +87,9 @@ public class HttpSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     public HttpSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
         return new HttpSinkWriter(seaTunnelRowType, httpParameter);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
index 313d26dd3f..6ed6765d57 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -38,8 +38,7 @@ public class HttpSinkFactory implements TableSinkFactory {
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
         CatalogTable catalogTable = context.getCatalogTable();
-        return () ->
-                new HttpSink(context.getOptions().toConfig(), 
catalogTable.getSeaTunnelRowType());
+        return () -> new HttpSink(context.getOptions().toConfig(), 
catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
index b3fbaa6a5b..25af9636c8 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSink.java
@@ -19,16 +19,23 @@ package 
org.apache.seatunnel.connectors.seatunnel.feishu.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
 
+import java.util.Optional;
+
 public class FeishuSink extends HttpSink {
-    public FeishuSink(Config pluginConfig, SeaTunnelRowType rowType) {
-        super(pluginConfig, rowType);
+    public FeishuSink(Config pluginConfig, CatalogTable catalogTable) {
+        super(pluginConfig, catalogTable);
     }
 
     @Override
     public String getPluginName() {
         return "Feishu";
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
index f9cd6ee01c..3052ba78d1 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-feishu/src/main/java/org/apache/seatunnel/connectors/seatunnel/feishu/sink/FeishuSinkFactory.java
@@ -31,8 +31,7 @@ public class FeishuSinkFactory extends HttpSinkFactory {
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
         CatalogTable catalogTable = context.getCatalogTable();
-        return () ->
-                new FeishuSink(context.getOptions().toConfig(), 
catalogTable.getSeaTunnelRowType());
+        return () -> new FeishuSink(context.getOptions().toConfig(), 
catalogTable);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
index f438167c39..a3e910b620 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-wechat/src/main/java/org/apache/seatunnel/connectors/seatunnel/wechat/sink/WeChatSink.java
@@ -20,15 +20,17 @@ package 
org.apache.seatunnel.connectors.seatunnel.wechat.sink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSink;
 import org.apache.seatunnel.connectors.seatunnel.http.sink.HttpSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.wechat.sink.config.WeChatSinkConfig;
 
+import java.util.Optional;
+
 public class WeChatSink extends HttpSink {
 
-    public WeChatSink(Config pluginConfig, SeaTunnelRowType rowType) {
-        super(pluginConfig, rowType);
+    public WeChatSink(Config pluginConfig, CatalogTable catalogTable) {
+        super(pluginConfig, catalogTable);
     }
 
     @Override
@@ -44,4 +46,9 @@ public class WeChatSink extends HttpSink {
                 new WeChatBotMessageSerializationSchema(
                         new WeChatSinkConfig(pluginConfig), seaTunnelRowType));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
index 5bdc3b8c3a..13c245336a 100644
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
@@ -143,4 +143,9 @@ public class HudiSink
                         catalogTable,
                         null));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
index 417d92ac49..bdced4b52b 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -55,7 +55,7 @@ public class IcebergSink
                         IcebergAggregatedCommitInfo>,
                 SupportSaveMode,
                 SupportMultiTableSink {
-    private static String PLUGIN_NAME = "Iceberg";
+    private static final String PLUGIN_NAME = "Iceberg";
     private final SinkConfig config;
     private final ReadonlyConfig readonlyConfig;
     private final CatalogTable catalogTable;
@@ -133,4 +133,9 @@ public class IcebergSink
                         catalogTable,
                         null));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 4d940f63cc..840379f7bd 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -26,12 +26,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
 
 import java.io.IOException;
+import java.util.Optional;
 
 public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportMultiTableSink {
 
-    private SeaTunnelRowType seaTunnelRowType;
-    private SinkConfig sinkConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final SinkConfig sinkConfig;
+    private final CatalogTable catalogTable;
 
     @Override
     public String getPluginName() {
@@ -41,10 +43,16 @@ public class InfluxDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
         this.sinkConfig = sinkConfig;
         this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public InfluxDBSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
         return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
index 263ab04973..65c29d57cb 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -34,6 +35,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorE
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.KEY_DEVICE;
 import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.NODE_URLS;
 import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.PASSWORD;
@@ -78,4 +81,9 @@ public class IoTDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new IoTDBSinkWriter(pluginConfig, seaTunnelRowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 2ccfba19f2..b35150900f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -237,4 +237,9 @@ public class JdbcSink
         }
         return Optional.empty();
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index e7945d9ed1..4deb30f547 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
@@ -43,10 +44,12 @@ public class KafkaSink
 
     private final ReadonlyConfig pluginConfig;
     private final SeaTunnelRowType seaTunnelRowType;
+    private final CatalogTable catalogTable;
 
-    public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
+    public KafkaSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
         this.pluginConfig = pluginConfig;
-        this.seaTunnelRowType = rowType;
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
     @Override
@@ -81,4 +84,9 @@ public class KafkaSink
     public String getPluginName() {
         return 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONNECTOR_IDENTITY;
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index fe6965132d..ed3278602a 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -50,9 +50,6 @@ public class KafkaSinkFactory implements TableSinkFactory {
 
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
-        return () ->
-                new KafkaSink(
-                        context.getOptions(),
-                        
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+        return () -> new KafkaSink(context.getOptions(), 
context.getCatalogTable());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
index def4a2b366..d56a08db43 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * Kudu Sink implementation by using SeaTunnel sink API. This class contains 
the method to create
@@ -40,11 +41,14 @@ public class KuduSink
                         SeaTunnelRow, KuduSinkState, KuduCommitInfo, 
KuduAggregatedCommitInfo>,
                 SupportMultiTableSink {
 
-    private KuduSinkConfig kuduSinkConfig;
-    private SeaTunnelRowType seaTunnelRowType;
+    private final KuduSinkConfig kuduSinkConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    private final CatalogTable catalogTable;
 
     public KuduSink(KuduSinkConfig kuduSinkConfig, CatalogTable catalogTable) {
         this.kuduSinkConfig = kuduSinkConfig;
+        this.catalogTable = catalogTable;
         this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
@@ -57,4 +61,9 @@ public class KuduSink
     public KuduSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
         return new KuduSinkWriter(seaTunnelRowType, kuduSinkConfig);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index 6abce7e417..91e8c12dca 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -33,6 +34,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.auto.service.AutoService;
 
+import java.util.Optional;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
 
 @AutoService(SeaTunnelSink.class)
@@ -61,4 +64,9 @@ public class MaxcomputeSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
         return new MaxcomputeWriter(this.pluginConfig, this.typeInfo);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
index 2015be1973..10f4b6ca69 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/sink/MilvusSink.java
@@ -112,4 +112,9 @@ public class MilvusSink
                         catalogTable,
                         null));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index 26f127509e..c3af6c7a90 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
@@ -29,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.PLUGIN_NAME;
 
@@ -58,4 +60,9 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, 
Void, Void, Void>
             throws IOException {
         return new Neo4jSinkWriter(neo4JSinkQueryInfo, rowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return SeaTunnelSink.super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
index fbf04a5038..73d2151b89 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -158,4 +158,9 @@ public class PaimonSink
     public void setLoadTable(Table table) {
         this.table = table;
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
index 05a007df9a..989e24b024 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
@@ -47,13 +48,15 @@ public class PulsarSink
         implements SeaTunnelSink<
                 SeaTunnelRow, PulsarSinkState, PulsarCommitInfo, 
PulsarAggregatedCommitInfo> {
 
-    private SeaTunnelRowType seaTunnelRowType;
-    private PulsarClientConfig clientConfig;
-    private ReadonlyConfig readonlyConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final PulsarClientConfig clientConfig;
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
 
-    public PulsarSink(ReadonlyConfig readonlyConfig, SeaTunnelRowType 
seaTunnelRowType) {
+    public PulsarSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
         this.readonlyConfig = readonlyConfig;
-        this.seaTunnelRowType = seaTunnelRowType;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
+        this.catalogTable = catalogTable;
 
         /** client config */
         PulsarClientConfig.Builder clientConfigBuilder =
@@ -96,4 +99,9 @@ public class PulsarSink
     public String getPluginName() {
         return PulsarConfigUtil.IDENTIFIER;
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
index c5b13e2876..7781ba7b94 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java
@@ -53,9 +53,6 @@ public class PulsarSinkFactory implements TableSinkFactory {
 
     @Override
     public TableSink createSink(TableSinkFactoryContext context) {
-        return () ->
-                new PulsarSink(
-                        context.getOptions(),
-                        
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+        return () -> new PulsarSink(context.getOptions(), 
context.getCatalogTable());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
index 85119032c8..16904903e5 100644
--- 
a/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
+++ 
b/seatunnel-connectors-v2/connector-qdrant/src/main/java/org/apache/seatunnel/connectors/seatunnel/qdrant/sink/QdrantSink.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.qdrant.config.QdrantParameters;
 
 import java.io.IOException;
+import java.util.Optional;
 
 public class QdrantSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportMultiTableSink {
@@ -47,4 +48,9 @@ public class QdrantSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     public QdrantSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
         return new QdrantSinkWriter(catalogTable, qdrantParameters);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
index 1dcd792404..7d4f26272b 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConn
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
@@ -88,4 +90,9 @@ public class RabbitmqSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new RabbitmqSinkWriter(rabbitMQConfig, seaTunnelRowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
index a87ee1ebf7..ddb1901205 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSink.java
@@ -28,13 +28,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 
 import java.io.IOException;
+import java.util.Optional;
 
 public class RedisSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportMultiTableSink {
     private final RedisParameters redisParameters = new RedisParameters();
-    private SeaTunnelRowType seaTunnelRowType;
-    private ReadonlyConfig readonlyConfig;
-    private CatalogTable catalogTable;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final ReadonlyConfig readonlyConfig;
+    private final CatalogTable catalogTable;
 
     public RedisSink(ReadonlyConfig config, CatalogTable table) {
         this.readonlyConfig = config;
@@ -52,4 +53,9 @@ public class RedisSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
     public RedisSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
         return new RedisSinkWriter(seaTunnelRowType, redisParameters);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
index bf81adf0a7..9fda05dece 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -38,6 +39,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConn
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
@@ -161,4 +163,9 @@ public class RocketMqSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new RocketMqSinkWriter(producerMetadata, seaTunnelRowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index f4ef83d0b2..33222116cc 100644
--- 
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -140,4 +141,9 @@ public class SelectDBSink
     public Optional<Serializer<SelectDBCommitInfo>> 
getAggregatedCommitInfoSerializer() {
         return Optional.empty();
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return SeaTunnelSink.super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
 
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
index 0a109dfa41..1298715633 100644
--- 
a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
+++ 
b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -34,6 +35,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.sentry.exception.SentryConnecto
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /** @description: SentrySink class */
 @AutoService(SeaTunnelSink.class)
@@ -72,4 +74,9 @@ public class SentrySink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new SentrySinkWriter(seaTunnelRowType, pluginConfig);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
index 91bee53eb5..e4b6ad2766 100644
--- 
a/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
+++ 
b/seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.slack.exception.SlackConnectorE
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /** Slack sink class */
 @AutoService(SeaTunnelSink.class)
@@ -77,4 +79,9 @@ public class SlackSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         }
         this.pluginConfig = pluginConfig;
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
index 222947f72e..87bff65a9a 100644
--- 
a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
+++ 
b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/sink/SocketSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnecto
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.socket.config.SocketSinkConfigOptions.PORT;
@@ -75,4 +77,9 @@ public class SocketSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new SocketSinkWriter(sinkConfig, seaTunnelRowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 35c9ed9e37..61f279fc12 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -40,10 +40,10 @@ import java.util.Optional;
 public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
         implements SupportSaveMode {
 
-    private SeaTunnelRowType seaTunnelRowType;
+    private final SeaTunnelRowType seaTunnelRowType;
     private final SinkConfig sinkConfig;
-    private DataSaveMode dataSaveMode;
-    private SchemaSaveMode schemaSaveMode;
+    private final DataSaveMode dataSaveMode;
+    private final SchemaSaveMode schemaSaveMode;
     private final CatalogTable catalogTable;
 
     public StarRocksSink(
@@ -88,4 +88,9 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
                         catalogTable,
                         sinkConfig.getCustomSql()));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
index 9a16e6aeb1..2656263850 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -36,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.Tablestore
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
 import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
@@ -87,4 +89,9 @@ public class TablestoreSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
             throws IOException {
         return new TablestoreWriter(tablestoreOptions, rowType);
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
index 82d1069f7c..194c94dda5 100644
--- 
a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
+++ 
b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSink.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
@@ -29,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import com.google.auto.service.AutoService;
 
 import java.io.IOException;
+import java.util.Optional;
 
 @AutoService(SeaTunnelSink.class)
 public class TDengineSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
@@ -55,4 +57,9 @@ public class TDengineSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     public String getPluginName() {
         return "TDengine";
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return super.getWriteCatalogTable();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink/TypesenseSink.java
 
b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink/TypesenseSink.java
index e52638f83e..fe4d7190c5 100644
--- 
a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink/TypesenseSink.java
+++ 
b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/sink/TypesenseSink.java
@@ -51,8 +51,8 @@ public class TypesenseSink
                 SupportMultiTableSink,
                 SupportSaveMode {
 
-    private ReadonlyConfig config;
-    private CatalogTable catalogTable;
+    private final ReadonlyConfig config;
+    private final CatalogTable catalogTable;
     private final int maxBatchSize;
     private final int maxRetryCount;
 
@@ -93,4 +93,9 @@ public class TypesenseSink
                 new DefaultSaveModeHandler(
                         schemaSaveMode, dataSaveMode, catalog, tablePath, 
null, null));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
 }
diff --git 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index 3628a5dce6..59c4bf67f5 100644
--- 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++ 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -142,6 +142,8 @@ public class ConnectorSpecificationCheckTest {
                                 sinkClass, "setTypeInfo", 
SeaTunnelRowType.class);
                 Optional<Method> getConsumedType =
                         ReflectionUtils.getDeclaredMethod(sinkClass, 
"getConsumedType");
+                Optional<Method> getWriteCatalogTable =
+                        ReflectionUtils.getDeclaredMethod(sinkClass, 
"getWriteCatalogTable");
                 Assertions.assertFalse(
                         prepare.isPresent(),
                         "Please remove `prepare` method in " + 
sinkClass.getSimpleName());
@@ -151,6 +153,16 @@ public class ConnectorSpecificationCheckTest {
                 Assertions.assertFalse(
                         getConsumedType.isPresent(),
                         "Please remove `getConsumedType` method in " + 
sinkClass.getSimpleName());
+                Assertions.assertTrue(
+                        getWriteCatalogTable.isPresent(),
+                        "Please implement `getWriteCatalogTable` method in "
+                                + sinkClass.getSimpleName());
+                Assertions.assertEquals(
+                        Optional.class,
+                        getWriteCatalogTable.get().getReturnType(),
+                        "The `getWriteCatalogTable` method should return 
Optional<CatalogTable> in "
+                                + sinkClass.getSimpleName());
+
                 log.info(
                         "Check sink connector {} successfully", 
factory.getClass().getSimpleName());
 
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
index 9e3852fb3c..ff48999461 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
@@ -80,4 +80,9 @@ public class InMemorySink
     public Optional<SaveModeHandler> getSaveModeHandler() {
         return Optional.of(new InMemorySaveModeHandler(catalogTable));
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 378da4df17..8e5b15cc3d 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit;
 import static io.restassured.RestAssured.given;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.CONTEXT_PATH;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
 
@@ -327,6 +329,46 @@ public class RestApiIT {
                                                                 + 
batchJobProxy.getJobId())
                                                 .then()
                                                 .statusCode(200)
+                                                .body(
+                                                        "jobDag.jobId",
+                                                        equalTo(
+                                                                Long.toString(
+                                                                        
batchJobProxy.getJobId())))
+                                                .body("jobDag.pipelineEdges", 
hasKey("1"))
+                                                
.body("jobDag.pipelineEdges['1']", hasSize(1))
+                                                .body(
+                                                        
"jobDag.pipelineEdges['1'][0].inputVertexId",
+                                                        equalTo("1"))
+                                                .body(
+                                                        
"jobDag.pipelineEdges['1'][0].targetVertexId",
+                                                        equalTo("2"))
+                                                .body("jobDag.vertexInfoMap", 
hasSize(2))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[0].vertexId",
+                                                        equalTo(1))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[0].type",
+                                                        equalTo("source"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[0].vertexName",
+                                                        equalTo(
+                                                                "pipeline-1 
[Source[0]-FakeSource]"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[0].tablePaths[0]",
+                                                        equalTo("fake"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[1].vertexId",
+                                                        equalTo(2))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[1].type",
+                                                        equalTo("sink"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[1].vertexName",
+                                                        equalTo(
+                                                                "pipeline-1 
[Sink[0]-console-MultiTableSink]"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[1].tablePaths[0]",
+                                                        equalTo("fake"))
                                                 .body("jobName", 
equalTo("fake_to_console"))
                                                 .body("jobStatus", 
equalTo("FINISHED"));
                                     });
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
index ecb797f80a..ee6326acbd 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.engine.core.job;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import com.hazelcast.internal.json.JsonArray;
+import com.hazelcast.internal.json.JsonObject;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -32,4 +36,39 @@ public class JobDAGInfo implements Serializable {
     Long jobId;
     Map<Integer, List<Edge>> pipelineEdges;
     Map<Long, VertexInfo> vertexInfoMap;
+
+    public JsonObject toJsonObject() {
+        JsonObject pipelineEdgesJsonObject = new JsonObject();
+        // Per pipeline id: an array of {inputVertexId, targetVertexId}, all ids as strings.
+        for (Map.Entry<Integer, List<Edge>> entry : pipelineEdges.entrySet()) {
+            JsonArray jsonArray = new JsonArray();
+            for (Edge edge : entry.getValue()) {
+                JsonObject edgeJsonObject = new JsonObject();
+                edgeJsonObject.add("inputVertexId", 
edge.getInputVertexId().toString());
+                edgeJsonObject.add("targetVertexId", 
edge.getTargetVertexId().toString());
+                jsonArray.add(edgeJsonObject);
+            }
+            pipelineEdgesJsonObject.add(entry.getKey().toString(), jsonArray);
+        }
+        // Result keys: jobId (string), pipelineEdges (object), vertexInfoMap (array of vertices).
+        JsonObject jsonObject = new JsonObject();
+        jsonObject.add("jobId", jobId.toString());
+        jsonObject.add("pipelineEdges", pipelineEdgesJsonObject);
+        JsonArray vertexInfoMapString = new JsonArray();
+        for (Map.Entry<Long, VertexInfo> entry : vertexInfoMap.entrySet()) {
+            JsonObject vertexInfoJsonObj = new JsonObject();
+            VertexInfo vertexInfo = entry.getValue();
+            vertexInfoJsonObj.add("vertexId", vertexInfo.getVertexId());
+            vertexInfoJsonObj.add("type", vertexInfo.getType().getType());
+            vertexInfoJsonObj.add("vertexName", vertexInfo.getConnectorType());
+            JsonArray tablePaths = new JsonArray();
+            for (TablePath tablePath : vertexInfo.getTablePaths()) {
+                tablePaths.add(tablePath.toString());
+            }
+            vertexInfoJsonObj.add("tablePaths", tablePaths);
+            vertexInfoMapString.add(vertexInfoJsonObj);
+        }
+        jsonObject.add("vertexInfoMap", vertexInfoMapString);
+        return jsonObject;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/VertexInfo.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/VertexInfo.java
index 20602c9e83..e7e00e9ef6 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/VertexInfo.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/VertexInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.core.job;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.constants.PluginType;
 
 import lombok.AllArgsConstructor;
@@ -24,6 +25,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
+import java.util.List;
 
 @Data
 @NoArgsConstructor
@@ -35,4 +37,6 @@ public class VertexInfo implements Serializable {
     private PluginType type;
 
     private String connectorType;
+
+    private List<TablePath> tablePaths;
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
index e3bba41b5f..e7b41de73d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -17,8 +17,15 @@
 
 package org.apache.seatunnel.engine.server.dag;
 
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.Edge;
@@ -28,12 +35,17 @@ import org.apache.seatunnel.engine.core.job.VertexInfo;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
 import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
 
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class DAGUtils {
 
     public static JobDAGInfo getJobDAGInfo(
@@ -69,7 +81,8 @@ public class DAGUtils {
                                                             
vertex.getVertexId(),
                                                             
ActionUtils.getActionType(
                                                                     
vertex.getAction()),
-                                                            
vertex.getAction().getName()));
+                                                            
vertex.getAction().getName(),
+                                                            
getTablePaths(vertex.getAction())));
                                         });
                     });
             return new JobDAGInfo(
@@ -89,7 +102,8 @@ public class DAGUtils {
                                             new VertexInfo(
                                                     v.getVertexId(),
                                                     
ActionUtils.getActionType(v.getAction()),
-                                                    v.getAction().getName()))
+                                                    v.getAction().getName(),
+                                                    
getTablePaths(v.getAction())))
                             .collect(
                                     Collectors.toMap(VertexInfo::getVertexId, 
Function.identity()));
 
@@ -119,4 +133,41 @@ public class DAGUtils {
                     jobImmutableInformation.getJobId(), pipelineWithEdges, 
vertexInfoMap);
         }
     }
+
+    private static List<TablePath> getTablePaths(Action action) {
+        // Table paths for one DAG action: produced tables (source) or written tables (sink).
+        List<TablePath> tablePaths = new ArrayList<>();
+        if (action instanceof SourceAction) {
+            SourceAction sourceAction = (SourceAction) action;
+            // Sources contribute the table path of every catalog table they produce.
+            try {
+
+                List<CatalogTable> producedCatalogTables =
+                        sourceAction.getSource().getProducedCatalogTables();
+                List<TablePath> sourceTablePaths =
+                        producedCatalogTables.stream()
+                                .map(CatalogTable::getTablePath)
+                                .collect(Collectors.toList());
+                tablePaths.addAll(sourceTablePaths);
+            } catch (UnsupportedOperationException e) {
+                // Not fatal: log it and report TablePath.DEFAULT for this source instead.
+                log.warn(
+                        "SourceAction {} does not support 
getProducedCatalogTables, fallback to default table path",
+                        action.getName());
+                tablePaths.add(TablePath.DEFAULT);
+            }
+        } else if (action instanceof SinkAction) {
+            SeaTunnelSink seaTunnelSink = ((SinkAction<?, ?, ?, ?>) 
action).getSink();
+            if (seaTunnelSink instanceof MultiTableSink) {
+                List<TablePath> sinkTablePaths =
+                        new ArrayList<>(((MultiTableSink) 
seaTunnelSink).getSinkTables());
+                tablePaths.addAll(sinkTablePaths);
+            } else {
+                Optional<CatalogTable> catalogTable = 
seaTunnelSink.getWriteCatalogTable();
+                catalogTable.ifPresent(table -> 
tablePaths.add(table.getTablePath()));
+            }
+        }
+        // Stays empty for other action types and for a sink that reports no write table.
+        return tablePaths;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 51335cc9f2..d052629f2e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -754,7 +754,7 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                         DateTimeUtils.toString(
                                 jobState.getFinishTime(),
                                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
-                .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
+                .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
                 .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
                 .add(RestConstant.METRICS, 
toJsonObject(getJobMetrics(jobMetrics)));
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
index faa62bf54b..ce5fc74d3c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
-import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -287,7 +286,7 @@ public class BaseServlet extends HttpServlet {
                         DateTimeUtils.toString(
                                 jobState.getFinishTime(),
                                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
-                .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
+                .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
                 .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
                 .add(RestConstant.METRICS, 
toJsonObject(getJobMetrics(jobMetrics)));
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index 79d487d50a..2b63ac8018 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -22,9 +22,13 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
@@ -47,6 +51,7 @@ import com.google.common.collect.Sets;
 
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -78,12 +83,19 @@ public class TestUtils {
         fake.setParallelism(3);
         LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
 
+        List<Column> columns = new ArrayList<>();
+        columns.add(PhysicalColumn.of("id", BasicType.INT_TYPE, 11L, 0, true, 
111, ""));
+
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("default", TablePath.DEFAULT),
+                        TableSchema.builder().columns(columns).build(),
+                        new HashMap<>(),
+                        Collections.emptyList(),
+                        "fake");
+
         ConsoleSink consoleSink =
-                new ConsoleSink(
-                        new SeaTunnelRowType(
-                                new String[] {"id"},
-                                new SeaTunnelDataType<?>[] 
{BasicType.INT_TYPE}),
-                        ReadonlyConfig.fromMap(new HashMap<>()));
+                new ConsoleSink(catalogTable, ReadonlyConfig.fromMap(new 
HashMap<>()));
         consoleSink.setJobContext(jobContext);
         Action console =
                 new SinkAction<>(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index eb7cff6db9..5714e68aa6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -22,9 +22,13 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -49,8 +53,10 @@ import org.junit.jupiter.api.Test;
 import com.google.common.collect.ImmutableMap;
 import com.hazelcast.map.IMap;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 
@@ -132,12 +138,19 @@ public class CheckpointPlanTest extends 
AbstractSeaTunnelServerTest {
         fake.setParallelism(parallelism);
         LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 
parallelism);
 
+        List<Column> columns = new ArrayList<>();
+        columns.add(PhysicalColumn.of("id", BasicType.INT_TYPE, 11L, 0, true, 
111, ""));
+
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("default", TablePath.DEFAULT),
+                        TableSchema.builder().columns(columns).build(),
+                        new HashMap<>(),
+                        Collections.emptyList(),
+                        "fake");
+
         ConsoleSink consoleSink =
-                new ConsoleSink(
-                        new SeaTunnelRowType(
-                                new String[] {"id"},
-                                new SeaTunnelDataType<?>[] 
{BasicType.INT_TYPE}),
-                        ReadonlyConfig.fromMap(new HashMap<>()));
+                new ConsoleSink(catalogTable, ReadonlyConfig.fromMap(new 
HashMap<>()));
         consoleSink.setJobContext(jobContext);
         Action console =
                 new SinkAction<>(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 10d426f589..25391c5c18 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -22,9 +22,13 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
@@ -55,8 +59,10 @@ import com.hazelcast.map.IMap;
 
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.Executors;
 
 public class TaskTest extends AbstractSeaTunnelServerTest {
@@ -114,15 +120,22 @@ public class TaskTest extends AbstractSeaTunnelServerTest 
{
                         Collections.emptySet());
         LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 2);
 
+        List<Column> columns = new ArrayList<>();
+        columns.add(PhysicalColumn.of("id", BasicType.INT_TYPE, 11L, 0, true, 
111, ""));
+
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("default", TablePath.DEFAULT),
+                        TableSchema.builder().columns(columns).build(),
+                        new HashMap<>(),
+                        Collections.emptyList(),
+                        "fake");
+
         Action console =
                 new SinkAction<>(
                         idGenerator.getNextId(),
                         "console",
-                        new ConsoleSink(
-                                new SeaTunnelRowType(
-                                        new String[] {"id"},
-                                        new SeaTunnelDataType<?>[] 
{BasicType.INT_TYPE}),
-                                ReadonlyConfig.fromMap(new HashMap<>())),
+                        new ConsoleSink(catalogTable, 
ReadonlyConfig.fromMap(new HashMap<>())),
                         Sets.newHashSet(new URL("file:///console.jar")),
                         Collections.emptySet());
         LogicalVertex consoleVertex = new LogicalVertex(console.getId(), 
console, 2);
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
index ab1ac3f00e..f234004837 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/test/java/org/apache/seatunnel/translation/spark/sink/SeaTunnelSinkWithBuffer.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.translation.spark.sink;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import java.io.IOException;
+import java.util.Optional;
 
 public class SeaTunnelSinkWithBuffer implements SeaTunnelSink<SeaTunnelRow, 
Void, Void, Void> {
 
@@ -35,4 +37,9 @@ public class SeaTunnelSinkWithBuffer implements 
SeaTunnelSink<SeaTunnelRow, Void
             throws IOException {
         return new SeaTunnelSinkWithBufferWriter();
     }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return SeaTunnelSink.super.getWriteCatalogTable();
+    }
 }


Reply via email to