This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 41173357f8 [Improve] Refactor CatalogTable and add `SeaTunnelSource::getProducedCatalogTables` (#5562)
41173357f8 is described below

commit 41173357f847f7bc72119b3e0ebd5019f94a4c68
Author: Jia Fan <[email protected]>
AuthorDate: Thu Sep 28 16:55:42 2023 +0800

    [Improve] Refactor CatalogTable and add `SeaTunnelSource::getProducedCatalogTables` (#5562)
---
 .../seatunnel/api/source/SeaTunnelSource.java      | 18 ++++-
 .../api/table/catalog/CatalogTableUtil.java        | 23 +++++-
 .../seatunnel/api/table/factory/FactoryUtil.java   | 92 ++++++++++++----------
 .../api/table/factory/SupportMultipleTable.java    | 56 -------------
 .../api/table/factory/TableFactoryContext.java     | 33 +-------
 .../api/table/factory/TableSinkFactory.java        |  2 +-
 ...rmFactory.java => TableSinkFactoryContext.java} | 28 +++----
 .../api/table/factory/TableSourceFactory.java      |  2 +-
 ...Factory.java => TableSourceFactoryContext.java} | 23 ++----
 .../api/table/factory/TableTransformFactory.java   |  2 +-
 ...text.java => TableTransformFactoryContext.java} | 30 +------
 .../seatunnel/api/table/type/SeaTunnelRow.java     |  4 +-
 .../cdc/base/source/IncrementalSource.java         | 13 ++-
 .../cdc/mongodb/MongodbIncrementalSource.java      |  8 +-
 .../mongodb/MongodbIncrementalSourceFactory.java   | 41 +++-------
 .../cdc/mysql/source/MySqlIncrementalSource.java   |  7 +-
 .../source/MySqlIncrementalSourceFactory.java      | 39 +++------
 .../source/source/SqlServerIncrementalSource.java  |  8 +-
 .../source/SqlServerIncrementalSourceFactory.java  | 38 +++------
 .../seatunnel/console/sink/ConsoleSinkFactory.java |  4 +-
 .../sqlserver/SqlServerDataTypeConvertor.java      |  1 +
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |  4 +-
 .../seatunnel/jdbc/source/JdbcSourceFactory.java   | 70 ++++++++++++----
 .../seatunnel/kafka/sink/KafkaSinkFactory.java     |  4 +-
 .../starrocks/sink/StarRocksSinkFactory.java       |  4 +-
 .../dag/actions/ShuffleMultipleRowStrategy.java    | 10 +--
 .../core/parse/MultipleTableJobConfigParser.java   | 67 +++++-----------
 .../dag/execution/ExecutionPlanGenerator.java      | 14 ++--
 .../server/dag/physical/PhysicalPlanGenerator.java |  7 --
 .../engine/server/task/SourceSeaTunnelTask.java    | 14 +++-
 .../transform/copy/CopyFieldTransformFactory.java  |  7 +-
 .../fieldmapper/FieldMapperTransformFactory.java   |  6 +-
 .../filter/FilterFieldTransformFactory.java        |  6 +-
 .../FilterRowKindTransformFactory.java             |  6 +-
 .../transform/replace/ReplaceTransformFactory.java |  6 +-
 .../transform/split/SplitTransformFactory.java     |  7 +-
 .../transform/sql/SQLTransformFactory.java         |  6 +-
 37 files changed, 316 insertions(+), 394 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 0535de68b8..924c2e5244 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -21,9 +21,11 @@ import org.apache.seatunnel.api.common.PluginIdentifierInterface;
 import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
 import java.io.Serializable;
+import java.util.List;
 
 /**
  * The interface for Source. It acts like a factory class that helps construct the {@link
@@ -49,9 +51,23 @@ public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends S
     /**
      * Get the data type of the records produced by this source.
      *
+     * @deprecated Please use {@link #getProducedCatalogTables}
      * @return SeaTunnel data type.
      */
-    SeaTunnelDataType<T> getProducedType();
+    @Deprecated
+    default SeaTunnelDataType<T> getProducedType() {
+        throw new UnsupportedOperationException("getProducedType method has not been implemented.");
+    }
+
+    /**
+     * Get the catalog tables output by this source, It is recommended that all connectors implement
+     * this method instead of {@link #getProducedType}. CatalogTable contains more information to
+     * help downstream support more accurate and complete synchronization capabilities.
+     */
+    default List<CatalogTable> getProducedCatalogTables() {
+        throw new UnsupportedOperationException(
+                "getProducedCatalogTables method has not been implemented.");
+    }
 
     /**
      * Create source reader, used to produce data.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index a9b921ce5b..def005eeb6 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -33,8 +33,10 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.utils.JsonUtils;
@@ -138,9 +140,14 @@ public class CatalogTableUtil implements Serializable {
     @Deprecated
     public static List<CatalogTable> getCatalogTablesFromConfig(
             ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
-
         // We use plugin_name as factoryId, so MySQL-CDC should be MySQL
         String factoryId = 
readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
+        return getCatalogTablesFromConfig(factoryId, readonlyConfig, 
classLoader);
+    }
+
+    @Deprecated
+    public static List<CatalogTable> getCatalogTablesFromConfig(
+            String factoryId, ReadonlyConfig readonlyConfig, ClassLoader 
classLoader) {
         // Highest priority: specified schema
         Map<String, String> schemaMap = 
readonlyConfig.get(CatalogTableUtil.SCHEMA);
         if (schemaMap != null) {
@@ -188,6 +195,20 @@ public class CatalogTableUtil implements Serializable {
         return buildWithConfig(readonlyConfig);
     }
 
+    public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
+            List<CatalogTable> catalogTables) {
+        if (catalogTables.size() == 1) {
+            return 
catalogTables.get(0).getTableSchema().toPhysicalRowDataType();
+        } else {
+            Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+            for (CatalogTable catalogTable : catalogTables) {
+                String tableId = 
catalogTable.getTableId().toTablePath().toString();
+                rowTypeMap.put(tableId, 
catalogTable.getTableSchema().toPhysicalRowDataType());
+            }
+            return new MultipleRowType(rowTypeMap);
+        }
+    }
+
     public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
         if (readonlyConfig.get(SCHEMA) == null) {
             throw new RuntimeException(
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index f309002699..48ed785c39 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -21,13 +21,20 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.env.ParsingMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceOptions;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 
 import org.slf4j.Logger;
@@ -55,10 +62,11 @@ public final class FactoryUtil {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtil.class);
 
+    static final String DEFAULT_ID = "default-identifier";
+
     public static <T, SplitT extends SourceSplit, StateT extends Serializable>
             List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, 
List<CatalogTable>>>
                     createAndPrepareSource(
-                            List<CatalogTable> multipleTables,
                             ReadonlyConfig options,
                             ClassLoader classLoader,
                             String factoryIdentifier) {
@@ -67,32 +75,44 @@ public final class FactoryUtil {
             final TableSourceFactory factory =
                     discoverFactory(classLoader, TableSourceFactory.class, 
factoryIdentifier);
             List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, 
List<CatalogTable>>> sources =
-                    new ArrayList<>(multipleTables.size());
-            if (factory instanceof SupportMultipleTable) {
-                List<CatalogTable> remainingTables = multipleTables;
-                while (!remainingTables.isEmpty()) {
-                    TableFactoryContext context =
-                            new TableFactoryContext(remainingTables, options, 
classLoader);
-                    SupportMultipleTable.Result result =
-                            ((SupportMultipleTable) 
factory).applyTables(context);
-                    List<CatalogTable> acceptedTables = 
result.getAcceptedTables();
-                    sources.add(
-                            new Tuple2<>(
-                                    createAndPrepareSource(
-                                            factory, acceptedTables, options, 
classLoader),
-                                    acceptedTables));
-                    remainingTables = result.getRemainingTables();
-                }
-            } else {
-                for (CatalogTable catalogTable : multipleTables) {
-                    List<CatalogTable> acceptedTables = 
Collections.singletonList(catalogTable);
-                    sources.add(
-                            new Tuple2<>(
-                                    createAndPrepareSource(
-                                            factory, acceptedTables, options, 
classLoader),
-                                    acceptedTables));
+                    new ArrayList<>();
+            SeaTunnelSource<T, SplitT, StateT> source =
+                    createAndPrepareSource(factory, options, classLoader);
+            List<CatalogTable> catalogTables;
+            try {
+                catalogTables = source.getProducedCatalogTables();
+            } catch (UnsupportedOperationException e) {
+                // TODO remove it when all connector use 
`getProducedCatalogTables`
+                SeaTunnelDataType<T> seaTunnelDataType = 
source.getProducedType();
+                final String tableId =
+                        
options.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+                if (seaTunnelDataType instanceof MultipleRowType) {
+                    catalogTables = new ArrayList<>();
+                    for (String id : ((MultipleRowType) 
seaTunnelDataType).getTableIds()) {
+                        catalogTables.add(
+                                CatalogTableUtil.getCatalogTable(
+                                        id, ((MultipleRowType) 
seaTunnelDataType).getRowType(id)));
+                    }
+                } else {
+                    catalogTables =
+                            Collections.singletonList(
+                                    CatalogTableUtil.getCatalogTable(
+                                            tableId, (SeaTunnelRowType) 
seaTunnelDataType));
                 }
             }
+            LOG.info(
+                    "get the CatalogTable from source {}: {}",
+                    source.getPluginName(),
+                    catalogTables.stream()
+                            .map(CatalogTable::getTableId)
+                            .map(TableIdentifier::toString)
+                            .collect(Collectors.joining(",")));
+            if (options.get(SourceOptions.DAG_PARSING_MODE) == 
ParsingMode.SHARDING) {
+                CatalogTable catalogTable = catalogTables.get(0);
+                catalogTables.clear();
+                catalogTables.add(catalogTable);
+            }
+            sources.add(new Tuple2<>(source, catalogTables));
             return sources;
         } catch (Throwable t) {
             throw new FactoryException(
@@ -104,22 +124,13 @@ public final class FactoryUtil {
 
     private static <T, SplitT extends SourceSplit, StateT extends Serializable>
             SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
-                    TableSourceFactory factory,
-                    List<CatalogTable> acceptedTables,
-                    ReadonlyConfig options,
-                    ClassLoader classLoader) {
-        TableFactoryContext context = new TableFactoryContext(acceptedTables, 
options, classLoader);
+                    TableSourceFactory factory, ReadonlyConfig options, 
ClassLoader classLoader) {
+        TableSourceFactoryContext context = new 
TableSourceFactoryContext(options, classLoader);
         
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
         TableSource<T, SplitT, StateT> tableSource = 
factory.createSource(context);
-        validateAndApplyMetadata(acceptedTables, tableSource);
         return tableSource.createSource();
     }
 
-    private static void validateAndApplyMetadata(
-            List<CatalogTable> catalogTables, TableSource<?, ?, ?> 
tableSource) {
-        // TODO: handle reading metadata
-    }
-
     public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
             SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
createAndPrepareSink(
                     CatalogTable catalogTable,
@@ -129,9 +140,8 @@ public final class FactoryUtil {
         try {
             TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
factory =
                     discoverFactory(classLoader, TableSinkFactory.class, 
factoryIdentifier);
-            TableFactoryContext context =
-                    new TableFactoryContext(
-                            Collections.singletonList(catalogTable), options, 
classLoader);
+            TableSinkFactoryContext context =
+                    new TableSinkFactoryContext(catalogTable, options, 
classLoader);
             
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
             return factory.createSink(context).createSink();
         } catch (Throwable t) {
@@ -293,8 +303,8 @@ public final class FactoryUtil {
             String factoryIdentifier) {
         final TableTransformFactory factory =
                 discoverFactory(classLoader, TableTransformFactory.class, 
factoryIdentifier);
-        TableFactoryContext context =
-                new TableFactoryContext(
+        TableTransformFactoryContext context =
+                new TableTransformFactoryContext(
                         Collections.singletonList(catalogTable), options, 
classLoader);
         
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
         return factory.createTransform(context).createTransform();
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
deleted file mode 100644
index a48fd96f74..0000000000
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.factory;
-
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-
-import java.util.List;
-
-/**
- * Used to declare that the connector can handle data from multiple tables.
- *
- * <p>The expansion of the {@link TableSourceFactory}.
- */
-public interface SupportMultipleTable {
-
-    /** A connector can pick tables and return the accepted and remaining 
tables. */
-    Result applyTables(TableFactoryContext context);
-
-    final class Result {
-        private final List<CatalogTable> acceptedTables;
-        private final List<CatalogTable> remainingTables;
-
-        private Result(List<CatalogTable> acceptedTables, List<CatalogTable> 
remainingTables) {
-            this.acceptedTables = acceptedTables;
-            this.remainingTables = remainingTables;
-        }
-
-        public static Result of(
-                List<CatalogTable> acceptedTables, List<CatalogTable> 
remainingTables) {
-            return new Result(acceptedTables, remainingTables);
-        }
-
-        public List<CatalogTable> getAcceptedTables() {
-            return acceptedTables;
-        }
-
-        public List<CatalogTable> getRemainingTables() {
-            return remainingTables;
-        }
-    }
-}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
index 2fda5fc064..10436da09b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
@@ -18,42 +18,17 @@
 package org.apache.seatunnel.api.table.factory;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
 
 import lombok.Getter;
 
-import java.util.List;
+@Getter
+public abstract class TableFactoryContext {
 
-public class TableFactoryContext {
-
-    private final List<CatalogTable> catalogTables;
-    @Getter private final ReadonlyConfig options;
+    private final ReadonlyConfig options;
     private final ClassLoader classLoader;
 
-    public TableFactoryContext(
-            List<CatalogTable> catalogTables, ReadonlyConfig options, 
ClassLoader classLoader) {
-        this.catalogTables = catalogTables;
+    public TableFactoryContext(ReadonlyConfig options, ClassLoader 
classLoader) {
         this.options = options;
         this.classLoader = classLoader;
     }
-
-    public ClassLoader getClassLoader() {
-        return this.classLoader;
-    }
-
-    /**
-     * Returns a list of tables that need to be processed.
-     *
-     * <p>By default, return only single table.
-     *
-     * <p>If you need multiple tables, implement {@link SupportMultipleTable}.
-     */
-    public List<CatalogTable> getCatalogTables() {
-        return catalogTables;
-    }
-
-    /** @return single table. */
-    public CatalogTable getCatalogTable() {
-        return catalogTables.get(0);
-    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index f0015fa58d..2fca039e7d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -37,7 +37,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT
      * @return
      */
     default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
createSink(
-            TableFactoryContext context) {
+            TableSinkFactoryContext context) {
         throw new UnsupportedOperationException(
                 "The Factory has not been implemented and the deprecated 
Plugin will be used.");
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
similarity index 55%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
index 33caf328d6..f579adc416 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
@@ -17,23 +17,19 @@
 
 package org.apache.seatunnel.api.table.factory;
 
-import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 
-/**
- * This is an SPI interface, used to create {@link
- * org.apache.seatunnel.api.table.connector.TableTransform}. Each plugin need 
to have it own
- * implementation.
- */
-public interface TableTransformFactory extends Factory {
+import lombok.Getter;
+
+@Getter
+public class TableSinkFactoryContext extends TableFactoryContext {
+
+    private final CatalogTable catalogTable;
 
-    /**
-     * We will never use this method now. So gave a default implement and 
return null.
-     *
-     * @param context TableFactoryContext
-     * @return
-     */
-    default <T> TableTransform<T> createTransform(TableFactoryContext context) 
{
-        throw new UnsupportedOperationException(
-                "The Factory has not been implemented and the deprecated 
Plugin will be used.");
+    public TableSinkFactoryContext(
+            CatalogTable catalogTable, ReadonlyConfig options, ClassLoader 
classLoader) {
+        super(options, classLoader);
+        this.catalogTable = catalogTable;
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 30f70efdea..132d904958 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -35,7 +35,7 @@ public interface TableSourceFactory extends Factory {
      * @param context TableFactoryContext
      */
     default <T, SplitT extends SourceSplit, StateT extends Serializable>
-            TableSource<T, SplitT, StateT> createSource(TableFactoryContext 
context) {
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         throw new UnsupportedOperationException(
                 "The Factory has not been implemented and the deprecated 
Plugin will be used.");
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java
similarity index 55%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java
index 33caf328d6..41b2b39c6e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java
@@ -17,23 +17,14 @@
 
 package org.apache.seatunnel.api.table.factory;
 
-import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
-/**
- * This is an SPI interface, used to create {@link
- * org.apache.seatunnel.api.table.connector.TableTransform}. Each plugin need 
to have it own
- * implementation.
- */
-public interface TableTransformFactory extends Factory {
+import lombok.Getter;
+
+@Getter
+public class TableSourceFactoryContext extends TableFactoryContext {
 
-    /**
-     * We will never use this method now. So gave a default implement and 
return null.
-     *
-     * @param context TableFactoryContext
-     * @return
-     */
-    default <T> TableTransform<T> createTransform(TableFactoryContext context) 
{
-        throw new UnsupportedOperationException(
-                "The Factory has not been implemented and the deprecated 
Plugin will be used.");
+    public TableSourceFactoryContext(ReadonlyConfig options, ClassLoader 
classLoader) {
+        super(options, classLoader);
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
index 33caf328d6..46c6cfa56f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
@@ -32,7 +32,7 @@ public interface TableTransformFactory extends Factory {
      * @param context TableFactoryContext
      * @return
      */
-    default <T> TableTransform<T> createTransform(TableFactoryContext context) 
{
+    default <T> TableTransform<T> createTransform(TableTransformFactoryContext 
context) {
         throw new UnsupportedOperationException(
                 "The Factory has not been implemented and the deprecated 
Plugin will be used.");
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
similarity index 62%
copy from 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
copy to 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
index 2fda5fc064..bf8176c7a8 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
@@ -24,36 +24,14 @@ import lombok.Getter;
 
 import java.util.List;
 
-public class TableFactoryContext {
+@Getter
+public class TableTransformFactoryContext extends TableFactoryContext {
 
     private final List<CatalogTable> catalogTables;
-    @Getter private final ReadonlyConfig options;
-    private final ClassLoader classLoader;
 
-    public TableFactoryContext(
+    public TableTransformFactoryContext(
             List<CatalogTable> catalogTables, ReadonlyConfig options, 
ClassLoader classLoader) {
+        super(options, classLoader);
         this.catalogTables = catalogTables;
-        this.options = options;
-        this.classLoader = classLoader;
-    }
-
-    public ClassLoader getClassLoader() {
-        return this.classLoader;
-    }
-
-    /**
-     * Returns a list of tables that need to be processed.
-     *
-     * <p>By default, return only single table.
-     *
-     * <p>If you need multiple tables, implement {@link SupportMultipleTable}.
-     */
-    public List<CatalogTable> getCatalogTables() {
-        return catalogTables;
-    }
-
-    /** @return single table. */
-    public CatalogTable getCatalogTable() {
-        return catalogTables.get(0);
     }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index bd05e0808d..299026c407 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.api.table.type;
 
-import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Map;
@@ -27,7 +25,7 @@ import java.util.Objects;
 /** SeaTunnel row type. */
 public final class SeaTunnelRow implements Serializable {
     private static final long serialVersionUID = -1L;
-    /** Table identifier, used for the source connector that {@link 
SupportMultipleTable}. */
+    /** Table identifier. */
     private String tableId = "";
     /** The kind of change that a row describes in a changelog. */
     private RowKind kind = RowKind.INSERT;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index ed04fb0f5d..ff13fc30b1 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -88,14 +89,19 @@ public abstract class IncrementalSource<T, C extends SourceConfig>
 
     protected int incrementalParallelism;
     protected StopConfig stopConfig;
+    protected List<CatalogTable> catalogTables;
 
     protected StopMode stopMode;
     protected DebeziumDeserializationSchema<T> deserializationSchema;
 
     protected SeaTunnelDataType<SeaTunnelRow> dataType;
 
-    protected IncrementalSource(ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
+    protected IncrementalSource(
+            ReadonlyConfig options,
+            SeaTunnelDataType<SeaTunnelRow> dataType,
+            List<CatalogTable> catalogTables) {
         this.dataType = dataType;
+        this.catalogTables = catalogTables;
         this.readonlyConfig = options;
         this.startupConfig = getStartupConfig(readonlyConfig);
         this.stopConfig = getStopConfig(readonlyConfig);
@@ -137,6 +143,11 @@ public abstract class IncrementalSource<T, C extends SourceConfig>
                 config.get(SourceOptions.STOP_TIMESTAMP));
     }
 
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return catalogTables;
+    }
+
     public abstract Option<StartupMode> getStartupModeOption();
 
     public abstract Option<StopMode> getStopModeOption();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
index 41191cfa52..edcbdea9a9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -43,6 +44,7 @@ import lombok.NoArgsConstructor;
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.Optional;
 
 @NoArgsConstructor
@@ -53,8 +55,10 @@ public class MongodbIncrementalSource<T> extends IncrementalSource<T, MongodbSou
     static final String IDENTIFIER = "MongoDB-CDC";
 
     public MongodbIncrementalSource(
-            ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
-        super(options, dataType);
+            ReadonlyConfig options,
+            SeaTunnelDataType<SeaTunnelRow> dataType,
+            List<CatalogTable> catalogTables) {
+        super(options, dataType, catalogTables);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
index 6215afb74e..7b816ed3eb 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -21,28 +21,22 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
 
 import com.google.auto.service.AutoService;
 
-import javax.annotation.Nonnull;
-
 import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 
 @AutoService(Factory.class)
-public class MongodbIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable {
+public class MongodbIncrementalSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
         return MongodbIncrementalSource.IDENTIFIER;
@@ -77,28 +71,15 @@ public class MongodbIncrementalSourceFactory implements TableSourceFactory, Supp
     @SuppressWarnings("unchecked")
     @Override
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
-            TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
         return () -> {
-            SeaTunnelDataType<SeaTunnelRow> dataType;
-            if (context.getCatalogTables().size() == 1) {
-                dataType =
-                        context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType();
-            } else {
-                Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
-                for (CatalogTable catalogTable : context.getCatalogTables()) {
-                    rowTypeMap.put(
-                            catalogTable.getTableId().toTablePath().toString(),
-                            catalogTable.getTableSchema().toPhysicalRowDataType());
-                }
-                dataType = new MultipleRowType(rowTypeMap);
-            }
+            List<CatalogTable> catalogTables =
+                    CatalogTableUtil.getCatalogTablesFromConfig(
+                            context.getOptions(), context.getClassLoader());
+            SeaTunnelDataType<SeaTunnelRow> dataType =
+                    CatalogTableUtil.convertToDataType(catalogTables);
             return (SeaTunnelSource<T, SplitT, StateT>)
-                    new MongodbIncrementalSource<>(context.getOptions(), dataType);
+                    new MongodbIncrementalSource<>(context.getOptions(), dataType, catalogTables);
         };
     }
-
-    @Override
-    public Result applyTables(@Nonnull TableFactoryContext context) {
-        return Result.of(context.getCatalogTables(), Collections.emptyList());
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 43d8e505d6..270b0d7309 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -50,6 +50,7 @@ import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
 
 import java.time.ZoneId;
+import java.util.List;
 
 @NoArgsConstructor
 @AutoService(SeaTunnelSource.class)
@@ -58,8 +59,10 @@ public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceCo
     static final String IDENTIFIER = "MySQL-CDC";
 
     public MySqlIncrementalSource(
-            ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
-        super(options, dataType);
+            ReadonlyConfig options,
+            SeaTunnelDataType<SeaTunnelRow> dataType,
+            List<CatalogTable> catalogTables) {
+        super(options, dataType, catalogTables);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 6429fa4b52..60e1105e30 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -22,15 +22,13 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
@@ -40,12 +38,10 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 
 @AutoService(Factory.class)
-public class MySqlIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable {
+public class MySqlIncrementalSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
         return MySqlIncrementalSource.IDENTIFIER;
@@ -95,28 +91,15 @@ public class MySqlIncrementalSourceFactory implements TableSourceFactory, Suppor
 
     @Override
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
-            TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
         return () -> {
-            SeaTunnelDataType<SeaTunnelRow> dataType;
-            if (context.getCatalogTables().size() == 1) {
-                dataType =
-                        context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType();
-            } else {
-                Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
-                for (CatalogTable catalogTable : context.getCatalogTables()) {
-                    rowTypeMap.put(
-                            catalogTable.getTableId().toTablePath().toString(),
-                            catalogTable.getTableSchema().toPhysicalRowDataType());
-                }
-                dataType = new MultipleRowType(rowTypeMap);
-            }
+            List<CatalogTable> catalogTables =
+                    CatalogTableUtil.getCatalogTablesFromConfig(
+                            context.getOptions(), context.getClassLoader());
+            SeaTunnelDataType<SeaTunnelRow> dataType =
+                    CatalogTableUtil.convertToDataType(catalogTables);
             return (SeaTunnelSource<T, SplitT, StateT>)
-                    new MySqlIncrementalSource<>(context.getOptions(), dataType);
+                    new MySqlIncrementalSource<>(context.getOptions(), dataType, catalogTables);
         };
     }
-
-    @Override
-    public Result applyTables(TableFactoryContext context) {
-        return Result.of(context.getCatalogTables(), Collections.emptyList());
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index cf9cf84b82..e56fb00423 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
@@ -50,6 +51,7 @@ import io.debezium.relational.TableId;
 import lombok.NoArgsConstructor;
 
 import java.time.ZoneId;
+import java.util.List;
 
 import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils.convertFromTable;
@@ -62,8 +64,10 @@ public class SqlServerIncrementalSource<T> extends IncrementalSource<T, JdbcSour
     static final String IDENTIFIER = "SqlServer-CDC";
 
     public SqlServerIncrementalSource(
-            ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
-        super(options, dataType);
+            ReadonlyConfig options,
+            SeaTunnelDataType<SeaTunnelRow> dataType,
+            List<CatalogTable> catalogTables) {
+        super(options, dataType, catalogTables);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 285d4b7923..7127209aef 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -22,15 +22,13 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
@@ -40,12 +38,10 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 
 @AutoService(Factory.class)
-public class SqlServerIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable {
+public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
 
     @Override
     public String factoryIdentifier() {
@@ -100,26 +96,14 @@ public class SqlServerIncrementalSourceFactory implements TableSourceFactory, Su
 
     @Override
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
-            TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
         return () -> {
-            SeaTunnelDataType<SeaTunnelRow> dataType;
-            if (context.getCatalogTables().size() == 1) {
-                dataType =
-                        context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType();
-            } else {
-                Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
-                for (CatalogTable catalogTable : context.getCatalogTables()) {
-                    String tableId = catalogTable.getTableId().toTablePath().toString();
-                    rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
-                }
-                dataType = new MultipleRowType(rowTypeMap);
-            }
-            return new SqlServerIncrementalSource(context.getOptions(), dataType);
+            List<CatalogTable> catalogTables =
+                    CatalogTableUtil.getCatalogTablesFromConfig(
+                            context.getOptions(), context.getClassLoader());
+            SeaTunnelDataType<SeaTunnelRow> dataType =
+                    CatalogTableUtil.convertToDataType(catalogTables);
+            return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables);
         };
     }
-
-    @Override
-    public SupportMultipleTable.Result applyTables(TableFactoryContext context) {
-        return SupportMultipleTable.Result.of(context.getCatalogTables(), Collections.emptyList());
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index 5a66493aee..858357d282 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -23,8 +23,8 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 
 import com.google.auto.service.AutoService;
 
@@ -56,7 +56,7 @@ public class ConsoleSinkFactory implements TableSinkFactory {
     }
 
     @Override
-    public TableSink createSink(TableFactoryContext context) {
+    public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig options = context.getOptions();
         return () ->
                 new ConsoleSink(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
index afad20c67c..7bd3bca639 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
@@ -80,6 +80,7 @@ public class SqlServerDataTypeConvertor implements DataTypeConvertor<SqlServerTy
             case NTEXT:
             case NVARCHAR:
             case TEXT:
+            case XML:
                 return BasicType.STRING_TYPE;
             case DATE:
                 return LocalTimeType.LOCAL_DATE_TYPE;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index d18ff0d7fd..d93115a707 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -27,8 +27,8 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -74,7 +74,7 @@ public class JdbcSinkFactory implements TableSinkFactory {
     }
 
     @Override
-    public TableSink createSink(TableFactoryContext context) {
+    public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig config = context.getOptions();
         CatalogTable catalogTable = context.getCatalogTable();
         Map<String, String> catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index 8c21a84233..264df5eafa 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -22,12 +22,13 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -41,6 +42,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcCo
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
 import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
@@ -49,7 +51,9 @@ import java.io.Serializable;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -78,20 +82,35 @@ public class JdbcSourceFactory implements TableSourceFactory {
     @Override
     @SuppressWarnings("unchecked")
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
-            TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
-        CatalogTable catalogTable = context.getCatalogTable();
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
         JdbcSourceConfig config = JdbcSourceConfig.of(context.getOptions());
-        JdbcConnectionProvider connectionProvider =
-                new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
-        final String querySql = config.getQuery();
         JdbcDialect dialect =
                 JdbcDialectLoader.load(
                         config.getJdbcConnectionConfig().getUrl(),
                         config.getJdbcConnectionConfig().getCompatibleMode());
-        TableSchema tableSchema = catalogTable.getTableSchema();
-        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
-        Optional<PartitionParameter> partitionParameter =
-                createPartitionParameter(config, tableSchema, connectionProvider);
+        JdbcConnectionProvider connectionProvider =
+                new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
+
+        SeaTunnelRowType rowType;
+        Optional<PartitionParameter> partitionParameter = Optional.empty();
+        try {
+            CatalogTable catalogTable =
+                    CatalogTableUtil.getCatalogTablesFromConfig(
+                                    dialect.dialectName(),
+                                    context.getOptions(),
+                                    context.getClassLoader())
+                            .get(0);
+            TableSchema tableSchema = catalogTable.getTableSchema();
+            rowType = tableSchema.toPhysicalRowDataType();
+            partitionParameter = createPartitionParameter(config, tableSchema, connectionProvider);
+        } catch (Exception e) {
+            try (Connection connection = connectionProvider.getOrEstablishConnection()) {
+                rowType = initTableField(connection, config, dialect);
+            } catch (Exception k) {
+                throw new PrepareFailException("jdbc", PluginType.SOURCE, k.toString());
+            }
+        }
+        final String querySql = config.getQuery();
         JdbcInputFormat inputFormat =
                 new JdbcInputFormat(
                         connectionProvider,
@@ -100,18 +119,20 @@ public class JdbcSourceFactory implements TableSourceFactory {
                         querySql,
                         config.getFetchSize(),
                         config.getJdbcConnectionConfig().isAutoCommit());
+        Optional<PartitionParameter> finalPartitionParameter = partitionParameter;
+        SeaTunnelRowType finalRowType = rowType;
         return () ->
                 (SeaTunnelSource<T, SplitT, StateT>)
                         new JdbcSource(
                                 config,
-                                rowType,
+                                finalRowType,
                                 dialect,
                                 inputFormat,
-                                partitionParameter.orElse(null),
+                                finalPartitionParameter.orElse(null),
                                 connectionProvider,
-                                partitionParameter.isPresent()
+                                finalPartitionParameter.isPresent()
                                         ? obtainPartitionSql(
-                                                dialect, partitionParameter.get(), querySql)
+                                                dialect, finalPartitionParameter.get(), querySql)
                                         : querySql);
     }
 
@@ -132,6 +153,27 @@ public class JdbcSourceFactory implements TableSourceFactory {
                 partitionParameter.getPartitionColumnName());
     }
 
+    private SeaTunnelRowType initTableField(
+            Connection conn, JdbcSourceConfig jdbcSourceConfig, JdbcDialect jdbcDialect) {
+        JdbcDialectTypeMapper jdbcDialectTypeMapper = jdbcDialect.getJdbcDialectTypeMapper();
+        ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+        ArrayList<String> fieldNames = new ArrayList<>();
+        try {
+            ResultSetMetaData resultSetMetaData =
+                    jdbcDialect.getResultSetMetaData(conn, jdbcSourceConfig);
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                // Support AS syntax
+                fieldNames.add(resultSetMetaData.getColumnLabel(i));
+                seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i));
+            }
+        } catch (Exception e) {
+            log.warn("get row type info exception", e);
+        }
+        return new SeaTunnelRowType(
+                fieldNames.toArray(new String[0]),
+                seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
+    }
+
     public static Optional<PartitionParameter> createPartitionParameter(
             JdbcSourceConfig config,
             TableSchema tableSchema,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index 450ba3f1cd..c9a365511d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 
@@ -56,7 +56,7 @@ public class KafkaSinkFactory implements TableSinkFactory {
     }
 
     @Override
-    public TableSink createSink(TableFactoryContext context) {
+    public TableSink createSink(TableSinkFactoryContext context) {
         return () ->
                 new KafkaSink(
                         context.getOptions(),
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index c0159c5fd4..7c3592411a 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
@@ -60,7 +60,7 @@ public class StarRocksSinkFactory implements TableSinkFactory 
{
     }
 
     @Override
-    public TableSink createSink(TableFactoryContext context) {
+    public TableSink createSink(TableSinkFactoryContext context) {
         SinkConfig sinkConfig = SinkConfig.of(context.getOptions());
         CatalogTable catalogTable = context.getCatalogTable();
         if (StringUtils.isBlank(sinkConfig.getTable())) {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
index b5dcdf0534..18fea5fb28 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
@@ -17,11 +17,10 @@
 
 package org.apache.seatunnel.engine.core.dag.actions;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import com.hazelcast.collection.IQueue;
 import com.hazelcast.core.HazelcastInstance;
@@ -33,6 +32,7 @@ import lombok.experimental.Tolerate;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -44,7 +44,7 @@ import java.util.stream.Stream;
 @Setter
 @ToString
 public class ShuffleMultipleRowStrategy extends ShuffleStrategy {
-    private MultipleRowType inputRowType;
+    private List<CatalogTable> catalogTables;
     private String targetTableId;
 
     @Tolerate
@@ -54,8 +54,8 @@ public class ShuffleMultipleRowStrategy extends 
ShuffleStrategy {
     public Map<String, IQueue<Record<?>>> createShuffles(
             HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
         Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
-        for (Map.Entry<String, SeaTunnelRowType> entry : inputRowType) {
-            String tableId = entry.getKey();
+        for (CatalogTable entry : catalogTables) {
+            String tableId = entry.getTableId().toTablePath().toString();
             String queueName = generateQueueName(pipelineId, inputIndex, 
tableId);
             IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
             // clear old data when job restore
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index c83ceade12..d38b5aacb7 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -23,16 +23,12 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.api.env.ParsingMode;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SupportDataSaveMode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceOptions;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.FactoryUtil;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -78,7 +74,6 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
@@ -303,31 +298,15 @@ public class MultipleTableJobConfigParser {
                         factoryId,
                         (factory) -> factory.createSource(null));
 
-        final List<CatalogTable> catalogTables = new ArrayList<>();
-        if (!fallback) {
-            List<CatalogTable> tables =
-                    CatalogTableUtil.getCatalogTables(sourceConfig, 
classLoader);
-            if (!tables.isEmpty()) {
-                catalogTables.addAll(tables);
-            }
-        }
-
-        if (fallback || catalogTables.isEmpty()) {
+        if (fallback) {
             Tuple2<CatalogTable, Action> tuple =
                     fallbackParser.parseSource(sourceConfig, jobConfig, 
tableId, parallelism);
             return new Tuple2<>(tableId, Collections.singletonList(tuple));
         }
 
-        if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) == 
ParsingMode.SHARDING) {
-            CatalogTable shardingTable = catalogTables.get(0);
-            catalogTables.clear();
-            catalogTables.add(shardingTable);
-        }
-
         List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>>>
                 sources =
-                        FactoryUtil.createAndPrepareSource(
-                                catalogTables, readonlyConfig, classLoader, 
factoryId);
+                        FactoryUtil.createAndPrepareSource(readonlyConfig, 
classLoader, factoryId);
 
         Set<URL> factoryUrls =
                 getFactoryUrls(readonlyConfig, classLoader, 
TableSourceFactory.class, factoryId);
@@ -465,9 +444,26 @@ public class MultipleTableJobConfigParser {
 
     public static SeaTunnelDataType<?> getProducedType(Action action) {
         if (action instanceof SourceAction) {
-            return ((SourceAction<?, ?, ?>) 
action).getSource().getProducedType();
+            try {
+                return ((SourceAction<?, ?, ?>) action)
+                        .getSource()
+                        .getProducedCatalogTables()
+                        .get(0)
+                        .getSeaTunnelRowType();
+            } catch (UnsupportedOperationException e) {
+                // TODO remove it when all connector use 
`getProducedCatalogTables`
+                return ((SourceAction<?, ?, ?>) 
action).getSource().getProducedType();
+            }
         } else if (action instanceof TransformAction) {
-            return ((TransformAction) action).getTransform().getProducedType();
+            try {
+                return ((TransformAction) action)
+                        .getTransform()
+                        .getProducedCatalogTable()
+                        .getSeaTunnelRowType();
+            } catch (UnsupportedOperationException e) {
+                // TODO remove it when all connector use 
`getProducedCatalogTables`
+                return ((TransformAction) 
action).getTransform().getProducedType();
+            }
         }
         throw new UnsupportedOperationException();
     }
@@ -534,13 +530,6 @@ public class MultipleTableJobConfigParser {
             return fallbackParser.parseSinks(configIndex, inputVertices, 
sinkConfig, jobConfig);
         }
 
-        Map<TablePath, CatalogTable> tableMap =
-                CatalogTableUtil.getCatalogTables(sinkConfig, 
classLoader).stream()
-                        .collect(
-                                Collectors.toMap(
-                                        catalogTable -> 
catalogTable.getTableId().toTablePath(),
-                                        catalogTable -> catalogTable));
-
         // get factory urls
         Set<URL> factoryUrls =
                 getFactoryUrls(readonlyConfig, classLoader, 
TableSinkFactory.class, factoryId);
@@ -558,7 +547,6 @@ public class MultipleTableJobConfigParser {
             SinkAction<?, ?, ?, ?> sinkAction =
                     createSinkAction(
                             inputActionSample._1(),
-                            tableMap,
                             inputActions,
                             readonlyConfig,
                             classLoader,
@@ -575,7 +563,6 @@ public class MultipleTableJobConfigParser {
             SinkAction<?, ?, ?, ?> sinkAction =
                     createSinkAction(
                             tuple._1(),
-                            tableMap,
                             Collections.singleton(tuple._2()),
                             readonlyConfig,
                             classLoader,
@@ -590,7 +577,6 @@ public class MultipleTableJobConfigParser {
 
     private SinkAction<?, ?, ?, ?> createSinkAction(
             CatalogTable catalogTable,
-            Map<TablePath, CatalogTable> sinkTableMap,
             Set<Action> inputActions,
             ReadonlyConfig readonlyConfig,
             ClassLoader classLoader,
@@ -598,17 +584,6 @@ public class MultipleTableJobConfigParser {
             String factoryId,
             int parallelism,
             int configIndex) {
-        Optional<CatalogTable> insteadTable;
-        if (sinkTableMap.size() == 1) {
-            insteadTable = sinkTableMap.values().stream().findFirst();
-        } else {
-            // TODO: another table full name map
-            insteadTable =
-                    
Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
-        }
-        if (insteadTable.isPresent()) {
-            catalogTable = insteadTable.get();
-        }
         SeaTunnelSink<?, ?, ?, ?> sink =
                 FactoryUtil.createAndPrepareSink(
                         catalogTable, readonlyConfig, classLoader, factoryId);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index d7beaf3a32..7e23c0b123 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -17,9 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.execution;
 
-import org.apache.seatunnel.api.table.type.MultipleRowType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -216,8 +214,12 @@ public class ExecutionPlanGenerator {
         }
         ExecutionVertex sourceExecutionVertex = 
sourceExecutionVertices.stream().findFirst().get();
         SourceAction sourceAction = (SourceAction) 
sourceExecutionVertex.getAction();
-        SeaTunnelDataType sourceProducedType = 
sourceAction.getSource().getProducedType();
-        if (!SqlType.MULTIPLE_ROW.equals(sourceProducedType.getSqlType())) {
+        List<CatalogTable> producedCatalogTables = new ArrayList<>();
+        try {
+            producedCatalogTables = 
sourceAction.getSource().getProducedCatalogTables();
+        } catch (UnsupportedOperationException e) {
+        }
+        if (producedCatalogTables.size() <= 1) {
             return executionEdges;
         }
 
@@ -234,7 +236,7 @@ public class ExecutionPlanGenerator {
                 ShuffleMultipleRowStrategy.builder()
                         .jobId(jobImmutableInformation.getJobId())
                         .inputPartitions(sourceAction.getParallelism())
-                        
.inputRowType(MultipleRowType.class.cast(sourceProducedType))
+                        .catalogTables(producedCatalogTables)
                         .queueEmptyQueueTtl((int) 
(checkpointConfig.getCheckpointInterval() * 3))
                         .build();
         ShuffleConfig shuffleConfig =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 5d0f571d49..25f1e850f5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.engine.server.dag.physical;
 
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -67,7 +66,6 @@ import lombok.NonNull;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -323,11 +321,6 @@ public class PhysicalPlanGenerator {
                                     SinkAction sinkAction = (SinkAction) 
sinkFlow.getAction();
                                     String sinkTableId =
                                             
sinkAction.getConfig().getMultipleRowTableId();
-                                    MultipleRowType multipleRowType =
-                                            
shuffleMultipleRowStrategy.getInputRowType();
-                                    int sinkTableIndex =
-                                            
Arrays.asList(multipleRowType.getTableIds())
-                                                    .indexOf(sinkTableId);
 
                                     long taskIDPrefix = 
idGenerator.getNextId();
                                     long taskGroupIDPrefix = 
idGenerator.getNextId();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 80a0dff04e..2a4a129adb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -21,6 +21,9 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
@@ -73,13 +76,22 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
                     "SourceSeaTunnelTask only support SourceFlowLifeCycle, but 
get "
                             + startFlowLifeCycle.getClass().getName());
         } else {
+            SeaTunnelDataType sourceProducedType;
+            try {
+                List<CatalogTable> producedCatalogTables =
+                        
sourceFlow.getAction().getSource().getProducedCatalogTables();
+                sourceProducedType = 
CatalogTableUtil.convertToDataType(producedCatalogTables);
+            } catch (UnsupportedOperationException e) {
+                // TODO remove it when all connector use 
`getProducedCatalogTables`
+                sourceProducedType = 
sourceFlow.getAction().getSource().getProducedType();
+            }
             this.collector =
                     new SeaTunnelSourceCollector<>(
                             checkpointLock,
                             outputs,
                             this.getMetricsContext(),
                             getFlowControlStrategy(),
-                            
sourceFlow.getAction().getSource().getProducedType());
+                            sourceProducedType);
             ((SourceFlowLifeCycle<T, SplitT>) 
startFlowLifeCycle).setCollector(collector);
         }
     }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
index 2271fa641b..d8d8a97962 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
@@ -21,11 +21,10 @@ import 
org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 
 import com.google.auto.service.AutoService;
-import lombok.NonNull;
 
 @AutoService(Factory.class)
 public class CopyFieldTransformFactory implements TableTransformFactory {
@@ -43,9 +42,9 @@ public class CopyFieldTransformFactory implements 
TableTransformFactory {
     }
 
     @Override
-    public TableTransform createTransform(@NonNull TableFactoryContext 
context) {
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
         CopyTransformConfig copyTransformConfig = 
CopyTransformConfig.of(context.getOptions());
-        CatalogTable catalogTable = context.getCatalogTable();
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
         return () -> new CopyFieldTransform(copyTransformConfig, catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
index 9e6334516b..b7382175ba 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
@@ -22,8 +22,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 
 import com.google.auto.service.AutoService;
 
@@ -40,8 +40,8 @@ public class FieldMapperTransformFactory implements 
TableTransformFactory {
     }
 
     @Override
-    public TableTransform createTransform(TableFactoryContext context) {
-        CatalogTable catalogTable = context.getCatalogTable();
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
         ReadonlyConfig options = context.getOptions();
         FieldMapperTransformConfig fieldMapperTransformConfig =
                 FieldMapperTransformConfig.of(options);
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
index 14259a0a85..f562a7cc28 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 
 import com.google.auto.service.AutoService;
 
@@ -41,8 +41,8 @@ public class FilterFieldTransformFactory implements 
TableTransformFactory {
     }
 
     @Override
-    public TableTransform createTransform(TableFactoryContext context) {
-        CatalogTable catalogTable = context.getCatalogTable();
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
         return () -> new FilterFieldTransform(context.getOptions(), 
catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
index 9e89ebe5e2..2191e30bc5 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 
 import com.google.auto.service.AutoService;
 
@@ -43,8 +43,8 @@ public class FilterRowKindTransformFactory implements 
TableTransformFactory {
     }
 
     @Override
-    public TableTransform createTransform(TableFactoryContext context) {
-        CatalogTable catalogTable = context.getCatalogTable();
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
         return () -> new FilterRowKindTransform(context.getOptions(), 
catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
index 25696ba6e6..c0bed8977d 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 
 import com.google.auto.service.AutoService;
 
@@ -49,8 +49,8 @@ public class ReplaceTransformFactory implements 
TableTransformFactory {
     }
 
     @Override
-    public TableTransform createTransform(TableFactoryContext context) {
-        CatalogTable catalogTable = context.getCatalogTable();
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
         return () -> new ReplaceTransform(context.getOptions(), catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
index 91281251e9..32d660860b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
@@ -21,11 +21,10 @@ import 
org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 
 import com.google.auto.service.AutoService;
-import lombok.NonNull;
 
 @AutoService(Factory.class)
 public class SplitTransformFactory implements TableTransformFactory {
@@ -45,9 +44,9 @@ public class SplitTransformFactory implements 
TableTransformFactory {
     }
 
     @Override
-    public TableTransform createTransform(@NonNull TableFactoryContext 
context) {
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
         SplitTransformConfig splitTransformConfig = 
SplitTransformConfig.of(context.getOptions());
-        CatalogTable catalogTable = context.getCatalogTable();
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
         return () -> new SplitTransform(splitTransformConfig, catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
index f509af832c..5c4abf53c0 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
 
 import com.google.auto.service.AutoService;
 
@@ -41,8 +41,8 @@ public class SQLTransformFactory implements 
TableTransformFactory {
     }
 
     @Override
-    public TableTransform createTransform(TableFactoryContext context) {
-        CatalogTable catalogTable = context.getCatalogTable();
+    public TableTransform createTransform(TableTransformFactoryContext 
context) {
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
         return () -> new SQLTransform(context.getOptions(), catalogTable);
     }
 }


Reply via email to