This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
     new b30064d05 [Feature][linkis-engineplugin-spark] add a new Executor (json ETL) (#3715)
b30064d05 is described below

commit b30064d0599a177d952e0d5093e61b788c9e740d
Author: rarexixi <[email protected]>
AuthorDate: Sun Oct 30 00:17:12 2022 +0800

    [Feature][linkis-engineplugin-spark] add a new Executor (json ETL) (#3715)
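
    An illustrative sketch of the JSON this new data_calc run type consumes, based on
    the model classes added in this commit (DataCalcGroupData groups sources,
    transformations and sinks arrays of DataCalcDataConfig entries, each carrying a
    name/type and a config object). The plugin identifiers used below ("file", "sql")
    and which field PluginUtil uses to resolve a plugin are assumptions, since the
    registry is only partially shown in this diff:

        // hypothetical data_calc job; the config keys follow the model classes in this commit
        val json =
          """{
            |  "sources": [
            |    {"name": "file",
            |     "config": {"resultTable": "t_src", "path": "hdfs:///tmp/in", "serializer": "parquet"}}
            |  ],
            |  "transformations": [
            |    {"name": "sql",
            |     "config": {"resultTable": "t_out", "sql": "SELECT * FROM t_src"}}
            |  ],
            |  "sinks": [
            |    {"name": "file",
            |     "config": {"sourceTable": "t_out", "path": "file:///tmp/out", "saveMode": "overwrite"}}
            |  ]
            |}""".stripMargin
        // DataCalcGroupData.getData is added in this commit and gson-deserializes the JSON above
        val group = org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcGroupData.getData(json)
        assert(group.getSources.length == 1 && group.getSinks.length == 1)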
---
 .../linkis/common/utils/CodeAndRunTypeUtils.scala  |   2 +-
 .../manager/label/entity/engine/RunType.scala      |   2 +
 linkis-engineconn-plugins/spark/pom.xml            |  23 ++
 .../exception/ConfigRuntimeException.java}         |  28 ++-
 .../exception/DatabaseNotConfigException.java}     |  28 ++-
 .../datacalc/exception/HiveSinkException.java}     |  28 ++-
 .../spark/datacalc/model/DataCalcArrayData.java}   |  30 +--
 .../spark/datacalc/model/DataCalcDataConfig.java}  |  44 ++--
 .../spark/datacalc/model/DataCalcDataSource.java   |  77 +++++++
 .../spark/datacalc/model/DataCalcGroupData.java    |  57 +++++
 .../datacalc/model/DataCalcPluginConfig.java}      |  28 ++-
 .../spark/datacalc/model/ResultTableConfig.java}   |  23 +-
 .../spark/datacalc/model/SinkConfig.java           |  52 +++++
 .../spark/datacalc/model/SourceConfig.java         |  58 +++++
 .../spark/datacalc/model/TransformConfig.java      |  68 ++++++
 .../datacalc/service/LinkisDataSourceService.java  |  52 +++++
 .../spark/datacalc/sink/FileSinkConfig.java        |  91 ++++++++
 .../spark/datacalc/sink/HiveSinkConfig.java        | 105 +++++++++
 .../spark/datacalc/sink/JdbcSinkConfig.java        | 136 ++++++++++++
 .../spark/datacalc/sink/ManagedJdbcSinkConfig.java | 106 +++++++++
 .../spark/datacalc/source/FileSourceConfig.java    |  73 +++++++
 .../spark/datacalc/source/JdbcSourceConfig.java    |  87 ++++++++
 .../datacalc/source/ManagedJdbcSourceConfig.java}  |  48 +++--
 .../datacalc/transform/SqlTransformConfig.java}    |  26 ++-
 .../spark/datacalc/util/PluginUtil.java            |  95 +++++++++
 .../spark/errorcode/SparkErrorCodeSummary.java     |  18 +-
 .../engineplugin/spark/common/SparkKind.scala      |   5 +
 .../spark/datacalc/DataCalcExecution.scala         | 237 +++++++++++++++++++++
 .../spark/datacalc/DataCalcTempData.scala          |  49 +++++
 .../api/DataCalcPlugin.scala}                      |  25 +--
 .../api/DataCalcSink.scala}                        |  23 +-
 .../api/DataCalcSource.scala}                      |  23 +-
 .../api/DataCalcTransform.scala}                   |  23 +-
 .../spark/datacalc/sink/FileSink.scala             |  58 +++++
 .../spark/datacalc/sink/HiveSink.scala             | 216 +++++++++++++++++++
 .../spark/datacalc/sink/JdbcSink.scala             | 102 +++++++++
 .../spark/datacalc/sink/ManagedJdbcSink.scala      |  68 ++++++
 .../spark/datacalc/source/FileSource.scala         |  55 +++++
 .../spark/datacalc/source/JdbcSource.scala         |  49 +++++
 .../spark/datacalc/source/ManagedJdbcSource.scala  |  62 ++++++
 .../transform/SqlTransform.scala}                  |  26 ++-
 .../spark/exception/NoSupportEngineException.scala |   2 +
 .../spark/executor/SparkDataCalcExecutor.scala     | 110 ++++++++++
 .../spark/executor/SparkEngineConnExecutor.scala   |   7 +-
 .../factory/SparkDataCalcExecutorFactory.scala     |  50 +++++
 .../spark/factory/SparkEngineConnFactory.scala     |   3 +-
 .../spark/datacalc/TestDataCalcPlugins.scala       | 215 +++++++++++++++++++
 47 files changed, 2573 insertions(+), 220 deletions(-)

diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
index 9c4f552d9..d00be6d3a 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
@@ -26,7 +26,7 @@ object CodeAndRunTypeUtils {
 
   val CODE_TYPE_AND_RUN_TYPE_RELATION = CommonVars(
     "wds.linkis.codeType.runType.relation",
-    "sql=>sql|hql|jdbc|hive|psql|fql|tsql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell"
+    "sql=>sql|hql|jdbc|hive|psql|fql|tsql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell,json=>json|data_calc"
   )
 
   val RUN_TYPE_SQL = "sql"
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index 33601c725..3d53c998d 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -43,4 +43,6 @@ object RunType extends Enumeration {
 
   val TRINO_SQL = Value("tsql")
 
+  val DATA_CALC = Value("data_calc") // spark datacalc (ETL)
+
 }
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index 78743bf93..811d87b12 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -383,6 +383,29 @@
       <version>4.0.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hibernate.validator</groupId>
+      <artifactId>hibernate-validator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.linkis</groupId>
+      <artifactId>linkis-datasource-client</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>javax.servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ConfigRuntimeException.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ConfigRuntimeException.java
index ecb62db6a..71b83bfc9 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ConfigRuntimeException.java
@@ -15,23 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.exception;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+public class ConfigRuntimeException extends LinkisRuntimeException {
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  public ConfigRuntimeException(int errCode, String desc) {
+    super(errCode, desc);
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  @Override
+  public ExceptionLevel getLevel() {
+    return ExceptionLevel.ERROR;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/DatabaseNotConfigException.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/DatabaseNotConfigException.java
index ecb62db6a..442b32d84 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/DatabaseNotConfigException.java
@@ -15,23 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.exception;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+public class DatabaseNotConfigException extends LinkisRuntimeException {
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  public DatabaseNotConfigException(int errCode, String desc) {
+    super(errCode, desc);
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  @Override
+  public ExceptionLevel getLevel() {
+    return ExceptionLevel.ERROR;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/HiveSinkException.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/HiveSinkException.java
index ecb62db6a..ff21abf27 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/HiveSinkException.java
@@ -15,23 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.exception;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+public class HiveSinkException extends LinkisRuntimeException {
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  public HiveSinkException(int errCode, String desc) {
+    super(errCode, desc);
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  @Override
+  public ExceptionLevel getLevel() {
+    return ExceptionLevel.ERROR;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcArrayData.java
similarity index 51%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcArrayData.java
index ecb62db6a..297bec358 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcArrayData.java
@@ -15,23 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.model;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.server.BDPJettyServerHelper;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
+import java.io.Serializable;
+
+public class DataCalcArrayData extends DataCalcPluginConfig implements Serializable {
 
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
+  private DataCalcDataConfig[] plugins;
 
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+  public DataCalcDataConfig[] getPlugins() {
+    return plugins;
+  }
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  public void setPlugins(DataCalcDataConfig[] plugins) {
+    this.plugins = plugins;
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  public static DataCalcArrayData getData(String data) {
+    return BDPJettyServerHelper.gson().fromJson(data, DataCalcArrayData.class);
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataConfig.java
similarity index 51%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataConfig.java
index ecb62db6a..9a01aaa67 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataConfig.java
@@ -15,23 +15,39 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.model;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import java.io.Serializable;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
+import com.google.gson.JsonElement;
+
+public class DataCalcDataConfig implements Serializable {
+
+  private String type;
+  private String name;
+  private JsonElement config;
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
 
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
+  public String getName() {
+    return name;
+  }
 
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+  public void setName(String name) {
+    this.name = name;
+  }
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  public JsonElement getConfig() {
+    return config;
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  public void setConfig(JsonElement config) {
+    this.config = config;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataSource.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataSource.java
new file mode 100644
index 000000000..7e1ae1462
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import java.io.Serializable;
+
+public class DataCalcDataSource implements Serializable {
+  private String typeName;
+  private String driver;
+  private String url;
+  private String databaseName;
+  private String user;
+  private String password;
+
+  public String getTypeName() {
+    return typeName;
+  }
+
+  public void setTypeName(String typeName) {
+    this.typeName = typeName;
+  }
+
+  public String getDriver() {
+    return driver;
+  }
+
+  public void setDriver(String driver) {
+    this.driver = driver;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcGroupData.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcGroupData.java
new file mode 100644
index 000000000..ec8c862d3
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcGroupData.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import org.apache.linkis.server.BDPJettyServerHelper;
+
+import java.io.Serializable;
+
+public class DataCalcGroupData extends DataCalcPluginConfig implements Serializable {
+
+  private DataCalcDataConfig[] sources;
+  private DataCalcDataConfig[] transformations;
+  private DataCalcDataConfig[] sinks;
+
+  public DataCalcDataConfig[] getSources() {
+    return sources;
+  }
+
+  public void setSources(DataCalcDataConfig[] sources) {
+    this.sources = sources;
+  }
+
+  public DataCalcDataConfig[] getTransformations() {
+    return transformations;
+  }
+
+  public void setTransformations(DataCalcDataConfig[] transformations) {
+    this.transformations = transformations;
+  }
+
+  public DataCalcDataConfig[] getSinks() {
+    return sinks;
+  }
+
+  public void setSinks(DataCalcDataConfig[] sinks) {
+    this.sinks = sinks;
+  }
+
+  public static DataCalcGroupData getData(String data) {
+    return BDPJettyServerHelper.gson().fromJson(data, DataCalcGroupData.class);
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcPluginConfig.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcPluginConfig.java
index ecb62db6a..7e2085ff9 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcPluginConfig.java
@@ -15,23 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.model;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
+public abstract class DataCalcPluginConfig implements Serializable {
 
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+  protected Map<String, String> variables = new HashMap<>();
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  public Map<String, String> getVariables() {
+    return variables;
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  public void setVariables(Map<String, String> variables) {
+    if (variables != null) this.variables = variables;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/ResultTableConfig.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/ResultTableConfig.java
index ecb62db6a..4919444e7 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/ResultTableConfig.java
@@ -15,23 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.model;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import java.io.Serializable;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+public interface ResultTableConfig extends Serializable {
+  String getResultTable();
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  Boolean getPersist();
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  String getStorageLevel();
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SinkConfig.java
new file mode 100644
index 000000000..735d4deb4
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SinkConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.AssertTrue;
+
+import java.io.Serializable;
+
+public abstract class SinkConfig extends DataCalcPluginConfig implements Serializable {
+
+  protected String sourceTable;
+
+  protected String sourceQuery;
+
+  public String getSourceTable() {
+    return sourceTable;
+  }
+
+  public void setSourceTable(String sourceTable) {
+    this.sourceTable = sourceTable;
+  }
+
+  public String getSourceQuery() {
+    return sourceQuery;
+  }
+
+  public void setSourceQuery(String sourceQuery) {
+    this.sourceQuery = sourceQuery;
+  }
+
+  @AssertTrue(message = "[sourceTable, sourceQuery] cannot be blank at the same time.")
+  public boolean isSourceOK() {
+    return StringUtils.isNotBlank(sourceTable) || StringUtils.isNotBlank(sourceQuery);
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SourceConfig.java
new file mode 100644
index 000000000..31cad254d
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SourceConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotBlank;
+
+import java.io.Serializable;
+
+public abstract class SourceConfig extends DataCalcPluginConfig
+    implements ResultTableConfig, Serializable {
+
+  @NotBlank protected String resultTable;
+
+  private Boolean persist = false;
+
+  private String storageLevel = "MEMORY_AND_DISK";
+
+  public String getResultTable() {
+    return resultTable;
+  }
+
+  public void setResultTable(String resultTable) {
+    this.resultTable = resultTable;
+  }
+
+  public Boolean getPersist() {
+    return persist;
+  }
+
+  public void setPersist(Boolean persist) {
+    this.persist = persist;
+  }
+
+  public String getStorageLevel() {
+    return storageLevel;
+  }
+
+  public void setStorageLevel(String storageLevel) {
+    if (StringUtils.isNotBlank(storageLevel)) this.storageLevel = storageLevel;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/TransformConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/TransformConfig.java
new file mode 100644
index 000000000..d2ed12297
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/TransformConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotBlank;
+
+import java.io.Serializable;
+
+public abstract class TransformConfig extends DataCalcPluginConfig
+    implements ResultTableConfig, Serializable {
+
+  protected String sourceTable;
+
+  @NotBlank protected String resultTable;
+
+  private Boolean persist = false;
+
+  private String storageLevel = "MEMORY_AND_DISK";
+
+  public String getSourceTable() {
+    return sourceTable;
+  }
+
+  public void setSourceTable(String sourceTable) {
+    this.sourceTable = sourceTable;
+  }
+
+  public String getResultTable() {
+    return resultTable;
+  }
+
+  public void setResultTable(String resultTable) {
+    this.resultTable = resultTable;
+  }
+
+  public Boolean getPersist() {
+    return persist;
+  }
+
+  public void setPersist(Boolean persist) {
+    this.persist = persist;
+  }
+
+  public String getStorageLevel() {
+    return storageLevel;
+  }
+
+  public void setStorageLevel(String storageLevel) {
+    if (StringUtils.isNotBlank(storageLevel)) this.storageLevel = storageLevel;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/service/LinkisDataSourceService.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/service/LinkisDataSourceService.java
new file mode 100644
index 000000000..f66d15d0a
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/service/LinkisDataSourceService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.service;
+
+import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient;
+import org.apache.linkis.datasource.client.request.GetInfoPublishedByDataSourceNameAction;
+import org.apache.linkis.datasourcemanager.common.domain.DataSource;
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcDataSource;
+import org.apache.linkis.storage.utils.StorageUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LinkisDataSourceService {
+
+  private static final Logger logger = LoggerFactory.getLogger(LinkisDataSourceService.class);
+
+  private static final LinkisDataSourceRemoteClient dataSourceClient =
+      new LinkisDataSourceRemoteClient();
+
+  public static DataCalcDataSource getDatasource(String datasourceName) {
+    GetInfoPublishedByDataSourceNameAction action =
+        GetInfoPublishedByDataSourceNameAction.builder()
+            .setDataSourceName(datasourceName)
+            .setUser(StorageUtils.getJvmUser())
+            .build(); // ignore parameter 'system'
+    DataSource datasource =
+        dataSourceClient.getInfoPublishedByDataSourceName(action).getDataSource();
+    datasource.getConnectParams();
+    return transform(datasource);
+  }
+
+  private static DataCalcDataSource transform(DataSource datasource) {
+    DataCalcDataSource ds = new DataCalcDataSource();
+    return ds;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSinkConfig.java
new file mode 100644
index 000000000..22ac537d8
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSinkConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FileSinkConfig extends SinkConfig {
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(file|hdfs)://.*",
+      message =
+          "Invalid path URI, please set the following allowed schemas: 'file://' or 'hdfs://'.")
+  private String path;
+
+  @NotBlank private String serializer = "parquet";
+
+  private List<String> partitionBy = new ArrayList<>();
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  private Map<String, String> options = new HashMap<>();
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public String getSerializer() {
+    return serializer;
+  }
+
+  public void setSerializer(String serializer) {
+    this.serializer = serializer;
+  }
+
+  public List<String> getPartitionBy() {
+    return partitionBy;
+  }
+
+  public void setPartitionBy(List<String> partitionBy) {
+    this.partitionBy = partitionBy;
+  }
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSinkConfig.java
new file mode 100644
index 000000000..56589de24
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSinkConfig.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HiveSinkConfig extends SinkConfig {
+
+  private String targetDatabase;
+
+  @NotBlank private String targetTable;
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  private Boolean strongCheck = true;
+
+  private Boolean writeAsFile = false;
+
+  private Integer numPartitions = 10;
+
+  private Map<String, String> options = new HashMap<>();
+
+  public String getTargetDatabase() {
+    return targetDatabase;
+  }
+
+  public void setTargetDatabase(String targetDatabase) {
+    this.targetDatabase = targetDatabase;
+  }
+
+  public String getTargetTable() {
+    return targetTable;
+  }
+
+  public void setTargetTable(String targetTable) {
+    this.targetTable = targetTable;
+  }
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+
+  public Boolean getStrongCheck() {
+    return strongCheck;
+  }
+
+  public void setStrongCheck(Boolean strongCheck) {
+    this.strongCheck = strongCheck;
+  }
+
+  public Boolean getWriteAsFile() {
+    return writeAsFile;
+  }
+
+  public void setWriteAsFile(Boolean writeAsFile) {
+    this.writeAsFile = writeAsFile;
+  }
+
+  public Integer getNumPartitions() {
+    return numPartitions;
+  }
+
+  public void setNumPartitions(Integer numPartitions) {
+    if (numPartitions == null) return;
+    this.numPartitions = numPartitions > 20 ? 20 : numPartitions;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSinkConfig.java
new file mode 100644
index 000000000..21f9aba59
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSinkConfig.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcSinkConfig extends SinkConfig {
+
+  @NotBlank private String url;
+
+  @NotBlank private String driver;
+
+  @NotBlank private String user;
+
+  @NotBlank private String password;
+
+  private String targetDatabase;
+
+  @NotBlank private String targetTable;
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  private List<String> preQueries = new ArrayList<>();
+
+  private Integer numPartitions = 10;
+
+  private Map<String, String> options;
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public String getDriver() {
+    return driver;
+  }
+
+  public void setDriver(String driver) {
+    this.driver = driver;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getTargetDatabase() {
+    return targetDatabase;
+  }
+
+  public void setTargetDatabase(String targetDatabase) {
+    this.targetDatabase = targetDatabase;
+  }
+
+  public String getTargetTable() {
+    return targetTable;
+  }
+
+  public void setTargetTable(String targetTable) {
+    this.targetTable = targetTable;
+  }
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+
+  public List<String> getPreQueries() {
+    return preQueries;
+  }
+
+  public void setPreQueries(List<String> preQueries) {
+    this.preQueries = preQueries;
+  }
+
+  public Integer getNumPartitions() {
+    return numPartitions;
+  }
+
+  public void setNumPartitions(Integer numPartitions) {
+    if (numPartitions == null) return;
+    this.numPartitions = numPartitions > 20 ? 20 : numPartitions;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSinkConfig.java
new file mode 100644
index 000000000..3ccec8287
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSinkConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ManagedJdbcSinkConfig extends SinkConfig {
+
+  @NotBlank private String targetDatasource;
+
+  private String targetDatabase;
+
+  @NotBlank private String targetTable;
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+      message =
+          "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
+  private String saveMode = "overwrite";
+
+  private List<String> preQueries = new ArrayList<>();
+
+  private Integer numPartitions = 10;
+
+  private Map<String, String> options;
+
+  public String getTargetDatasource() {
+    return targetDatasource;
+  }
+
+  public void setTargetDatasource(String targetDatasource) {
+    this.targetDatasource = targetDatasource;
+  }
+
+  public String getTargetDatabase() {
+    return targetDatabase;
+  }
+
+  public void setTargetDatabase(String targetDatabase) {
+    this.targetDatabase = targetDatabase;
+  }
+
+  public String getTargetTable() {
+    return targetTable;
+  }
+
+  public void setTargetTable(String targetTable) {
+    this.targetTable = targetTable;
+  }
+
+  public String getSaveMode() {
+    return saveMode;
+  }
+
+  public void setSaveMode(String saveMode) {
+    this.saveMode = saveMode;
+  }
+
+  public List<String> getPreQueries() {
+    return preQueries;
+  }
+
+  public void setPreQueries(List<String> preQueries) {
+    this.preQueries = preQueries;
+  }
+
+  public Integer getNumPartitions() {
+    return numPartitions;
+  }
+
+  public void setNumPartitions(Integer numPartitions) {
+    if (numPartitions == null) return;
+    this.numPartitions = numPartitions > 20 ? 20 : numPartitions;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/FileSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/FileSourceConfig.java
new file mode 100644
index 000000000..7e83bdd82
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/FileSourceConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.Map;
+
+public class FileSourceConfig extends SourceConfig {
+
+  @NotBlank
+  @Pattern(
+      regexp = "^(file|hdfs)://.*",
+      message =
+          "Invalid path URI, please set the following allowed schemas: 'file://' or 'hdfs://'.")
+  private String path;
+
+  @NotBlank private String serializer = "parquet";
+
+  private String[] columnNames;
+
+  private Map<String, String> options;
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public String getSerializer() {
+    return serializer;
+  }
+
+  public void setSerializer(String serializer) {
+    this.serializer = serializer;
+  }
+
+  public String[] getColumnNames() {
+    return columnNames;
+  }
+
+  public void setColumnNames(String[] columnNames) {
+    this.columnNames = columnNames;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSourceConfig.java
new file mode 100644
index 000000000..fc13dfcb3
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSourceConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+import java.util.Map;
+
+public class JdbcSourceConfig extends SourceConfig {
+
+  @NotBlank private String url;
+
+  @NotBlank private String driver;
+
+  @NotBlank private String user;
+
+  @NotBlank private String password;
+
+  @NotBlank private String query;
+
+  private Map<String, String> options;
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public String getDriver() {
+    return driver;
+  }
+
+  public void setDriver(String driver) {
+    this.driver = driver;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getQuery() {
+    return query;
+  }
+
+  public void setQuery(String query) {
+    this.query = query;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSourceConfig.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSourceConfig.java
index ecb62db6a..57cab73e3 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSourceConfig.java
@@ -15,23 +15,43 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.source;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
+import javax.validation.constraints.NotBlank;
+
+import java.util.Map;
+
+public class ManagedJdbcSourceConfig extends SourceConfig {
+
+  @NotBlank private String datasource;
+
+  @NotBlank private String query;
+
+  private Map<String, String> options;
+
+  public String getDatasource() {
+    return datasource;
+  }
+
+  public void setDatasource(String datasource) {
+    this.datasource = datasource;
+  }
 
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
+  public String getQuery() {
+    return query;
+  }
 
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+  public void setQuery(String query) {
+    this.query = query;
+  }
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  public Map<String, String> getOptions() {
+    return options;
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  public void setOptions(Map<String, String> options) {
+    this.options = options;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransformConfig.java
similarity index 51%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransformConfig.java
index ecb62db6a..3515d4e61 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransformConfig.java
@@ -15,23 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.transform;
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.TransformConfig;
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
+import javax.validation.constraints.NotBlank;
 
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
+public class SqlTransformConfig extends TransformConfig {
 
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+  @NotBlank private String sql;
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  public String getSql() {
+    return sql;
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  public void setSql(String sql) {
+    this.sql = sql;
+  }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
new file mode 100644
index 000000000..5dfcd4e4a
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.util;
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.*;
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+import org.apache.linkis.engineplugin.spark.datacalc.model.TransformConfig;
+import org.apache.linkis.server.BDPJettyServerHelper;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.ParameterizedType;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.gson.JsonElement;
+
+public class PluginUtil {
+
+  private static final Map<String, Class<?>> SOURCE_PLUGINS = getSourcePlugins();
+  private static final Map<String, Class<?>> TRANSFORM_PLUGINS = getTransformPlugins();
+  private static final Map<String, Class<?>> SINK_PLUGINS = getSinkPlugins();
+
+  private static Map<String, Class<?>> getSourcePlugins() {
+    Map<String, Class<?>> classMap = new HashMap<>();
+    // classMap.put("managed_jdbc",
+    // org.apache.linkis.engineplugin.spark.datacalc.source.ManagedJdbcSource.class);
+    classMap.put("jdbc", org.apache.linkis.engineplugin.spark.datacalc.source.JdbcSource.class);
+    classMap.put("file", org.apache.linkis.engineplugin.spark.datacalc.source.FileSource.class);
+    return classMap;
+  }
+
+  private static Map<String, Class<?>> getTransformPlugins() {
+    Map<String, Class<?>> classMap = new HashMap<>();
+    classMap.put("sql", org.apache.linkis.engineplugin.spark.datacalc.transform.SqlTransform.class);
+    return classMap;
+  }
+
+  private static Map<String, Class<?>> getSinkPlugins() {
+    Map<String, Class<?>> classMap = new HashMap<>();
+    // classMap.put("managed_jdbc",
+    // org.apache.linkis.engineplugin.spark.datacalc.sink.ManagedJdbcSink.class);
+    classMap.put("jdbc", org.apache.linkis.engineplugin.spark.datacalc.sink.JdbcSink.class);
+    classMap.put("hive", org.apache.linkis.engineplugin.spark.datacalc.sink.HiveSink.class);
+    classMap.put("file", org.apache.linkis.engineplugin.spark.datacalc.sink.FileSink.class);
+    return classMap;
+  }
+
+  public static <T extends SourceConfig> DataCalcSource<T> createSource(
+      String name, JsonElement config)
+      throws InstantiationException, IllegalAccessException, InvocationTargetException,
+          NoSuchMethodException {
+    return createPlugin(SOURCE_PLUGINS, name, config);
+  }
+
+  public static <T extends TransformConfig> DataCalcTransform<T> createTransform(
+      String name, JsonElement config)
+      throws InstantiationException, IllegalAccessException, InvocationTargetException,
+          NoSuchMethodException {
+    return createPlugin(TRANSFORM_PLUGINS, name, config);
+  }
+
+  public static <T extends SinkConfig> DataCalcSink<T> createSink(String name, JsonElement config)
+      throws InstantiationException, IllegalAccessException, InvocationTargetException,
+          NoSuchMethodException {
+    return createPlugin(SINK_PLUGINS, name, config);
+  }
+
+  static <T extends DataCalcPlugin> T createPlugin(
+      Map<String, Class<?>> pluginMap, String name, JsonElement config)
+      throws InstantiationException, IllegalAccessException, NoSuchMethodException,
+          InvocationTargetException {
+    Class<?> type = pluginMap.get(name);
+    ParameterizedType genericSuperclass = (ParameterizedType) type.getGenericInterfaces()[0];
+    Class<?> configType = (Class<?>) genericSuperclass.getActualTypeArguments()[0];
+    T plugin = (T) type.getDeclaredConstructor().newInstance();
+    plugin.setConfig(BDPJettyServerHelper.gson().fromJson(config, configType));
+    return plugin;
+  }
+}
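
    PluginUtil resolves a plugin by its short name, takes the config type from the plugin's
    first generic interface, and binds the JSON config with gson. A minimal usage sketch
    (the empty JsonObject is only a placeholder; real configs come from the job JSON):

        import com.google.gson.JsonObject
        import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig
        import org.apache.linkis.engineplugin.spark.datacalc.util.PluginUtil

        // "jdbc" is the key registered in SOURCE_PLUGINS above and resolves to JdbcSource;
        // a real config would carry url/driver/user/password/query from the job definition.
        val source = PluginUtil.createSource[SourceConfig]("jdbc", new JsonObject())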
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
index 0c1a9e21e..b41c09496 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
@@ -40,7 +40,23 @@ public enum SparkErrorCodeSummary {
       "Invalid EngineConn engine session obj, failed to create sparkSql 
executor(EngineConn 引擎会话 obj 无效,无法创建 sparkSql 执行程序)"),
   INVALID_CREATE_SPARKPYTHON(
       420002,
-      "Invalid EngineConn engine session obj, failed to create sparkPython 
executor(EngineConn 引擎会话 obj 无效,无法创建 sparkPython 执行程序)");
+      "Invalid EngineConn engine session obj, failed to create sparkPython 
executor(EngineConn 引擎会话 obj 无效,无法创建 sparkPython 执行程序)"),
+
+  DATA_CALC_CONFIG_VALID_FAILED(43001, "Config data validate failed"),
+  DATA_CALC_CONFIG_TYPE_NOT_VALID(43002, "[{0}] is not a valid type"),
+
+  DATA_CALC_DATASOURCE_NOT_CONFIG(43011, "Datasource {0} is not configured!"),
+
+  DATA_CALC_COLUMN_NOT_MATCH(
+      43021,
+      "{0}st column ({1}[{2}]) name or data type does not match target table 
column ({3}[{4}])"),
+  DATA_CALC_COLUMN_NUM_NOT_MATCH(
+      43022,
+      "{0} requires that the data to be inserted have the same number of 
columns as the target table: target table has {1} column(s) but the inserted 
data has {2} column(s)"),
+  DATA_CALC_FIELD_NOT_EXIST(43023, "{0} columns({1}) are not exist in source 
columns"),
+  DATA_CALC_VARIABLE_NOT_EXIST(43024, "Please set [{0}] in variables"),
+  ;
+
   /** (errorCode)错误码 */
   private int errorCode;
   /** (errorDesc)错误描述 */
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
index 6ba8b210c..46e8b5def 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
@@ -37,6 +37,7 @@ object SparkKind {
   val SPARKMIX_TYPE = "sparkmix"
   val MIX_TYPE = "mix"
   val SPARKSQL_TYPE = "sparksql"
+  val SPARK_DATA_CALC_TYPE = "spark_data_calc"
   val SPARKMLSQL_TYPE = "mlsql"
   val FUNCTION_MDQ_TYPE = "function.mdq"
 
@@ -83,6 +84,10 @@ case class SparkSQL() extends Kind {
   override val toString: String = SparkKind.SPARKSQL_TYPE
 }
 
+case class SparkDataCalc() extends Kind {
+  override val toString: String = SparkKind.SPARK_DATA_CALC_TYPE
+}
+
 case class SparkMLSQL() extends Kind {
   override val toString = SparkKind.SPARKMLSQL_TYPE
 }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcExecution.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcExecution.scala
new file mode 100644
index 000000000..5d2870138
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcExecution.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.{
+  DataCalcPlugin,
+  DataCalcSink,
+  DataCalcSource,
+  DataCalcTransform
+}
+import org.apache.linkis.engineplugin.spark.datacalc.exception.ConfigRuntimeException
+import org.apache.linkis.engineplugin.spark.datacalc.model._
+import org.apache.linkis.engineplugin.spark.datacalc.util.PluginUtil
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+import org.apache.linkis.server.BDPJettyServerHelper
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+
+import javax.validation.{Validation, Validator}
+
+import java.text.MessageFormat
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+import org.slf4j.{Logger, LoggerFactory}
+
+object DataCalcExecution {
+
+  private val log: Logger = LoggerFactory.getLogger(DataCalcExecution.getClass)
+
+  def getPlugins[SR <: SourceConfig, TR <: TransformConfig, SK <: SinkConfig](
+      mapleData: DataCalcGroupData
+  ): (Array[DataCalcSource[SR]], Array[DataCalcTransform[TR]], Array[DataCalcSink[SK]]) = {
+    val sources = mapleData.getSources.map(source =>
+      PluginUtil.createSource[SR](source.getName, source.getConfig)
+    )
+    val transformations = mapleData.getTransformations.map(sink =>
+      PluginUtil.createTransform[TR](sink.getName, sink.getConfig)
+    )
+    val sinks =
+      mapleData.getSinks.map(sink => PluginUtil.createSink[SK](sink.getName, sink.getConfig))
+
+    val checkResult = new CheckResult()
+    sources.foreach(source => {
+      source.getConfig.setVariables(mapleData.getVariables)
+      checkResult.checkResultTable(source)
+    })
+    transformations.foreach(transformation => {
+      transformation.getConfig.setVariables(mapleData.getVariables)
+      checkResult.checkResultTable(transformation)
+    })
+    sinks.foreach(sink => {
+      sink.getConfig.setVariables(mapleData.getVariables)
+      checkResult.checkPluginConfig(sink)
+    })
+    checkResult.check()
+
+    (sources, transformations, sinks)
+  }
+
+  def execute[SR <: SourceConfig, TR <: TransformConfig, SK <: SinkConfig](
+      spark: SparkSession,
+      sources: Array[DataCalcSource[SR]],
+      transformations: Array[DataCalcTransform[TR]],
+      sinks: Array[DataCalcSink[SK]]
+  ): Unit = {
+    if (sources != null && !sources.isEmpty) sources.foreach(source => sourceProcess(spark, source))
+    if (transformations != null && !transformations.isEmpty)
+      transformations.foreach(transformation => transformProcess(spark, transformation))
+    if (sinks != null && !sinks.isEmpty) sinks.foreach(sink => sinkProcess(spark, sink))
+
+    DataCalcTempData.clean(spark.sqlContext)
+  }
+
+  def getPlugins[SR <: SourceConfig, TR <: TransformConfig, SK <: SinkConfig, T <: Object](
+      mapleData: DataCalcArrayData
+  ): Array[Any] = {
+    val checkResult = new CheckResult()
+    val plugins = new Array[Any](mapleData.getPlugins.length)
+    for (i <- mapleData.getPlugins.indices) {
+      val config = mapleData.getPlugins()(i)
+      config.getType match {
+        case "source" =>
+          val source = PluginUtil.createSource[SR](config.getName, config.getConfig)
+          source.getConfig.setVariables(mapleData.getVariables)
+          checkResult.checkResultTable(source)
+          plugins(i) = source
+        case "transformation" =>
+          val transformation = PluginUtil.createTransform[TR](config.getName, config.getConfig)
+          transformation.getConfig.setVariables(mapleData.getVariables)
+          checkResult.checkResultTable(transformation)
+          plugins(i) = transformation
+        case "sink" =>
+          val sink = PluginUtil.createSink[SK](config.getName, config.getConfig)
+          sink.getConfig.setVariables(mapleData.getVariables)
+          checkResult.checkPluginConfig(sink)
+          plugins(i) = sink
+        case t: String =>
+          throw new ConfigRuntimeException(
+            SparkErrorCodeSummary.DATA_CALC_CONFIG_TYPE_NOT_VALID.getErrorCode,
+            MessageFormat.format(
+              SparkErrorCodeSummary.DATA_CALC_CONFIG_TYPE_NOT_VALID.getErrorDesc,
+              t
+            )
+          )
+      }
+    }
+    checkResult.check()
+    plugins
+  }
+
+  def execute[SR <: SourceConfig, TR <: TransformConfig, SK <: SinkConfig, T <: Object](
+      spark: SparkSession,
+      plugins: Array[Any]
+  ): Unit = {
+    if (plugins == null || plugins.isEmpty) return
+    plugins.foreach {
+      case source: DataCalcSource[SR] => sourceProcess(spark, source)
+      case transform: DataCalcTransform[TR] => transformProcess(spark, transform)
+      case sink: DataCalcSink[SK] => sinkProcess(spark, sink)
+      case _ =>
+    }
+
+    DataCalcTempData.clean(spark.sqlContext)
+  }
+
+  private def sourceProcess[T <: SourceConfig](
+      spark: SparkSession,
+      source: DataCalcSource[T]
+  ): Unit = {
+    source.prepare(spark)
+    val ds: Dataset[Row] = source.getData(spark)
+    tempSaveResultTable(ds, source.getConfig)
+  }
+
+  private def transformProcess[T <: TransformConfig](
+      spark: SparkSession,
+      transform: DataCalcTransform[T]
+  ): Unit = {
+    transform.prepare(spark)
+    val fromDs: Dataset[Row] = if (StringUtils.isNotBlank(transform.getConfig.getSourceTable)) {
+      spark.read.table(transform.getConfig.getSourceTable)
+    } else {
+      null
+    }
+    val ds: Dataset[Row] = transform.process(spark, fromDs)
+    tempSaveResultTable(ds, transform.getConfig)
+  }
+
+  private def sinkProcess[T <: SinkConfig](spark: SparkSession, sink: DataCalcSink[T]): Unit = {
+    sink.prepare(spark)
+    val fromDs: Dataset[Row] = if (StringUtils.isBlank(sink.getConfig.getSourceQuery)) {
+      spark.read.table(sink.getConfig.getSourceTable)
+    } else {
+      spark.sql(sink.getConfig.getSourceQuery)
+    }
+    sink.output(spark, fromDs)
+  }
+
+  private def tempSaveResultTable(ds: Dataset[Row], resultTableConfig: ResultTableConfig): Unit = {
+    if (ds != null) {
+      ds.createOrReplaceTempView(resultTableConfig.getResultTable)
+      DataCalcTempData.putResultTable(resultTableConfig.getResultTable)
+      if (resultTableConfig.getPersist) {
+        ds.persist(StorageLevel.fromString(resultTableConfig.getStorageLevel))
+        DataCalcTempData.putPersistDataSet(ds)
+      }
+    }
+  }
+
+  private class CheckResult {
+
+    private var success: Boolean = true
+    private val set: mutable.Set[String] = mutable.Set()
+
+    val validator: Validator = Validation.buildDefaultValidatorFactory().getValidator
+
+    def checkResultTable[T <: ResultTableConfig](plugin: DataCalcPlugin[T]): Unit = {
+      checkPluginConfig(plugin)
+      if (set.contains(plugin.getConfig.getResultTable)) {
+        log.error(s"Result table [${plugin.getConfig.getResultTable}] cannot be duplicated")
+        success = false
+      } else {
+        set.add(plugin.getConfig.getResultTable)
+      }
+    }
+
+    def checkPluginConfig[T](plugin: DataCalcPlugin[T]): Unit = {
+      val violations = validator.validate(plugin.getConfig)
+      if (!violations.isEmpty) {
+        success = false
+        log.error(
+          s"Configuration check error, ${BDPJettyServerHelper.gson.toJson(plugin.getConfig)}"
+        )
+        for (violation <- violations) {
+          if (
+              violation.getMessageTemplate
+                .startsWith("{") && violation.getMessageTemplate.endsWith("}")
+          ) {
+            log.error(s"[${violation.getPropertyPath}] ${violation.getMessage}")
+          } else {
+            log.error(violation.getMessage)
+          }
+        }
+      }
+    }
+
+    def check(): Unit = {
+      if (!success) {
+        throw new ConfigRuntimeException(
+          SparkErrorCodeSummary.DATA_CALC_CONFIG_VALID_FAILED.getErrorCode,
+          SparkErrorCodeSummary.DATA_CALC_CONFIG_VALID_FAILED.getErrorDesc
+        )
+      }
+    }
+
+  }
+
+}
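
    Putting the pieces together, the executor is expected to deserialize the submitted job JSON
    into DataCalcGroupData (or DataCalcArrayData) and hand it to getPlugins/execute. A rough
    sketch of that flow, assuming the job JSON and the SparkSession are supplied by the engine
    connector (this is not the actual SparkDataCalcExecutor code, which is not shown here):

        import org.apache.linkis.engineplugin.spark.datacalc.DataCalcExecution
        import org.apache.linkis.engineplugin.spark.datacalc.model.{DataCalcGroupData, SinkConfig, SourceConfig, TransformConfig}
        import org.apache.linkis.server.BDPJettyServerHelper
        import org.apache.spark.sql.SparkSession

        def runDataCalc(spark: SparkSession, jobJson: String): Unit = {
          // Bind the job JSON the same way PluginUtil binds individual plugin configs.
          val groupData = BDPJettyServerHelper.gson.fromJson(jobJson, classOf[DataCalcGroupData])
          val (sources, transforms, sinks) =
            DataCalcExecution.getPlugins[SourceConfig, TransformConfig, SinkConfig](groupData)
          // Sources register temp result tables, transformations consume them, sinks write out,
          // and the temp data is cleaned at the end of execute.
          DataCalcExecution.execute(spark, sources, transforms, sinks)
        }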
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcTempData.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcTempData.scala
new file mode 100644
index 000000000..5d1d84221
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcTempData.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.spark.sql.{Dataset, Row, SQLContext}
+
+import scala.collection.mutable
+
+object DataCalcTempData {
+
+  private val RESULT_TABLES: mutable.Set[String] = mutable.Set[String]()
+  private val PERSIST_DATASETS: mutable.Set[Dataset[Row]] = mutable.Set[Dataset[Row]]()
+
+  def putResultTable(resultTableName: String): Unit = {
+    RESULT_TABLES.add(resultTableName)
+  }
+
+  def putPersistDataSet(ds: Dataset[Row]): Unit = {
+    PERSIST_DATASETS.add(ds)
+  }
+
+  /**
+   * clean temporary data
+   * @param sqlContext
+   */
+  def clean(sqlContext: SQLContext): Unit = {
+    RESULT_TABLES.foreach(resultTable => sqlContext.dropTempTable(resultTable))
+    RESULT_TABLES.clear()
+
+    PERSIST_DATASETS.foreach(ds => ds.unpersist())
+    PERSIST_DATASETS.clear()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcPlugin.scala
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcPlugin.scala
index ecb62db6a..23a840d83 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcPlugin.scala
@@ -15,23 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.api
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.spark.sql.SparkSession
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
+trait DataCalcPlugin[T] extends Serializable {
+  protected var config: T = _
 
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+  def getConfig: T = config
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  def setConfig(config: T): Unit = {
+    this.config = config
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+  def prepare(spark: SparkSession): Unit = {}
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSink.scala
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSink.scala
index ecb62db6a..895a3fcca 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSink.scala
@@ -15,23 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.api
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
-
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+trait DataCalcSink[T <: SinkConfig] extends DataCalcPlugin[T] {
+  def output(spark: SparkSession, data: Dataset[Row]): Unit
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSource.scala
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSource.scala
index ecb62db6a..ea8e58bc0 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSource.scala
@@ -15,23 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.api
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
-
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+trait DataCalcSource[T <: SourceConfig] extends DataCalcPlugin[T] {
+  def getData(spark: SparkSession): Dataset[Row]
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcTransform.scala
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcTransform.scala
index ecb62db6a..54ff1f9a3 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcTransform.scala
@@ -15,23 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.api
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.TransformConfig
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
-
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+trait DataCalcTransform[T <: TransformConfig] extends DataCalcPlugin[T] {
+  def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row]
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSink.scala
new file mode 100644
index 000000000..1c9d9e034
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSink.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.text.StringSubstitutor
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class FileSink extends DataCalcSink[FileSinkConfig] {
+
+  private val log: Logger = LoggerFactory.getLogger(classOf[FileSink])
+
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    val writer = ds.write.mode(config.getSaveMode)
+
+    if (config.getPartitionBy != null && !config.getPartitionBy.isEmpty) {
+      val partitionKeys = config.getPartitionBy.asScala
+      writer.partitionBy(partitionKeys: _*)
+    }
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      writer.options(config.getOptions)
+    }
+    val substitutor = new StringSubstitutor(config.getVariables)
+    val path = substitutor.replace(config.getPath)
+    log.info(s"Save data to file, path: $path")
+
+    config.getSerializer match {
+      case "csv" => writer.csv(path)
+      case "json" => writer.json(path)
+      case "parquet" => writer.parquet(path)
+      case "text" => writer.text(path)
+      case "orc" => writer.orc(path)
+      case _ => writer.format(config.getSerializer).save(path)
+    }
+  }
+
+}
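
    A "file" sink entry matching the serializer switch above might look roughly like the sketch
    below. The field names follow the getters used here (FileSinkConfig itself is not shown in
    this hunk), "sourceTable" is assumed to come from SinkConfig, and "${ds}" stands for a
    job-level variable replaced by the StringSubstitutor.

        // Hypothetical "file" sink fragment, for illustration only.
        val fileSinkJson: String =
          """{
            |  "name": "file",
            |  "config": {
            |    "sourceTable": "rt_user",
            |    "path": "hdfs:///tmp/datacalc/${ds}/user",
            |    "serializer": "parquet",
            |    "partitionBy": ["ds"],
            |    "saveMode": "overwrite",
            |    "options": {"compression": "snappy"}
            |  }
            |}""".stripMargin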
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
new file mode 100644
index 000000000..348f819c5
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+import org.apache.linkis.engineplugin.spark.datacalc.exception.HiveSinkException
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructField
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class HiveSink extends DataCalcSink[HiveSinkConfig] {
+
+  private val log: Logger = LoggerFactory.getLogger(classOf[HiveSink])
+
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    val targetTable =
+      if (StringUtils.isBlank(config.getTargetDatabase)) config.getTargetTable
+      else config.getTargetDatabase + "." + config.getTargetTable
+    val targetFields = spark.table(targetTable).schema.fields
+    if (config.getWriteAsFile != null && config.getWriteAsFile) {
+      val partitionsColumns = spark.catalog
+        .listColumns(targetTable)
+        .where(col("isPartition") === true)
+        .select("name")
+        .collect()
+        .map(_.getAs[String]("name"))
+      val location = getLocation(spark, targetTable, partitionsColumns)
+      val fileFormat = getTableFileFormat(spark, targetTable)
+
+      log.info(
+        s"Write $fileFormat into target table: $targetTable, location: $location, file format: $fileFormat"
+      )
+      val writer = getSaveWriter(
+        ds,
+        targetFields.filter(field => !partitionsColumns.contains(field.name)),
+        targetTable
+      )
+      fileFormat match {
+        case FileFormat.PARQUET => writer.parquet(location)
+        case FileFormat.ORC => writer.orc(location)
+        case _ =>
+      }
+
+      val partition = partitionsColumns
+        .map(colName => s"$colName='${config.getVariables.get(colName)}'")
+        .mkString(",")
+      if (StringUtils.isNotBlank(partition)) {
+        log.info(s"Refresh table partition: $partition")
+        refreshPartition(spark, targetTable, partition)
+      }
+    } else {
+      val writer = getSaveWriter(ds, targetFields, targetTable)
+      log.info(s"InsertInto data to hive table: $targetTable")
+      writer.format("hive").insertInto(targetTable)
+    }
+  }
+
+  def getSaveWriter(
+      ds: Dataset[Row],
+      targetFields: Array[StructField],
+      targetTable: String
+  ): DataFrameWriter[Row] = {
+    val dsSource = sequenceFields(ds, ds.schema.fields, targetFields, targetTable)
+    val sourceFields = dsSource.schema.fields
+
+    // Compare column's data type when [strongCheck] is true
+    if (config.getStrongCheck != null && config.getStrongCheck) {
+      for (i <- sourceFields.indices) {
+        val targetField = targetFields(i)
+        val sourceField = sourceFields(i)
+        if (!targetField.dataType.equals(sourceField.dataType)) {
+          logFields(sourceFields, targetFields)
+          throw new HiveSinkException(
+            SparkErrorCodeSummary.DATA_CALC_COLUMN_NOT_MATCH.getErrorCode,
+            s"Column ${i + 1} (${sourceField.name}[${sourceField.dataType}]) name or data type does not match target table column (${targetField.name}[${targetField.dataType}])"
+          )
+        }
+      }
+    }
+
+    val writer = dsSource.repartition(config.getNumPartitions).write.mode(config.getSaveMode)
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      writer.options(config.getOptions)
+    }
+    writer
+  }
+
+  def logFields(sourceFields: Array[StructField], targetFields: Array[StructField]): Unit = {
+    log.info(s"sourceFields: ${sourceFields.mkString("Array(", ", ", ")")}")
+    log.info(s"targetFields: ${targetFields.mkString("Array(", ", ", ")")}")
+  }
+
+  def sequenceFields(
+      dsSource: Dataset[Row],
+      sourceFields: Array[StructField],
+      targetFields: Array[StructField],
+      targetTable: String
+  ): DataFrame = {
+    if (targetFields.length != sourceFields.length) {
+      logFields(sourceFields, targetFields)
+      throw new HiveSinkException(
+        SparkErrorCodeSummary.DATA_CALC_COLUMN_NUM_NOT_MATCH.getErrorCode,
+        s"$targetTable requires that the data to be inserted have the same number of columns as the target table: target table has ${targetFields.length} column(s) but the inserted data has ${sourceFields.length} column(s)"
+      )
+    }
+
+    // hive column names are lowercase
+    val sourceFieldMap = sourceFields.map(field => field.name.toLowerCase -> field).toMap
+    val targetFieldMap = targetFields.map(field => field.name.toLowerCase -> field).toMap
+
+    val subSet = targetFieldMap.keySet -- sourceFieldMap.keySet
+    if (subSet.isEmpty) {
+      // sort column
+      dsSource.select(targetFields.map(field => col(field.name)): _*)
+    } else if (subSet.size == targetFieldMap.size) {
+      log.info("No target table fields match the source fields, writing in column order")
+      dsSource.toDF(targetFields.map(field => field.name): _*)
+    } else {
+      throw new HiveSinkException(
+        SparkErrorCodeSummary.DATA_CALC_FIELD_NOT_EXIST.getErrorCode,
+        s"$targetTable fields (${subSet.mkString(",")}) do not exist in source fields"
+      )
+    }
+  }
+
+  /**
+   * get hive table location
+   *
+   * @param spark
+   * @param targetTable
+   * @return
+   *   hive table location
+   */
+  def getLocation(
+      spark: SparkSession,
+      targetTable: String,
+      partitionsColumns: Array[String]
+  ): String = {
+    val locations =
+      spark.sql(s"desc formatted $targetTable").filter(col("col_name") === "Location").collect()
+    var location: String = locations(0).getString(1)
+    for (partitionColName <- partitionsColumns) {
+      if (
+          !config.getVariables.containsKey(partitionColName) ||
+          StringUtils.isBlank(config.getVariables.get(partitionColName))
+      ) {
+        throw new HiveSinkException(
+          SparkErrorCodeSummary.DATA_CALC_VARIABLE_NOT_EXIST.getErrorCode,
+          s"Please set [${partitionsColumns.mkString(", ")}] in variables"
+        )
+      }
+      location += s"/$partitionColName=${config.getVariables.get(partitionColName)}"
+    }
+    location
+  }
+
+  def getTableFileFormat(spark: SparkSession, targetTable: String): FileFormat.Value = {
+    try {
+      var fileFormat: FileFormat.FileFormat = FileFormat.OTHER
+      spark.table(targetTable).queryExecution.optimizedPlan match {
+        case logicalRelation: LogicalRelation =>
+          logicalRelation.relation match {
+            case hadoopFsRelation: HadoopFsRelation =>
+              hadoopFsRelation.fileFormat match {
+                case _: org.apache.spark.sql.execution.datasources.orc.OrcFileFormat =>
+                  fileFormat = FileFormat.ORC
+                case _: org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat =>
+                  fileFormat = FileFormat.PARQUET
+                case dataSourceRegister: DataSourceRegister =>
+                  fileFormat = FileFormat.withName(dataSourceRegister.shortName.toUpperCase)
+                case _ =>
+              }
+          }
+        case hiveTableRelation: HiveTableRelation =>
+        // todo
+      }
+      fileFormat
+    } catch {
+      case _: Exception => FileFormat.OTHER
+    }
+  }
+
+  def refreshPartition(spark: SparkSession, targetTable: String, partition: String): Unit = {
+    spark.sql(s"ALTER TABLE $targetTable DROP IF EXISTS partition($partition)")
+    spark.sql(s"ALTER TABLE $targetTable ADD IF NOT EXISTS partition($partition)")
+  }
+
+}
+
+object FileFormat extends Enumeration {
+  type FileFormat = Value
+  val ORC, PARQUET, OTHER = Value
+}
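
    A "hive" sink entry for the insertInto path above might look roughly like the sketch below.
    Field names follow the getters used in HiveSink, "sourceTable" is assumed to come from
    SinkConfig, and when writeAsFile is true the partition values are expected to be present
    in the job-level "variables" map consumed by getLocation.

        // Hypothetical "hive" sink fragment, for illustration only.
        val hiveSinkJson: String =
          """{
            |  "name": "hive",
            |  "config": {
            |    "sourceTable": "rt_user",
            |    "targetDatabase": "ods",
            |    "targetTable": "ods_user",
            |    "saveMode": "overwrite",
            |    "strongCheck": true,
            |    "writeAsFile": false,
            |    "numPartitions": 10
            |  }
            |}""".stripMargin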
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
new file mode 100644
index 000000000..498c000ae
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
+
+import java.sql.Connection
+
+import scala.collection.JavaConverters._
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class JdbcSink extends DataCalcSink[JdbcSinkConfig] {
+
+  private val log: Logger = LoggerFactory.getLogger(classOf[JdbcSink])
+
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    val targetTable =
+      if (StringUtils.isBlank(config.getTargetDatabase)) config.getTargetTable
+      else config.getTargetDatabase + "." + config.getTargetTable
+    var options = Map(
+      "url" -> config.getUrl,
+      "driver" -> config.getDriver,
+      "user" -> config.getUser,
+      "password" -> config.getPassword,
+      "dbtable" -> targetTable,
+      "connectionCollation" -> "utf8mb4_unicode_ci"
+    )
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      options = config.getOptions.asScala.toMap ++ options
+    }
+
+    options = options ++ Map(
+      "isolationLevel" -> options.getOrElse("isolationLevel", "NONE"),
+      "batchsize" -> options.getOrElse("batchsize", "5000")
+    )
+
+    if (config.getPreQueries != null && !config.getPreQueries.isEmpty) {
+      spark
+        .sql("select 1")
+        .repartition(1)
+        .foreachPartition(_ => {
+          val jdbcOptions = new JDBCOptions(options)
+          val conn: Connection = JdbcUtils.createConnectionFactory(jdbcOptions)()
+          try {
+            config.getPreQueries.asScala.foreach(query => {
+              log.info(s"Execute pre query: $query")
+              execute(conn, jdbcOptions, query)
+            })
+          } catch {
+            case e: Exception => log.error("Execute preQueries failed. ", e)
+          } finally {
+            conn.close()
+          }
+        })
+    }
+
+    val writer = ds.repartition(config.getNumPartitions).write.format("jdbc")
+    if (StringUtils.isNotBlank(config.getSaveMode)) {
+      writer.mode(config.getSaveMode)
+    }
+    log.info(
+      s"Save data to jdbc url: ${config.getUrl}, driver: ${config.getDriver}, username: ${config.getUser}, table: $targetTable"
+    )
+    writer.options(options).save()
+  }
+
+  private def execute(conn: Connection, jdbcOptions: JDBCOptions, query: String): Unit = {
+    log.info("Execute query: {}", query)
+    val statement = conn.prepareStatement(query)
+    try {
+      statement.setQueryTimeout(jdbcOptions.queryTimeout)
+      val rows = statement.executeUpdate()
+      log.info("{} rows affected", rows)
+    } catch {
+      case e: Exception => log.error("Execute query failed. ", e)
+    } finally {
+      statement.close()
+    }
+  }
+
+}
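
    A "jdbc" sink entry matching the options assembled above might look roughly like this.
    Field names follow the getters used in JdbcSink (JdbcSinkConfig is not shown in this hunk),
    "sourceQuery"/"sourceTable" are assumed to come from SinkConfig, and preQueries run once on
    a single partition before the write.

        // Hypothetical "jdbc" sink fragment, for illustration only.
        val jdbcSinkJson: String =
          """{
            |  "name": "jdbc",
            |  "config": {
            |    "sourceQuery": "select id, name from rt_user",
            |    "url": "jdbc:mysql://127.0.0.1:3306/test_db",
            |    "driver": "com.mysql.jdbc.Driver",
            |    "user": "test_user",
            |    "password": "test_pass",
            |    "targetTable": "t_user_copy",
            |    "saveMode": "append",
            |    "numPartitions": 4,
            |    "preQueries": ["truncate table t_user_copy"]
            |  }
            |}""".stripMargin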
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSink.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSink.scala
new file mode 100644
index 000000000..d64e0b6a5
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSink.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+import org.apache.linkis.engineplugin.spark.datacalc.exception.DatabaseNotConfigException
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcDataSource
+import org.apache.linkis.engineplugin.spark.datacalc.service.LinkisDataSourceService
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import java.text.MessageFormat
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class ManagedJdbcSink extends DataCalcSink[ManagedJdbcSinkConfig] {
+
+  private val log: Logger = LoggerFactory.getLogger(classOf[ManagedJdbcSink])
+
+  def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+    val db: DataCalcDataSource = LinkisDataSourceService.getDatasource(config.getTargetDatasource)
+    if (db == null) {
+      throw new DatabaseNotConfigException(
+        SparkErrorCodeSummary.DATA_CALC_DATASOURCE_NOT_CONFIG.getErrorCode,
+        MessageFormat.format(
+          SparkErrorCodeSummary.DATA_CALC_DATASOURCE_NOT_CONFIG.getErrorDesc,
+          config.getTargetDatasource
+        )
+      )
+    }
+
+    val jdbcConfig = new JdbcSinkConfig()
+    jdbcConfig.setUrl(db.getUrl)
+    jdbcConfig.setDriver(db.getDriver)
+    jdbcConfig.setUser(db.getUser)
+    jdbcConfig.setPassword(db.getPassword)
+    jdbcConfig.setTargetDatabase(config.getTargetDatabase)
+    jdbcConfig.setTargetTable(config.getTargetTable)
+    jdbcConfig.setSaveMode(config.getSaveMode)
+    jdbcConfig.setPreQueries(config.getPreQueries)
+    jdbcConfig.setNumPartitions(config.getNumPartitions)
+    jdbcConfig.setOptions(config.getOptions)
+    jdbcConfig.setSourceTable(config.getSourceTable)
+    jdbcConfig.setSourceQuery(config.getSourceQuery)
+    jdbcConfig.setVariables(config.getVariables)
+
+    val sinkPlugin = new JdbcSink()
+    sinkPlugin.setConfig(jdbcConfig)
+    sinkPlugin.output(spark, ds)
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/FileSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/FileSource.scala
new file mode 100644
index 000000000..95f2e0535
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/FileSource.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.commons.text.StringSubstitutor
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class FileSource extends DataCalcSource[FileSourceConfig] {
+
+  private val log: Logger = LoggerFactory.getLogger(classOf[FileSource])
+
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read
+
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      reader.options(config.getOptions)
+    }
+    val substitutor = new StringSubstitutor(config.getVariables)
+    val path = substitutor.replace(config.getPath)
+    log.info(s"Load data from file <$path>")
+
+    var df = config.getSerializer match {
+      case "csv" => reader.csv(path)
+      case "json" => reader.json(path)
+      case "parquet" => reader.parquet(path)
+      case "text" => reader.text(path)
+      case "orc" => reader.orc(path)
+      case _ => reader.format(config.getSerializer).load(path)
+    }
+    if (config.getColumnNames != null && config.getColumnNames.length > 0) {
+      df = df.toDF(config.getColumnNames: _*)
+    }
+    df
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSource.scala
new file mode 100644
index 000000000..7bdc43b38
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSource.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class JdbcSource extends DataCalcSource[JdbcSourceConfig] {
+
+  private val log: Logger = LoggerFactory.getLogger(classOf[JdbcSource])
+
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val reader = spark.read.format("jdbc")
+    if (config.getOptions != null && !config.getOptions.isEmpty) {
+      reader.options(config.getOptions)
+    }
+
+    log.info(
+      s"Load data from jdbc url: ${config.getUrl}, driver: ${config.getDriver}, username: ${config.getUser}, query: ${config.getQuery}"
+    )
+
+    reader
+      .option("url", config.getUrl)
+      .option("driver", config.getDriver)
+      .option("user", config.getUser)
+      .option("password", config.getPassword)
+      .option("query", config.getQuery)
+      .load()
+  }
+
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSource.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSource.scala
new file mode 100644
index 000000000..47a0f55ba
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+import org.apache.linkis.engineplugin.spark.datacalc.exception.DatabaseNotConfigException
+import org.apache.linkis.engineplugin.spark.datacalc.service.LinkisDataSourceService
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import java.text.MessageFormat
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class ManagedJdbcSource extends DataCalcSource[ManagedJdbcSourceConfig] {
+
+  private val log: Logger = LoggerFactory.getLogger(classOf[ManagedJdbcSource])
+
+  override def getData(spark: SparkSession): Dataset[Row] = {
+    val db = LinkisDataSourceService.getDatasource(config.getDatasource)
+    if (db == null) {
+      throw new DatabaseNotConfigException(
+        SparkErrorCodeSummary.DATA_CALC_DATASOURCE_NOT_CONFIG.getErrorCode,
+        MessageFormat.format(
+          SparkErrorCodeSummary.DATA_CALC_DATASOURCE_NOT_CONFIG.getErrorDesc,
+          config.getDatasource
+        )
+      )
+    }
+
+    val jdbcConfig = new JdbcSourceConfig()
+    jdbcConfig.setUrl(db.getUrl)
+    jdbcConfig.setDriver(db.getDriver)
+    jdbcConfig.setUser(db.getUser)
+    jdbcConfig.setPassword(db.getPassword)
+    jdbcConfig.setQuery(config.getQuery)
+    jdbcConfig.setPersist(config.getPersist)
+    jdbcConfig.setOptions(config.getOptions)
+    jdbcConfig.setResultTable(config.getResultTable)
+
+    val sourcePlugin = new JdbcSource()
+    sourcePlugin.setConfig(jdbcConfig)
+    sourcePlugin.getData(spark)
+  }
+
+}
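
A hedged sketch (illustration only, not in this patch) of the failure path above: when the named Linkis data source cannot be resolved, getData throws DatabaseNotConfigException before any JDBC read is attempted. The setters on ManagedJdbcSourceConfig are assumed to mirror the getters used by the plugin, and in a bare local run the data source lookup itself may fail earlier without a reachable Linkis service:

    import org.apache.linkis.engineplugin.spark.datacalc.exception.DatabaseNotConfigException
    import org.apache.linkis.engineplugin.spark.datacalc.source.{ManagedJdbcSource, ManagedJdbcSourceConfig}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val conf = new ManagedJdbcSourceConfig()
    conf.setDatasource("ds_that_is_not_configured") // assumed setter, hypothetical data source name
    conf.setQuery("select 1")                       // assumed setter
    val source = new ManagedJdbcSource()
    source.setConfig(conf)
    try {
      source.getData(spark)
    } catch {
      case e: DatabaseNotConfigException =>
        // expected when the lookup returns null: the plugin fails fast with DATA_CALC_DATASOURCE_NOT_CONFIG
        println(e.getMessage)
    }
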
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransform.scala
similarity index 51%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransform.scala
index ecb62db6a..bff241546 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransform.scala
@@ -15,23 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.transform
 
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcTransform
 
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
-    extends ErrorException(errCode, desc)
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import org.slf4j.{Logger, LoggerFactory}
 
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
-    extends ErrorException(errorCode, desc)
+class SqlTransform extends DataCalcTransform[SqlTransformConfig] {
 
-case class NotSupportSparkSqlTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+  private val log: Logger = LoggerFactory.getLogger(classOf[SqlTransform])
 
-case class NotSupportSparkPythonTypeException(desc: String)
-    extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+  override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {
+    log.info(s"Load data from query: ${config.getSql}")
+    spark.sql(config.getSql)
+  }
 
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+}
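
A short, hedged sketch (illustration only, not in this patch) of what the transform above does: it ignores the incoming Dataset and simply evaluates the configured SQL against whatever result tables earlier plugins registered as temp views. setSql and setConfig are assumed to mirror the getters the plugin reads:

    import org.apache.linkis.engineplugin.spark.datacalc.transform.{SqlTransform, SqlTransformConfig}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.range(3).toDF("id").createOrReplaceTempView("test1") // stand-in for a source plugin's resultTable
    val conf = new SqlTransformConfig()
    conf.setSql("select * from test1") // assumed setter
    val transform = new SqlTransform()
    transform.setConfig(conf)
    transform.process(spark, null).show() // the input Dataset is unused; the SQL drives the output
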
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
index ecb62db6a..db60850c5 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
@@ -35,3 +35,5 @@ case class NotSupportSparkPythonTypeException(desc: String)
     extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
 
 case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+
+case class NotSupportSparkDataCalcTypeException(desc: String) extends ErrorException(420004, desc)
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
new file mode 100644
index 000000000..81679dce0
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.common.exception.FatalException
+import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc}
+import org.apache.linkis.engineplugin.spark.datacalc.DataCalcExecution
+import org.apache.linkis.engineplugin.spark.datacalc.model.{DataCalcArrayData, DataCalcGroupData}
+import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
+import org.apache.linkis.engineplugin.spark.utils.EngineUtils
+import org.apache.linkis.governance.common.paser.EmptyCodeParser
+import org.apache.linkis.scheduler.executer.{
+  CompletedExecuteResponse,
+  ErrorExecuteResponse,
+  ExecuteResponse,
+  SuccessExecuteResponse
+}
+
+import org.apache.arrow.memory.OutOfMemoryException
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.apache.orc.storage.common.io.Allocator.AllocatorOutOfMemoryException
+
+import java.lang.reflect.InvocationTargetException
+
+class SparkDataCalcExecutor(sparkEngineSession: SparkEngineSession, id: Long)
+    extends SparkEngineConnExecutor(sparkEngineSession.sparkContext, id) {
+
+  override def init(): Unit = {
+    setCodeParser(new EmptyCodeParser)
+    super.init()
+    logger.info("spark data-calc executor start")
+  }
+
+  override def runCode(
+      executor: SparkEngineConnExecutor,
+      code: String,
+      context: EngineExecutionContext,
+      jobGroup: String
+  ): ExecuteResponse = {
+    logger.info("DataCalcExecutor run query: " + code)
+    context.appendStdout(s"${EngineUtils.getName} >> $code")
+    Utils.tryCatch {
+      val execType = context.getProperties.getOrDefault("exec-type", "array").toString
+      if ("group" == execType) {
+        val (sources, transformations, sinks) =
+          DataCalcExecution.getPlugins(DataCalcGroupData.getData(code))
+        DataCalcExecution.execute(sparkEngineSession.sparkSession, sources, transformations, sinks)
+      } else {
+        val plugins = DataCalcExecution.getPlugins(DataCalcArrayData.getData(code))
+        DataCalcExecution.execute(sparkEngineSession.sparkSession, plugins)
+      }
+      SuccessExecuteResponse().asInstanceOf[CompletedExecuteResponse]
+    } {
+      case e: InvocationTargetException =>
+        logger.error("execute sparkDataCalc has InvocationTargetException!", e)
+        var cause = ExceptionUtils.getCause(e)
+        if (cause == null) cause = e
+        ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(e), cause)
+      case e: OutOfMemoryException =>
+        getErrorResponse(e, true)
+      case e: AllocatorOutOfMemoryException =>
+        getErrorResponse(e, true)
+      case e: FatalException =>
+        getErrorResponse(e, true)
+      case e: Exception =>
+        getErrorResponse(e, false)
+      case err: OutOfMemoryError =>
+        getErrorResponse(err, true)
+      case err: VirtualMachineError =>
+        getErrorResponse(err, true)
+      case err: Error =>
+        getErrorResponse(err, false)
+    }
+  }
+
+  def getErrorResponse(throwable: Throwable, needToStopEC: Boolean): ErrorExecuteResponse = {
+    if (needToStopEC) {
+      logger.error(
+        s"execute sparkSQL has ${throwable.getClass.getName} now to set status 
to shutdown!",
+        throwable
+      )
+      ExecutorManager.getInstance.getReportExecutor.tryShutdown()
+    } else {
+      logger.error(s"execute sparkSQL has ${throwable.getClass.getName}!", 
throwable)
+    }
+    ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(throwable), throwable)
+  }
+
+  override protected def getExecutorIdPreFix: String = "SparkDataCalcExecutor_"
+
+  override protected def getKind: Kind = SparkDataCalc()
+}
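
For context, a hedged sketch (illustration only, not in this patch) of the two code shapes the executor above dispatches on: when the job's "exec-type" property is "group" the payload is parsed as DataCalcGroupData (sources / transformations / sinks), otherwise as a flat DataCalcArrayData plugin list, and both paths end in DataCalcExecution.execute:

    import org.apache.linkis.engineplugin.spark.datacalc.DataCalcExecution
    import org.apache.linkis.engineplugin.spark.datacalc.model.{DataCalcArrayData, DataCalcGroupData}
    import org.apache.spark.sql.SparkSession

    // distilled from runCode above; spark stands in for sparkEngineSession.sparkSession
    def runDataCalc(spark: SparkSession, code: String, execType: String): Unit = {
      if ("group" == execType) {
        val (sources, transformations, sinks) = DataCalcExecution.getPlugins(DataCalcGroupData.getData(code))
        DataCalcExecution.execute(spark, sources, transformations, sinks)
      } else {
        val plugins = DataCalcExecution.getPlugins(DataCalcArrayData.getData(code))
        DataCalcExecution.execute(spark, plugins)
      }
    }
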
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 35cf6801a..f2a40dd06 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -26,7 +26,7 @@ import org.apache.linkis.engineconn.computation.executor.execute.{
 import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
 import org.apache.linkis.engineconn.core.exception.ExecutorHookFatalException
 import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
-import org.apache.linkis.engineplugin.spark.common.Kind
+import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc}
 import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
 import org.apache.linkis.engineplugin.spark.extension.{
   SparkPostExecutionHook,
@@ -112,7 +112,10 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
         logger.error(s"execute preExecution hook : ${hookName} failed.")
     }
     
Utils.tryAndWarn(CSSparkHelper.setContextIDInfoToSparkConf(engineExecutorContext,
 sc))
-    val _code = Kind.getRealCode(preCode)
+    val _code = kind match {
+      case _: SparkDataCalc => preCode
+      case _ => Kind.getRealCode(preCode)
+    }
     logger.info(s"Ready to run code with kind $kind.")
     val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
     val jobGroupId = if (StringUtils.isNotBlank(jobId)) {
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkDataCalcExecutorFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkDataCalcExecutorFactory.scala
new file mode 100644
index 000000000..4b80a581e
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkDataCalcExecutorFactory.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorFactory
+import org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor
+import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
+import org.apache.linkis.engineplugin.spark.exception.NotSupportSparkDataCalcTypeException
+import org.apache.linkis.engineplugin.spark.executor.SparkDataCalcExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.RunType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+
+class SparkDataCalcExecutorFactory extends ComputationExecutorFactory {
+
+  override protected def newExecutor(
+      id: Int,
+      engineCreationContext: EngineCreationContext,
+      engineConn: EngineConn,
+      label: Array[Label[_]]
+  ): ComputationExecutor = {
+    engineConn.getEngineConnSession match {
+      case sparkEngineSession: SparkEngineSession =>
+        new SparkDataCalcExecutor(sparkEngineSession, id)
+      case _ =>
+        throw NotSupportSparkDataCalcTypeException(
+          "Invalid EngineConn engine session obj, failed to create sparkSql 
executor"
+        )
+    }
+  }
+
+  override protected def getRunType: RunType = RunType.DATA_CALC
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index 32ba4d0cf..098dccb80 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -192,7 +192,8 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
   private val executorFactoryArray = Array[ExecutorFactory](
     new SparkSqlExecutorFactory,
     new SparkPythonExecutorFactory,
-    new SparkScalaExecutorFactory
+    new SparkScalaExecutorFactory,
+    new SparkDataCalcExecutorFactory
   )
 
   override def getExecutorFactories: Array[ExecutorFactory] = {
diff --git a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDataCalcPlugins.scala b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDataCalcPlugins.scala
new file mode 100644
index 000000000..efd128666
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDataCalcPlugins.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.engineplugin.spark.common.Kind
+import org.apache.linkis.engineplugin.spark.datacalc.model.{DataCalcArrayData, DataCalcGroupData}
+import org.apache.linkis.engineplugin.spark.datacalc.util.PluginUtil
+import org.apache.linkis.engineplugin.spark.extension.SparkPreExecutionHook
+
+import org.junit.jupiter.api.{Assertions, Test};
+
+class TestDataCalcPlugins {
+
+  @Test
+  def testArrayBuild: Unit = {
+    val data = DataCalcArrayData.getData(arrayConfigJson)
+    Assertions.assertTrue(data != null)
+
+    val array = DataCalcExecution.getPlugins(data)
+    Assertions.assertTrue(array != null)
+  }
+
+  @Test
+  def testGroupBuild: Unit = {
+    val data = DataCalcGroupData.getData(groupConfigJson)
+    Assertions.assertTrue(data != null)
+
+    val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+    Assertions.assertTrue(sources != null)
+    Assertions.assertTrue(transforms != null)
+    Assertions.assertTrue(sinks != null)
+  }
+
+  @Test
+  def testGetRealCode: Unit = {
+
+    var preCode = arrayConfigJson
+
+    val hooks = SparkPreExecutionHook.getSparkPreExecutionHooks();
+    hooks.foreach(hook => {
+      preCode = hook.callPreExecutionHook(null, preCode)
+    })
+    Assertions.assertTrue(preCode != null)
+  }
+
+  val arrayConfigJson =
+    """
+      |{
+      |    "plugins": [
+      |        {
+      |            "name": "jdbc",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "test1",
+      |                "url": 
"jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
+      |                "driver": "com.mysql.jdbc.Driver",
+      |                "user": "root",
+      |                "password": "123456",
+      |                "query": "select * from 
dip_linkis.linkis_ps_udf_baseinfo",
+      |                "options": {
+      |                }
+      |            }
+      |        },
+      |        {
+      |            "name": "sql",
+      |            "type": "transformation",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "sql": "select * from test1"
+      |            }
+      |        },
+      |        {
+      |            "name": "file",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "hdfs:///tmp/test_new",
+      |                "partitionBy": ["create_user"],
+      |                "saveMode": "overwrite",
+      |                "serializer": "csv"
+      |            }
+      |        },
+      |        {
+      |            "name": "file",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "hdfs:///tmp/test_new_no_partition",
+      |                "saveMode": "overwrite",
+      |                "serializer": "csv"
+      |            }
+      |        },
+      |        {
+      |            "name": "file",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "hdfs:///tmp/test_new_partition",
+      |                "partitionBy": ["create_user"],
+      |                "saveMode": "overwrite",
+      |                "serializer": "csv"
+      |            }
+      |        },
+      |        {
+      |            "name": "file",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "test2",
+      |                "path": "hdfs:///tmp/test_new_no_partition",
+      |                "serializer": "csv",
+      |                "columnNames": ["id", "create_user", "udf_name", 
"udf_type", "tree_id", "create_time", "update_time", "sys", "cluster_name", 
"is_expire", "is_shared"]
+      |            }
+      |        },
+      |        {
+      |            "name": "jdbc",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "test2",
+      |                "url": 
"jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
+      |                "driver": "com.mysql.jdbc.Driver",
+      |                "user": "root",
+      |                "password": "123456",
+      |                "targetTable": "linkis_ps_udf_baseinfo2",
+      |                "options": {
+      |                }
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+  val groupConfigJson =
+    """
+      |{
+      |    "sources": [
+      |        {
+      |            "name": "jdbc",
+      |            "type": "",
+      |            "config": {
+      |                "resultTable": "test1",
+      |                "url": 
"jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
+      |                "driver": "com.mysql.jdbc.Driver",
+      |                "user": "root",
+      |                "password": "123456",
+      |                "query": "select * from 
dip_linkis.linkis_ps_udf_baseinfo",
+      |                "options": {
+      |                }
+      |            }
+      |        },
+      |        {
+      |            "name": "file",
+      |            "type": "source",
+      |            "config": {
+      |                "resultTable": "test2",
+      |                "path": "hdfs:///tmp/test_new_no_partition",
+      |                "serializer": "csv",
+      |                "columnNames": ["id", "create_user", "udf_name", 
"udf_type", "tree_id", "create_time", "update_time", "sys", "cluster_name", 
"is_expire", "is_shared"]
+      |            }
+      |        }
+      |    ],
+      |    "transformations": [
+      |        {
+      |            "name": "sql",
+      |            "type": "transformation",
+      |            "config": {
+      |                "resultTable": "T1654611700631",
+      |                "sql": "select * from test1"
+      |            }
+      |        }
+      |    ],
+      |    "sinks": [
+      |        {
+      |            "name": "file",
+      |            "config": {
+      |                "sourceTable": "T1654611700631",
+      |                "path": "hdfs:///tmp/test_new",
+      |                "partitionBy": ["create_user"],
+      |                "saveMode": "overwrite",
+      |                "serializer": "csv"
+      |            }
+      |        },
+      |        {
+      |            "name": "jdbc",
+      |            "type": "sink",
+      |            "config": {
+      |                "sourceTable": "test2",
+      |                "url": 
"jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
+      |                "driver": "com.mysql.jdbc.Driver",
+      |                "user": "root",
+      |                "password": "123456",
+      |                "targetTable": "linkis_ps_udf_baseinfo2",
+      |                "options": {
+      |                }
+      |            }
+      |        }
+      |    ]
+      |}
+      |""".stripMargin
+
+}
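
A hedged end-to-end sketch (illustration only, not in this patch) tying the test fixtures above together: parse an array-style JSON config, build the plugins, and run them on a SparkSession, which mirrors what SparkDataCalcExecutor does per job. Reachable JDBC/HDFS endpoints like those in the fixture are assumed:

    import org.apache.linkis.engineplugin.spark.datacalc.DataCalcExecution
    import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcArrayData
    import org.apache.spark.sql.SparkSession

    object DataCalcLocalRun {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("data-calc-local").getOrCreate()
        // args(0): path to an array-style config such as arrayConfigJson above (assumption)
        val json = scala.io.Source.fromFile(args(0)).mkString
        val plugins = DataCalcExecution.getPlugins(DataCalcArrayData.getData(json))
        DataCalcExecution.execute(spark, plugins)
        spark.stop()
      }
    }
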

