This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.2 by this push:
new b30064d05 [Feature][linkis-engineplugin-spark] add a new Executor (json ETL) (#3715)
b30064d05 is described below
commit b30064d0599a177d952e0d5093e61b788c9e740d
Author: rarexixi <[email protected]>
AuthorDate: Sun Oct 30 00:17:12 2022 +0800
[Feature][linkis-engineplugin-spark] add a new Executor (json ETL) (#3715)
---
.../linkis/common/utils/CodeAndRunTypeUtils.scala | 2 +-
.../manager/label/entity/engine/RunType.scala | 2 +
linkis-engineconn-plugins/spark/pom.xml | 23 ++
.../exception/ConfigRuntimeException.java} | 28 ++-
.../exception/DatabaseNotConfigException.java} | 28 ++-
.../datacalc/exception/HiveSinkException.java} | 28 ++-
.../spark/datacalc/model/DataCalcArrayData.java} | 30 +--
.../spark/datacalc/model/DataCalcDataConfig.java} | 44 ++--
.../spark/datacalc/model/DataCalcDataSource.java | 77 +++++++
.../spark/datacalc/model/DataCalcGroupData.java | 57 +++++
.../datacalc/model/DataCalcPluginConfig.java} | 28 ++-
.../spark/datacalc/model/ResultTableConfig.java} | 23 +-
.../spark/datacalc/model/SinkConfig.java | 52 +++++
.../spark/datacalc/model/SourceConfig.java | 58 +++++
.../spark/datacalc/model/TransformConfig.java | 68 ++++++
.../datacalc/service/LinkisDataSourceService.java | 52 +++++
.../spark/datacalc/sink/FileSinkConfig.java | 91 ++++++++
.../spark/datacalc/sink/HiveSinkConfig.java | 105 +++++++++
.../spark/datacalc/sink/JdbcSinkConfig.java | 136 ++++++++++++
.../spark/datacalc/sink/ManagedJdbcSinkConfig.java | 106 +++++++++
.../spark/datacalc/source/FileSourceConfig.java | 73 +++++++
.../spark/datacalc/source/JdbcSourceConfig.java | 87 ++++++++
.../datacalc/source/ManagedJdbcSourceConfig.java} | 48 +++--
.../datacalc/transform/SqlTransformConfig.java} | 26 ++-
.../spark/datacalc/util/PluginUtil.java | 95 +++++++++
.../spark/errorcode/SparkErrorCodeSummary.java | 18 +-
.../engineplugin/spark/common/SparkKind.scala | 5 +
.../spark/datacalc/DataCalcExecution.scala | 237 +++++++++++++++++++++
.../spark/datacalc/DataCalcTempData.scala | 49 +++++
.../api/DataCalcPlugin.scala} | 25 +--
.../api/DataCalcSink.scala} | 23 +-
.../api/DataCalcSource.scala} | 23 +-
.../api/DataCalcTransform.scala} | 23 +-
.../spark/datacalc/sink/FileSink.scala | 58 +++++
.../spark/datacalc/sink/HiveSink.scala | 216 +++++++++++++++++++
.../spark/datacalc/sink/JdbcSink.scala | 102 +++++++++
.../spark/datacalc/sink/ManagedJdbcSink.scala | 68 ++++++
.../spark/datacalc/source/FileSource.scala | 55 +++++
.../spark/datacalc/source/JdbcSource.scala | 49 +++++
.../spark/datacalc/source/ManagedJdbcSource.scala | 62 ++++++
.../transform/SqlTransform.scala} | 26 ++-
.../spark/exception/NoSupportEngineException.scala | 2 +
.../spark/executor/SparkDataCalcExecutor.scala | 110 ++++++++++
.../spark/executor/SparkEngineConnExecutor.scala | 7 +-
.../factory/SparkDataCalcExecutorFactory.scala | 50 +++++
.../spark/factory/SparkEngineConnFactory.scala | 3 +-
.../spark/datacalc/TestDataCalcPlugins.scala | 215 +++++++++++++++++++
47 files changed, 2573 insertions(+), 220 deletions(-)
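For orientation before the diffs: the new executor consumes a JSON description of an ETL pipeline, registered under the new "data_calc" run type. A group-style job might look like the sample below, with hypothetical values; the field names come from the DataCalcGroupData, SourceConfig, and SinkConfig classes added in this commit, while the "sql" transform key is an assumption, since the transform registry in PluginUtil is truncated in this excerpt:

    {
      "sources": [
        {"type": "file", "config": {"resultTable": "t_in", "path": "file:///tmp/in", "serializer": "csv"}}
      ],
      "transformations": [
        {"type": "sql", "config": {"resultTable": "t_out", "sql": "SELECT * FROM t_in"}}
      ],
      "sinks": [
        {"type": "file", "config": {"sourceTable": "t_out", "path": "file:///tmp/out"}}
      ]
    }

DataCalcGroupData.getData(String) below deserializes exactly this shape via gson.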
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
index 9c4f552d9..d00be6d3a 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
+++ b/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/CodeAndRunTypeUtils.scala
@@ -26,7 +26,7 @@ object CodeAndRunTypeUtils {
val CODE_TYPE_AND_RUN_TYPE_RELATION = CommonVars(
"wds.linkis.codeType.runType.relation",
-    "sql=>sql|hql|jdbc|hive|psql|fql|tsql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell"
+    "sql=>sql|hql|jdbc|hive|psql|fql|tsql,python=>python|py|pyspark,java=>java,scala=>scala,shell=>sh|shell,json=>json|data_calc"
)
val RUN_TYPE_SQL = "sql"
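The appended mapping "json=>json|data_calc" classifies both the json and data_calc run types under the json code type. A hedged sketch of how such a relation string decomposes (the actual parsing lives elsewhere in CodeAndRunTypeUtils, outside this hunk):

    // Illustrative only; assumes java.util imports, and 'value' is the relation string above.
    Map<String, List<String>> relation = new HashMap<>();
    for (String entry : value.split(",")) {
      String[] kv = entry.split("=>");
      relation.put(kv[0], Arrays.asList(kv[1].split("\\|")));
    }
    // relation.get("json") -> [json, data_calc]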
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index 33601c725..3d53c998d 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -43,4 +43,6 @@ object RunType extends Enumeration {
val TRINO_SQL = Value("tsql")
+ val DATA_CALC = Value("data_calc") // spark datacalc (ETL)
+
}
diff --git a/linkis-engineconn-plugins/spark/pom.xml b/linkis-engineconn-plugins/spark/pom.xml
index 78743bf93..811d87b12 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -383,6 +383,29 @@
<version>4.0.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hibernate.validator</groupId>
+ <artifactId>hibernate-validator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.linkis</groupId>
+ <artifactId>linkis-datasource-client</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>javax.servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ConfigRuntimeException.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ConfigRuntimeException.java
index ecb62db6a..71b83bfc9 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/ConfigRuntimeException.java
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.exception;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+public class ConfigRuntimeException extends LinkisRuntimeException {
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ public ConfigRuntimeException(int errCode, String desc) {
+ super(errCode, desc);
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+ @Override
+ public ExceptionLevel getLevel() {
+ return ExceptionLevel.ERROR;
+ }
+}
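Unlike the Scala ErrorException case classes this file was copied from, ConfigRuntimeException is unchecked (a LinkisRuntimeException) and pinned to ERROR level. A hypothetical throw site follows; the numeric code 43001 is illustrative only, not taken from SparkErrorCodeSummary:

    // Hypothetical: reject an unknown plugin type while resolving a config entry.
    if (!plugins.containsKey(type)) {
      throw new ConfigRuntimeException(43001, "Unsupported plugin type: " + type);
    }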
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/DatabaseNotConfigException.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/DatabaseNotConfigException.java
index ecb62db6a..442b32d84 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/DatabaseNotConfigException.java
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.exception;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+public class DatabaseNotConfigException extends LinkisRuntimeException {
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ public DatabaseNotConfigException(int errCode, String desc) {
+ super(errCode, desc);
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+ @Override
+ public ExceptionLevel getLevel() {
+ return ExceptionLevel.ERROR;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/HiveSinkException.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/HiveSinkException.java
index ecb62db6a..ff21abf27 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/exception/HiveSinkException.java
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.exception;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.common.exception.ExceptionLevel;
+import org.apache.linkis.common.exception.LinkisRuntimeException;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+public class HiveSinkException extends LinkisRuntimeException {
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ public HiveSinkException(int errCode, String desc) {
+ super(errCode, desc);
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+ @Override
+ public ExceptionLevel getLevel() {
+ return ExceptionLevel.ERROR;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcArrayData.java
similarity index 51%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcArrayData.java
index ecb62db6a..297bec358 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcArrayData.java
@@ -15,23 +15,25 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.model;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.server.BDPJettyServerHelper;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
+import java.io.Serializable;
+
+public class DataCalcArrayData extends DataCalcPluginConfig implements Serializable {
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
+ private DataCalcDataConfig[] plugins;
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+ public DataCalcDataConfig[] getPlugins() {
+ return plugins;
+ }
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ public void setPlugins(DataCalcDataConfig[] plugins) {
+ this.plugins = plugins;
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+ public static DataCalcArrayData getData(String data) {
+ return BDPJettyServerHelper.gson().fromJson(data, DataCalcArrayData.class);
+ }
+}
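DataCalcArrayData is the flat alternative to the grouped layout shown earlier: one ordered "plugins" list instead of separate sources/transformations/sinks arrays. A hypothetical instance follows; how the executor assigns roles to entries in this layout, possibly via the name field of DataCalcDataConfig, is not visible in this excerpt:

    {
      "plugins": [
        {"type": "file", "name": "source", "config": {"resultTable": "t_in", "path": "file:///tmp/in"}},
        {"type": "sql", "name": "transformation", "config": {"resultTable": "t_out", "sql": "SELECT * FROM t_in"}},
        {"type": "file", "name": "sink", "config": {"sourceTable": "t_out", "path": "file:///tmp/out"}}
      ]
    }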
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataConfig.java
similarity index 51%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataConfig.java
index ecb62db6a..9a01aaa67 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataConfig.java
@@ -15,23 +15,39 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.model;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import java.io.Serializable;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
+import com.google.gson.JsonElement;
+
+public class DataCalcDataConfig implements Serializable {
+
+ private String type;
+ private String name;
+ private JsonElement config;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
+ public String getName() {
+ return name;
+ }
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+ public void setName(String name) {
+ this.name = name;
+ }
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ public JsonElement getConfig() {
+ return config;
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+ public void setConfig(JsonElement config) {
+ this.config = config;
+ }
+}
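Note that config is deliberately kept as a raw gson JsonElement: the envelope (type/name) can be parsed without knowing the plugin, and the payload is bound later, once type has been resolved to a concrete config class. A sketch of that second-stage bind, assuming the registry in PluginUtil at the end of this diff:

    // Hypothetical helper; PluginUtil's reflection does the real class lookup.
    static <T> T bindConfig(DataCalcDataConfig entry, Class<T> configClass) {
      return BDPJettyServerHelper.gson().fromJson(entry.getConfig(), configClass);
    }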
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataSource.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataSource.java
new file mode 100644
index 000000000..7e1ae1462
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcDataSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import java.io.Serializable;
+
+public class DataCalcDataSource implements Serializable {
+ private String typeName;
+ private String driver;
+ private String url;
+ private String databaseName;
+ private String user;
+ private String password;
+
+ public String getTypeName() {
+ return typeName;
+ }
+
+ public void setTypeName(String typeName) {
+ this.typeName = typeName;
+ }
+
+ public String getDriver() {
+ return driver;
+ }
+
+ public void setDriver(String driver) {
+ this.driver = driver;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public void setDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcGroupData.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcGroupData.java
new file mode 100644
index 000000000..ec8c862d3
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcGroupData.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import org.apache.linkis.server.BDPJettyServerHelper;
+
+import java.io.Serializable;
+
+public class DataCalcGroupData extends DataCalcPluginConfig implements Serializable {
+
+ private DataCalcDataConfig[] sources;
+ private DataCalcDataConfig[] transformations;
+ private DataCalcDataConfig[] sinks;
+
+ public DataCalcDataConfig[] getSources() {
+ return sources;
+ }
+
+ public void setSources(DataCalcDataConfig[] sources) {
+ this.sources = sources;
+ }
+
+ public DataCalcDataConfig[] getTransformations() {
+ return transformations;
+ }
+
+ public void setTransformations(DataCalcDataConfig[] transformations) {
+ this.transformations = transformations;
+ }
+
+ public DataCalcDataConfig[] getSinks() {
+ return sinks;
+ }
+
+ public void setSinks(DataCalcDataConfig[] sinks) {
+ this.sinks = sinks;
+ }
+
+ public static DataCalcGroupData getData(String data) {
+ return BDPJettyServerHelper.gson().fromJson(data, DataCalcGroupData.class);
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcPluginConfig.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcPluginConfig.java
index ecb62db6a..7e2085ff9 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/DataCalcPluginConfig.java
@@ -15,23 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.model;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
+public abstract class DataCalcPluginConfig implements Serializable {
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+ protected Map<String, String> variables = new HashMap<>();
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ public Map<String, String> getVariables() {
+ return variables;
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+ public void setVariables(Map<String, String> variables) {
+ if (variables != null) this.variables = variables;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/ResultTableConfig.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/ResultTableConfig.java
index ecb62db6a..4919444e7 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/ResultTableConfig.java
@@ -15,23 +15,14 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.model;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import java.io.Serializable;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+public interface ResultTableConfig extends Serializable {
+ String getResultTable();
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ Boolean getPersist();
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+ String getStorageLevel();
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SinkConfig.java
new file mode 100644
index 000000000..735d4deb4
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SinkConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.AssertTrue;
+
+import java.io.Serializable;
+
+public abstract class SinkConfig extends DataCalcPluginConfig implements Serializable {
+
+ protected String sourceTable;
+
+ protected String sourceQuery;
+
+ public String getSourceTable() {
+ return sourceTable;
+ }
+
+ public void setSourceTable(String sourceTable) {
+ this.sourceTable = sourceTable;
+ }
+
+ public String getSourceQuery() {
+ return sourceQuery;
+ }
+
+ public void setSourceQuery(String sourceQuery) {
+ this.sourceQuery = sourceQuery;
+ }
+
+ @AssertTrue(message = "[sourceTable, sourceQuery] cannot be blank at the same time.")
+ public boolean isSourceOK() {
+ return StringUtils.isNotBlank(sourceTable) || StringUtils.isNotBlank(sourceQuery);
+ }
+}
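The javax.validation annotations on these config classes are what the hibernate-validator dependency added to the pom above is for; the cross-field rule is expressed as an @AssertTrue method, so a standard Bean Validation pass rejects a sink where both sourceTable and sourceQuery are blank. A minimal sketch of such a pass (assumed; the calling site is not shown in this excerpt):

    // Standard Bean Validation, imports from javax.validation; the error code is illustrative.
    Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
    Set<ConstraintViolation<SinkConfig>> violations = validator.validate(config);
    if (!violations.isEmpty()) {
      // e.g. "[sourceTable, sourceQuery] cannot be blank at the same time."
      throw new ConfigRuntimeException(43001, violations.iterator().next().getMessage());
    }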
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SourceConfig.java
new file mode 100644
index 000000000..31cad254d
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/SourceConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotBlank;
+
+import java.io.Serializable;
+
+public abstract class SourceConfig extends DataCalcPluginConfig
+ implements ResultTableConfig, Serializable {
+
+ @NotBlank protected String resultTable;
+
+ private Boolean persist = false;
+
+ private String storageLevel = "MEMORY_AND_DISK";
+
+ public String getResultTable() {
+ return resultTable;
+ }
+
+ public void setResultTable(String resultTable) {
+ this.resultTable = resultTable;
+ }
+
+ public Boolean getPersist() {
+ return persist;
+ }
+
+ public void setPersist(Boolean persist) {
+ this.persist = persist;
+ }
+
+ public String getStorageLevel() {
+ return storageLevel;
+ }
+
+ public void setStorageLevel(String storageLevel) {
+ if (StringUtils.isNotBlank(storageLevel)) this.storageLevel = storageLevel;
+ }
+}
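Each source declares a resultTable plus optional persist/storageLevel. The natural Spark mapping, assumed here because DataCalcExecution.scala appears only in the diffstat above:

    // Assumed wiring: cache if requested, then expose the result to later SQL steps.
    Dataset<Row> df = ...; // produced by the source plugin
    if (config.getPersist()) {
      df.persist(StorageLevel.fromString(config.getStorageLevel())); // default MEMORY_AND_DISK
    }
    df.createOrReplaceTempView(config.getResultTable());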
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/TransformConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/TransformConfig.java
new file mode 100644
index 000000000..d2ed12297
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/model/TransformConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.model;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotBlank;
+
+import java.io.Serializable;
+
+public abstract class TransformConfig extends DataCalcPluginConfig
+ implements ResultTableConfig, Serializable {
+
+ protected String sourceTable;
+
+ @NotBlank protected String resultTable;
+
+ private Boolean persist = false;
+
+ private String storageLevel = "MEMORY_AND_DISK";
+
+ public String getSourceTable() {
+ return sourceTable;
+ }
+
+ public void setSourceTable(String sourceTable) {
+ this.sourceTable = sourceTable;
+ }
+
+ public String getResultTable() {
+ return resultTable;
+ }
+
+ public void setResultTable(String resultTable) {
+ this.resultTable = resultTable;
+ }
+
+ public Boolean getPersist() {
+ return persist;
+ }
+
+ public void setPersist(Boolean persist) {
+ this.persist = persist;
+ }
+
+ public String getStorageLevel() {
+ return storageLevel;
+ }
+
+ public void setStorageLevel(String storageLevel) {
+ if (StringUtils.isNotBlank(storageLevel)) this.storageLevel = storageLevel;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/service/LinkisDataSourceService.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/service/LinkisDataSourceService.java
new file mode 100644
index 000000000..f66d15d0a
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/service/LinkisDataSourceService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.service;
+
+import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient;
+import org.apache.linkis.datasource.client.request.GetInfoPublishedByDataSourceNameAction;
+import org.apache.linkis.datasourcemanager.common.domain.DataSource;
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcDataSource;
+import org.apache.linkis.storage.utils.StorageUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LinkisDataSourceService {
+
+ private static final Logger logger = LoggerFactory.getLogger(LinkisDataSourceService.class);
+
+ private static final LinkisDataSourceRemoteClient dataSourceClient =
+ new LinkisDataSourceRemoteClient();
+
+ public static DataCalcDataSource getDatasource(String datasourceName) {
+ GetInfoPublishedByDataSourceNameAction action =
+ GetInfoPublishedByDataSourceNameAction.builder()
+ .setDataSourceName(datasourceName)
+ .setUser(StorageUtils.getJvmUser())
+ .build(); // ignore parameter 'system'
+ DataSource datasource =
+        dataSourceClient.getInfoPublishedByDataSourceName(action).getDataSource();
+ datasource.getConnectParams();
+ return transform(datasource);
+ }
+
+ private static DataCalcDataSource transform(DataSource datasource) {
+ DataCalcDataSource ds = new DataCalcDataSource();
+ return ds;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSinkConfig.java
new file mode 100644
index 000000000..22ac537d8
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSinkConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FileSinkConfig extends SinkConfig {
+
+ @NotBlank
+ @Pattern(
+ regexp = "^(file|hdfs)://.*",
+ message =
+ "Invalid path URI, please set the following allowed schemas:
'file://' or 'hdfs://'.")
+ private String path;
+
+ @NotBlank private String serializer = "parquet";
+
+ private List<String> partitionBy = new ArrayList<>();
+
+ @NotBlank
+ @Pattern(
+ regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+ message =
+ "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite',
'append', 'ignore', 'error', 'errorifexists'.")
+ private String saveMode = "overwrite";
+
+ private Map<String, String> options = new HashMap<>();
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getSerializer() {
+ return serializer;
+ }
+
+ public void setSerializer(String serializer) {
+ this.serializer = serializer;
+ }
+
+ public List<String> getPartitionBy() {
+ return partitionBy;
+ }
+
+ public void setPartitionBy(List<String> partitionBy) {
+ this.partitionBy = partitionBy;
+ }
+
+ public String getSaveMode() {
+ return saveMode;
+ }
+
+ public void setSaveMode(String saveMode) {
+ this.saveMode = saveMode;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
+}
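Taken together, a file sink must target a file:// or hdfs:// URI and use one of Spark's five save modes, with serializer defaulting to parquet. A config entry that would pass these constraints (hypothetical values; the "file" sink key is assumed, as the sink registry in PluginUtil is truncated in this excerpt):

    {
      "type": "file",
      "config": {
        "sourceTable": "t_out",
        "path": "hdfs:///user/demo/output",
        "serializer": "csv",
        "partitionBy": ["ds"],
        "saveMode": "append",
        "options": {"header": "true"}
      }
    }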
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSinkConfig.java
new file mode 100644
index 000000000..56589de24
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSinkConfig.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class HiveSinkConfig extends SinkConfig {
+
+ private String targetDatabase;
+
+ @NotBlank private String targetTable;
+
+ @NotBlank
+ @Pattern(
+ regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+ message =
+ "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite',
'append', 'ignore', 'error', 'errorifexists'.")
+ private String saveMode = "overwrite";
+
+ private Boolean strongCheck = true;
+
+ private Boolean writeAsFile = false;
+
+ private Integer numPartitions = 10;
+
+ private Map<String, String> options = new HashMap<>();
+
+ public String getTargetDatabase() {
+ return targetDatabase;
+ }
+
+ public void setTargetDatabase(String targetDatabase) {
+ this.targetDatabase = targetDatabase;
+ }
+
+ public String getTargetTable() {
+ return targetTable;
+ }
+
+ public void setTargetTable(String targetTable) {
+ this.targetTable = targetTable;
+ }
+
+ public String getSaveMode() {
+ return saveMode;
+ }
+
+ public void setSaveMode(String saveMode) {
+ this.saveMode = saveMode;
+ }
+
+ public Boolean getStrongCheck() {
+ return strongCheck;
+ }
+
+ public void setStrongCheck(Boolean strongCheck) {
+ this.strongCheck = strongCheck;
+ }
+
+ public Boolean getWriteAsFile() {
+ return writeAsFile;
+ }
+
+ public void setWriteAsFile(Boolean writeAsFile) {
+ this.writeAsFile = writeAsFile;
+ }
+
+ public Integer getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setNumPartitions(Integer numPartitions) {
+ if (numPartitions == null) return;
+ this.numPartitions = numPartitions > 20 ? 20 : numPartitions;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
+}
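Note the asymmetric setter: setNumPartitions silently caps values at 20 (the JDBC sinks below do the same), presumably to bound the number of output files and concurrent writers:

    HiveSinkConfig cfg = new HiveSinkConfig();
    cfg.setNumPartitions(100);
    // cfg.getNumPartitions() == 20; passing null leaves the default of 10 untouched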
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSinkConfig.java
new file mode 100644
index 000000000..21f9aba59
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSinkConfig.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcSinkConfig extends SinkConfig {
+
+ @NotBlank private String url;
+
+ @NotBlank private String driver;
+
+ @NotBlank private String user;
+
+ @NotBlank private String password;
+
+ private String targetDatabase;
+
+ @NotBlank private String targetTable;
+
+ @NotBlank
+ @Pattern(
+ regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+ message =
+ "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite',
'append', 'ignore', 'error', 'errorifexists'.")
+ private String saveMode = "overwrite";
+
+ private List<String> preQueries = new ArrayList<>();
+
+ private Integer numPartitions = 10;
+
+ private Map<String, String> options;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getDriver() {
+ return driver;
+ }
+
+ public void setDriver(String driver) {
+ this.driver = driver;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getTargetDatabase() {
+ return targetDatabase;
+ }
+
+ public void setTargetDatabase(String targetDatabase) {
+ this.targetDatabase = targetDatabase;
+ }
+
+ public String getTargetTable() {
+ return targetTable;
+ }
+
+ public void setTargetTable(String targetTable) {
+ this.targetTable = targetTable;
+ }
+
+ public String getSaveMode() {
+ return saveMode;
+ }
+
+ public void setSaveMode(String saveMode) {
+ this.saveMode = saveMode;
+ }
+
+ public List<String> getPreQueries() {
+ return preQueries;
+ }
+
+ public void setPreQueries(List<String> preQueries) {
+ this.preQueries = preQueries;
+ }
+
+ public Integer getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setNumPartitions(Integer numPartitions) {
+ if (numPartitions == null) return;
+ this.numPartitions = numPartitions > 20 ? 20 : numPartitions;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
+}
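preQueries lets a job run cleanup statements (e.g. DELETE or TRUNCATE) before the write. A plausible execution sketch, assuming JdbcSink.scala (listed only in the diffstat) drives them over a plain java.sql connection:

    // Assumed pre-write step using the credentials from this config; java.sql imports.
    try (Connection conn = DriverManager.getConnection(cfg.getUrl(), cfg.getUser(), cfg.getPassword());
        Statement stmt = conn.createStatement()) {
      for (String query : cfg.getPreQueries()) {
        stmt.execute(query);
      }
    }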
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSinkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSinkConfig.java
new file mode 100644
index 000000000..3ccec8287
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSinkConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ManagedJdbcSinkConfig extends SinkConfig {
+
+ @NotBlank private String targetDatasource;
+
+ private String targetDatabase;
+
+ @NotBlank private String targetTable;
+
+ @NotBlank
+ @Pattern(
+ regexp = "^(overwrite|append|ignore|error|errorifexists)$",
+ message =
+ "Unknown save mode: {saveMode}. Accepted save modes are 'overwrite',
'append', 'ignore', 'error', 'errorifexists'.")
+ private String saveMode = "overwrite";
+
+ private List<String> preQueries = new ArrayList<>();
+
+ private Integer numPartitions = 10;
+
+ private Map<String, String> options;
+
+ public String getTargetDatasource() {
+ return targetDatasource;
+ }
+
+ public void setTargetDatasource(String targetDatasource) {
+ this.targetDatasource = targetDatasource;
+ }
+
+ public String getTargetDatabase() {
+ return targetDatabase;
+ }
+
+ public void setTargetDatabase(String targetDatabase) {
+ this.targetDatabase = targetDatabase;
+ }
+
+ public String getTargetTable() {
+ return targetTable;
+ }
+
+ public void setTargetTable(String targetTable) {
+ this.targetTable = targetTable;
+ }
+
+ public String getSaveMode() {
+ return saveMode;
+ }
+
+ public void setSaveMode(String saveMode) {
+ this.saveMode = saveMode;
+ }
+
+ public List<String> getPreQueries() {
+ return preQueries;
+ }
+
+ public void setPreQueries(List<String> preQueries) {
+ this.preQueries = preQueries;
+ }
+
+ public Integer getNumPartitions() {
+ return numPartitions;
+ }
+
+ public void setNumPartitions(Integer numPartitions) {
+ if (numPartitions == null) return;
+ this.numPartitions = numPartitions > 20 ? 20 : numPartitions;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/FileSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/FileSourceConfig.java
new file mode 100644
index 000000000..7e83bdd82
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/FileSourceConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
+import java.util.Map;
+
+public class FileSourceConfig extends SourceConfig {
+
+ @NotBlank
+ @Pattern(
+ regexp = "^(file|hdfs)://.*",
+ message =
+ "Invalid path URI, please set the following allowed schemas:
'file://' or 'hdfs://'.")
+ private String path;
+
+ @NotBlank private String serializer = "parquet";
+
+ private String[] columnNames;
+
+ private Map<String, String> options;
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getSerializer() {
+ return serializer;
+ }
+
+ public void setSerializer(String serializer) {
+ this.serializer = serializer;
+ }
+
+ public String[] getColumnNames() {
+ return columnNames;
+ }
+
+ public void setColumnNames(String[] columnNames) {
+ this.columnNames = columnNames;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSourceConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSourceConfig.java
new file mode 100644
index 000000000..fc13dfcb3
--- /dev/null
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSourceConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source;
+
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+
+import javax.validation.constraints.NotBlank;
+
+import java.util.Map;
+
+public class JdbcSourceConfig extends SourceConfig {
+
+ @NotBlank private String url;
+
+ @NotBlank private String driver;
+
+ @NotBlank private String user;
+
+ @NotBlank private String password;
+
+ @NotBlank private String query;
+
+ private Map<String, String> options;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getDriver() {
+ return driver;
+ }
+
+ public void setDriver(String driver) {
+ this.driver = driver;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
+}
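JdbcSourceConfig is query-based rather than table-based. The straightforward mapping onto Spark's JDBC reader, assumed here because JdbcSource.scala appears only in the diffstat (the "query" option requires Spark 2.4+):

    // Assumed mapping of this config onto Spark's DataFrameReader.
    Dataset<Row> df = spark.read().format("jdbc")
        .option("url", cfg.getUrl())
        .option("driver", cfg.getDriver())
        .option("user", cfg.getUser())
        .option("password", cfg.getPassword())
        .option("query", cfg.getQuery())
        .load();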
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSourceConfig.java
similarity index 50%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSourceConfig.java
index ecb62db6a..57cab73e3 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSourceConfig.java
@@ -15,23 +15,43 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.source;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
+import javax.validation.constraints.NotBlank;
+
+import java.util.Map;
+
+public class ManagedJdbcSourceConfig extends SourceConfig {
+
+ @NotBlank private String datasource;
+
+ @NotBlank private String query;
+
+ private Map<String, String> options;
+
+ public String getDatasource() {
+ return datasource;
+ }
+
+ public void setDatasource(String datasource) {
+ this.datasource = datasource;
+ }
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
+ public String getQuery() {
+ return query;
+ }
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+ public void setQuery(String query) {
+ this.query = query;
+ }
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ public Map<String, String> getOptions() {
+ return options;
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends ErrorException(420003, desc)
+ public void setOptions(Map<String, String> options) {
+ this.options = options;
+ }
+}
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransformConfig.java
similarity index 51%
copy from linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransformConfig.java
index ecb62db6a..3515d4e61 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransformConfig.java
@@ -15,23 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.transform;
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.TransformConfig;
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
+import javax.validation.constraints.NotBlank;
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
+public class SqlTransformConfig extends TransformConfig {
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+ @NotBlank private String sql;
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ public String getSql() {
+ return sql;
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends
ErrorException(420003, desc)
+ public void setSql(String sql) {
+ this.sql = sql;
+ }
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
new file mode 100644
index 000000000..5dfcd4e4a
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/datacalc/util/PluginUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.util;
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.*;
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig;
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig;
+import org.apache.linkis.engineplugin.spark.datacalc.model.TransformConfig;
+import org.apache.linkis.server.BDPJettyServerHelper;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.ParameterizedType;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.gson.JsonElement;
+
+public class PluginUtil {
+
+ private static final Map<String, Class<?>> SOURCE_PLUGINS =
getSourcePlugins();
+ private static final Map<String, Class<?>> TRANSFORM_PLUGINS =
getTransformPlugins();
+ private static final Map<String, Class<?>> SINK_PLUGINS = getSinkPlugins();
+
+ private static Map<String, Class<?>> getSourcePlugins() {
+ Map<String, Class<?>> classMap = new HashMap<>();
+ // classMap.put("managed_jdbc",
+ //
org.apache.linkis.engineplugin.spark.datacalc.source.ManagedJdbcSource.class);
+ classMap.put("jdbc",
org.apache.linkis.engineplugin.spark.datacalc.source.JdbcSource.class);
+ classMap.put("file",
org.apache.linkis.engineplugin.spark.datacalc.source.FileSource.class);
+ return classMap;
+ }
+
+ private static Map<String, Class<?>> getTransformPlugins() {
+ Map<String, Class<?>> classMap = new HashMap<>();
+ classMap.put("sql",
org.apache.linkis.engineplugin.spark.datacalc.transform.SqlTransform.class);
+ return classMap;
+ }
+
+ private static Map<String, Class<?>> getSinkPlugins() {
+ Map<String, Class<?>> classMap = new HashMap<>();
+ // classMap.put("managed_jdbc",
+ //
org.apache.linkis.engineplugin.spark.datacalc.sink.ManagedJdbcSink.class);
+ classMap.put("jdbc",
org.apache.linkis.engineplugin.spark.datacalc.sink.JdbcSink.class);
+ classMap.put("hive",
org.apache.linkis.engineplugin.spark.datacalc.sink.HiveSink.class);
+ classMap.put("file",
org.apache.linkis.engineplugin.spark.datacalc.sink.FileSink.class);
+ return classMap;
+ }
+
+ public static <T extends SourceConfig> DataCalcSource<T> createSource(
+ String name, JsonElement config)
+ throws InstantiationException, IllegalAccessException,
InvocationTargetException,
+ NoSuchMethodException {
+ return createPlugin(SOURCE_PLUGINS, name, config);
+ }
+
+ public static <T extends TransformConfig> DataCalcTransform<T>
createTransform(
+ String name, JsonElement config)
+ throws InstantiationException, IllegalAccessException,
InvocationTargetException,
+ NoSuchMethodException {
+ return createPlugin(TRANSFORM_PLUGINS, name, config);
+ }
+
+ public static <T extends SinkConfig> DataCalcSink<T> createSink(String name,
JsonElement config)
+ throws InstantiationException, IllegalAccessException,
InvocationTargetException,
+ NoSuchMethodException {
+ return createPlugin(SINK_PLUGINS, name, config);
+ }
+
+ static <T extends DataCalcPlugin> T createPlugin(
+ Map<String, Class<?>> pluginMap, String name, JsonElement config)
+ throws InstantiationException, IllegalAccessException,
NoSuchMethodException,
+ InvocationTargetException {
+ Class<?> type = pluginMap.get(name);
+ ParameterizedType genericSuperclass = (ParameterizedType)
type.getGenericInterfaces()[0];
+ Class<?> configType = (Class<?>)
genericSuperclass.getActualTypeArguments()[0];
+ T plugin = (T) type.getDeclaredConstructor().newInstance();
+ plugin.setConfig(BDPJettyServerHelper.gson().fromJson(config, configType));
+ return plugin;
+ }
+}
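
Note: createPlugin resolves the plugin's config class from the first generic interface of the registered class, instantiates the plugin reflectively, and binds the raw JSON onto that config type with Gson. A minimal usage sketch, assuming a Gson version that provides JsonParser.parseString; the payload values are illustrative:

    // Sketch only: create the registered "file" source and bind its typed config.
    import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
    import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig
    import org.apache.linkis.engineplugin.spark.datacalc.util.PluginUtil
    import com.google.gson.JsonParser

    val rawConfig = JsonParser.parseString(
      """{"path": "/tmp/demo.json", "serializer": "json", "resultTable": "demo"}"""
    )
    val source: DataCalcSource[SourceConfig] =
      PluginUtil.createSource[SourceConfig]("file", rawConfig)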
diff --git
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
index 0c1a9e21e..b41c09496 100644
---
a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
+++
b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/errorcode/SparkErrorCodeSummary.java
@@ -40,7 +40,23 @@ public enum SparkErrorCodeSummary {
"Invalid EngineConn engine session obj, failed to create sparkSql
executor(EngineConn 引擎会话 obj 无效,无法创建 sparkSql 执行程序)"),
INVALID_CREATE_SPARKPYTHON(
420002,
- "Invalid EngineConn engine session obj, failed to create sparkPython
executor(EngineConn 引擎会话 obj 无效,无法创建 sparkPython 执行程序)");
+ "Invalid EngineConn engine session obj, failed to create sparkPython
executor(EngineConn 引擎会话 obj 无效,无法创建 sparkPython 执行程序)"),
+
+  DATA_CALC_CONFIG_VALID_FAILED(43001, "Config data validation failed"),
+ DATA_CALC_CONFIG_TYPE_NOT_VALID(43002, "[{0}] is not a valid type"),
+
+ DATA_CALC_DATASOURCE_NOT_CONFIG(43011, "Datasource {0} is not configured!"),
+
+ DATA_CALC_COLUMN_NOT_MATCH(
+ 43021,
+ "{0}st column ({1}[{2}]) name or data type does not match target table
column ({3}[{4}])"),
+ DATA_CALC_COLUMN_NUM_NOT_MATCH(
+ 43022,
+ "{0} requires that the data to be inserted have the same number of
columns as the target table: target table has {1} column(s) but the inserted
data has {2} column(s)"),
+  DATA_CALC_FIELD_NOT_EXIST(43023, "{0} columns ({1}) do not exist in source columns"),
+ DATA_CALC_VARIABLE_NOT_EXIST(43024, "Please set [{0}] in variables"),
+ ;
+
/** (errorCode)错误码 */
private int errorCode;
/** (errorDesc)错误描述 */
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
index 6ba8b210c..46e8b5def 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/common/SparkKind.scala
@@ -37,6 +37,7 @@ object SparkKind {
val SPARKMIX_TYPE = "sparkmix"
val MIX_TYPE = "mix"
val SPARKSQL_TYPE = "sparksql"
+ val SPARK_DATA_CALC_TYPE = "spark_data_calc"
val SPARKMLSQL_TYPE = "mlsql"
val FUNCTION_MDQ_TYPE = "function.mdq"
@@ -83,6 +84,10 @@ case class SparkSQL() extends Kind {
override val toString: String = SparkKind.SPARKSQL_TYPE
}
+case class SparkDataCalc() extends Kind {
+ override val toString: String = SparkKind.SPARK_DATA_CALC_TYPE
+}
+
case class SparkMLSQL() extends Kind {
override val toString = SparkKind.SPARKMLSQL_TYPE
}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcExecution.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcExecution.scala
new file mode 100644
index 000000000..5d2870138
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcExecution.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.{
+ DataCalcPlugin,
+ DataCalcSink,
+ DataCalcSource,
+ DataCalcTransform
+}
+import
org.apache.linkis.engineplugin.spark.datacalc.exception.ConfigRuntimeException
+import org.apache.linkis.engineplugin.spark.datacalc.model._
+import org.apache.linkis.engineplugin.spark.datacalc.util.PluginUtil
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+import org.apache.linkis.server.BDPJettyServerHelper
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+
+import javax.validation.{Validation, Validator}
+
+import java.text.MessageFormat
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+import org.slf4j.{Logger, LoggerFactory}
+
+object DataCalcExecution {
+
+ private val log: Logger = LoggerFactory.getLogger(DataCalcExecution.getClass)
+
+ def getPlugins[SR <: SourceConfig, TR <: TransformConfig, SK <: SinkConfig](
+ mapleData: DataCalcGroupData
+ ): (Array[DataCalcSource[SR]], Array[DataCalcTransform[TR]],
Array[DataCalcSink[SK]]) = {
+ val sources = mapleData.getSources.map(source =>
+ PluginUtil.createSource[SR](source.getName, source.getConfig)
+ )
+ val transformations = mapleData.getTransformations.map(sink =>
+ PluginUtil.createTransform[TR](sink.getName, sink.getConfig)
+ )
+ val sinks =
+ mapleData.getSinks.map(sink => PluginUtil.createSink[SK](sink.getName,
sink.getConfig))
+
+ val checkResult = new CheckResult()
+ sources.foreach(source => {
+ source.getConfig.setVariables(mapleData.getVariables)
+ checkResult.checkResultTable(source)
+ })
+ transformations.foreach(transformation => {
+ transformation.getConfig.setVariables(mapleData.getVariables)
+ checkResult.checkResultTable(transformation)
+ })
+ sinks.foreach(sink => {
+ sink.getConfig.setVariables(mapleData.getVariables)
+ checkResult.checkPluginConfig(sink)
+ })
+ checkResult.check()
+
+ (sources, transformations, sinks)
+ }
+
+ def execute[SR <: SourceConfig, TR <: TransformConfig, SK <: SinkConfig](
+ spark: SparkSession,
+ sources: Array[DataCalcSource[SR]],
+ transformations: Array[DataCalcTransform[TR]],
+ sinks: Array[DataCalcSink[SK]]
+ ): Unit = {
+ if (sources != null && !sources.isEmpty) sources.foreach(source =>
sourceProcess(spark, source))
+ if (transformations != null && !transformations.isEmpty)
+ transformations.foreach(transformation => transformProcess(spark,
transformation))
+ if (sinks != null && !sinks.isEmpty) sinks.foreach(sink =>
sinkProcess(spark, sink))
+
+ DataCalcTempData.clean(spark.sqlContext)
+ }
+
+ def getPlugins[SR <: SourceConfig, TR <: TransformConfig, SK <: SinkConfig,
T <: Object](
+ mapleData: DataCalcArrayData
+ ): Array[Any] = {
+ val checkResult = new CheckResult()
+ val plugins = new Array[Any](mapleData.getPlugins.length)
+ for (i <- mapleData.getPlugins.indices) {
+ val config = mapleData.getPlugins()(i)
+ config.getType match {
+ case "source" =>
+ val source = PluginUtil.createSource[SR](config.getName,
config.getConfig)
+ source.getConfig.setVariables(mapleData.getVariables)
+ checkResult.checkResultTable(source)
+ plugins(i) = source
+ case "transformation" =>
+ val transformation = PluginUtil.createTransform[TR](config.getName,
config.getConfig)
+ transformation.getConfig.setVariables(mapleData.getVariables)
+ checkResult.checkResultTable(transformation)
+ plugins(i) = transformation
+ case "sink" =>
+ val sink = PluginUtil.createSink[SK](config.getName,
config.getConfig)
+ sink.getConfig.setVariables(mapleData.getVariables)
+ checkResult.checkPluginConfig(sink)
+ plugins(i) = sink
+ case t: String =>
+ throw new ConfigRuntimeException(
+ SparkErrorCodeSummary.DATA_CALC_CONFIG_TYPE_NOT_VALID.getErrorCode,
+ MessageFormat.format(
+
SparkErrorCodeSummary.DATA_CALC_CONFIG_TYPE_NOT_VALID.getErrorDesc,
+ t
+ )
+ )
+ }
+ }
+ checkResult.check()
+ plugins
+ }
+
+ def execute[SR <: SourceConfig, TR <: TransformConfig, SK <: SinkConfig, T
<: Object](
+ spark: SparkSession,
+ plugins: Array[Any]
+ ): Unit = {
+ if (plugins == null || plugins.isEmpty) return
+ plugins.foreach {
+ case source: DataCalcSource[SR] => sourceProcess(spark, source)
+ case transform: DataCalcTransform[TR] => transformProcess(spark,
transform)
+ case sink: DataCalcSink[SK] => sinkProcess(spark, sink)
+ case _ =>
+ }
+
+ DataCalcTempData.clean(spark.sqlContext)
+ }
+
+ private def sourceProcess[T <: SourceConfig](
+ spark: SparkSession,
+ source: DataCalcSource[T]
+ ): Unit = {
+ source.prepare(spark)
+ val ds: Dataset[Row] = source.getData(spark)
+ tempSaveResultTable(ds, source.getConfig)
+ }
+
+ private def transformProcess[T <: TransformConfig](
+ spark: SparkSession,
+ transform: DataCalcTransform[T]
+ ): Unit = {
+ transform.prepare(spark)
+ val fromDs: Dataset[Row] = if
(StringUtils.isNotBlank(transform.getConfig.getSourceTable)) {
+ spark.read.table(transform.getConfig.getSourceTable)
+ } else {
+ null
+ }
+ val ds: Dataset[Row] = transform.process(spark, fromDs)
+ tempSaveResultTable(ds, transform.getConfig)
+ }
+
+ private def sinkProcess[T <: SinkConfig](spark: SparkSession, sink:
DataCalcSink[T]): Unit = {
+ sink.prepare(spark)
+ val fromDs: Dataset[Row] = if
(StringUtils.isBlank(sink.getConfig.getSourceQuery)) {
+ spark.read.table(sink.getConfig.getSourceTable)
+ } else {
+ spark.sql(sink.getConfig.getSourceQuery)
+ }
+ sink.output(spark, fromDs)
+ }
+
+ private def tempSaveResultTable(ds: Dataset[Row], resultTableConfig:
ResultTableConfig): Unit = {
+ if (ds != null) {
+ ds.createOrReplaceTempView(resultTableConfig.getResultTable)
+ DataCalcTempData.putResultTable(resultTableConfig.getResultTable)
+ if (resultTableConfig.getPersist) {
+ ds.persist(StorageLevel.fromString(resultTableConfig.getStorageLevel))
+ DataCalcTempData.putPersistDataSet(ds)
+ }
+ }
+ }
+
+ private class CheckResult {
+
+ private var success: Boolean = true
+ private val set: mutable.Set[String] = mutable.Set()
+
+ val validator: Validator =
Validation.buildDefaultValidatorFactory().getValidator
+
+ def checkResultTable[T <: ResultTableConfig](plugin: DataCalcPlugin[T]):
Unit = {
+ checkPluginConfig(plugin)
+ if (set.contains(plugin.getConfig.getResultTable)) {
+ log.error(s"Result table [${plugin.getConfig.getResultTable}] cannot
be duplicate")
+ success = false
+ } else {
+ set.add(plugin.getConfig.getResultTable)
+ }
+ }
+
+ def checkPluginConfig[T](plugin: DataCalcPlugin[T]): Unit = {
+ val violations = validator.validate(plugin.getConfig)
+ if (!violations.isEmpty) {
+ success = false
+ log.error(
+ s"Configuration check error,
${BDPJettyServerHelper.gson.toJson(plugin.getConfig)}"
+ )
+ for (violation <- violations) {
+ if (
+ violation.getMessageTemplate
+ .startsWith("{") && violation.getMessageTemplate.endsWith("}")
+ ) {
+ log.error(s"[${violation.getPropertyPath}]
${violation.getMessage}")
+ } else {
+ log.error(violation.getMessage)
+ }
+ }
+ }
+ }
+
+ def check(): Unit = {
+ if (!success) {
+ throw new ConfigRuntimeException(
+ SparkErrorCodeSummary.DATA_CALC_CONFIG_VALID_FAILED.getErrorCode,
+ SparkErrorCodeSummary.DATA_CALC_CONFIG_VALID_FAILED.getErrorDesc
+ )
+ }
+ }
+
+ }
+
+}
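
Note: in "group" mode getPlugins walks three parallel arrays plus a shared variables map; the wire format itself lives in DataCalcGroupData, which is not shown in this hunk. A sketch of the shape implied by the getters above, with invented paths and table names:

    // Illustrative "group" payload: sources -> transformations -> sinks,
    // with job-level variables propagated into every plugin config.
    val groupData: String =
      """{
        |  "sources": [
        |    {"name": "file", "config": {"path": "/tmp/in.csv", "serializer": "csv", "resultTable": "src"}}
        |  ],
        |  "transformations": [
        |    {"name": "sql", "config": {"sql": "select * from src", "resultTable": "tf"}}
        |  ],
        |  "sinks": [
        |    {"name": "file", "config": {"sourceTable": "tf", "path": "/tmp/out", "serializer": "parquet", "saveMode": "overwrite"}}
        |  ],
        |  "variables": {"ds": "2022-10-30"}
        |}""".stripMargin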
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcTempData.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcTempData.scala
new file mode 100644
index 000000000..5d1d84221
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/DataCalcTempData.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.spark.sql.{Dataset, Row, SQLContext}
+
+import scala.collection.mutable
+
+object DataCalcTempData {
+
+ private val RESULT_TABLES: mutable.Set[String] = mutable.Set[String]()
+ private val PERSIST_DATASETS: mutable.Set[Dataset[Row]] =
mutable.Set[Dataset[Row]]()
+
+ def putResultTable(resultTableName: String): Unit = {
+ RESULT_TABLES.add(resultTableName)
+ }
+
+ def putPersistDataSet(ds: Dataset[Row]): Unit = {
+ PERSIST_DATASETS.add(ds)
+ }
+
+  /**
+   * Clean up temporary data: drop all registered temp views and unpersist all cached datasets.
+   * @param sqlContext the SQLContext whose temp views are dropped
+   */
+ def clean(sqlContext: SQLContext): Unit = {
+ RESULT_TABLES.foreach(resultTable => sqlContext.dropTempTable(resultTable))
+ RESULT_TABLES.clear()
+
+ PERSIST_DATASETS.foreach(ds => ds.unpersist())
+ PERSIST_DATASETS.clear()
+ }
+
+}
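
Note: the persisted datasets tracked here come from tempSaveResultTable in DataCalcExecution, which parses the configured storage level with Spark's standard parser:

    import org.apache.spark.storage.StorageLevel
    // StorageLevel.fromString accepts the standard Spark names; this is the
    // parse applied to resultTableConfig.getStorageLevel before persisting.
    val level = StorageLevel.fromString("MEMORY_AND_DISK")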
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcPlugin.scala
similarity index 50%
copy from
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcPlugin.scala
index ecb62db6a..23a840d83 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcPlugin.scala
@@ -15,23 +15,18 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.api
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.spark.sql.SparkSession
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
+trait DataCalcPlugin[T] extends Serializable {
+ protected var config: T = _
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+ def getConfig: T = config
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ def setConfig(config: T): Unit = {
+ this.config = config
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends
ErrorException(420003, desc)
+ def prepare(spark: SparkSession): Unit = {}
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSink.scala
similarity index 50%
copy from
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSink.scala
index ecb62db6a..895a3fcca 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSink.scala
@@ -15,23 +15,12 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.api
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.SinkConfig
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
-
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
-case class NotSupportSparkScalaTypeException(desc: String) extends
ErrorException(420003, desc)
+trait DataCalcSink[T <: SinkConfig] extends DataCalcPlugin[T] {
+ def output(spark: SparkSession, data: Dataset[Row]): Unit
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSource.scala
similarity index 50%
copy from
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSource.scala
index ecb62db6a..ea8e58bc0 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcSource.scala
@@ -15,23 +15,12 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.api
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.SourceConfig
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
-
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
-case class NotSupportSparkScalaTypeException(desc: String) extends
ErrorException(420003, desc)
+trait DataCalcSource[T <: SourceConfig] extends DataCalcPlugin[T] {
+ def getData(spark: SparkSession): Dataset[Row]
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcTransform.scala
similarity index 50%
copy from
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcTransform.scala
index ecb62db6a..54ff1f9a3 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/api/DataCalcTransform.scala
@@ -15,23 +15,12 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.api
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.model.TransformConfig
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
-
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
-
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
-
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
-case class NotSupportSparkScalaTypeException(desc: String) extends
ErrorException(420003, desc)
+trait DataCalcTransform[T <: TransformConfig] extends DataCalcPlugin[T] {
+ def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row]
+}
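
Note: with the three traits above, a new plugin is just a config bean plus one method. A hypothetical transform, not part of this patch, that deduplicates its input; it assumes TransformConfig keeps a public no-arg constructor, and it would also need a registry entry in PluginUtil.getTransformPlugins:

    import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcTransform
    import org.apache.linkis.engineplugin.spark.datacalc.model.TransformConfig
    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    // Hypothetical config bean; inherits resultTable/sourceTable handling.
    class DistinctTransformConfig extends TransformConfig

    class DistinctTransform extends DataCalcTransform[DistinctTransformConfig] {
      // ds is the dataset DataCalcExecution.transformProcess reads from sourceTable
      override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] =
        ds.distinct()
    }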
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSink.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSink.scala
new file mode 100644
index 000000000..1c9d9e034
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/FileSink.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.text.StringSubstitutor
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import scala.collection.JavaConverters._
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class FileSink extends DataCalcSink[FileSinkConfig] {
+
+ private val log: Logger = LoggerFactory.getLogger(classOf[FileSink])
+
+ def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+ val writer = ds.write.mode(config.getSaveMode)
+
+ if (config.getPartitionBy != null && !config.getPartitionBy.isEmpty) {
+ val partitionKeys = config.getPartitionBy.asScala
+ writer.partitionBy(partitionKeys: _*)
+ }
+
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ writer.options(config.getOptions)
+ }
+ val substitutor = new StringSubstitutor(config.getVariables)
+ val path = substitutor.replace(config.getPath)
+ log.info(s"Save data to file, path: $path")
+
+ config.getSerializer match {
+ case "csv" => writer.csv(path)
+ case "json" => writer.json(path)
+ case "parquet" => writer.parquet(path)
+ case "text" => writer.text(path)
+ case "orc" => writer.orc(path)
+ case _ => writer.format(config.getSerializer).save(path)
+ }
+ }
+
+}
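
Note: because the path runs through commons-text's StringSubstitutor, ${...} placeholders in it resolve against the job's variables map. A quick sketch of that behavior, with an invented variable:

    import org.apache.commons.text.StringSubstitutor
    import scala.collection.JavaConverters._

    val substitutor = new StringSubstitutor(Map("ds" -> "2022-10-30").asJava)
    // yields "/data/out/ds=2022-10-30", the same expansion FileSink applies to getPath
    substitutor.replace("/data/out/ds=${ds}")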
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
new file mode 100644
index 000000000..348f819c5
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/HiveSink.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+import
org.apache.linkis.engineplugin.spark.datacalc.exception.HiveSinkException
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructField
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class HiveSink extends DataCalcSink[HiveSinkConfig] {
+
+ private val log: Logger = LoggerFactory.getLogger(classOf[HiveSink])
+
+ def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+ val targetTable =
+ if (StringUtils.isBlank(config.getTargetDatabase)) config.getTargetTable
+ else config.getTargetDatabase + "." + config.getTargetTable
+ val targetFields = spark.table(targetTable).schema.fields
+ if (config.getWriteAsFile != null && config.getWriteAsFile) {
+ val partitionsColumns = spark.catalog
+ .listColumns(targetTable)
+ .where(col("isPartition") === true)
+ .select("name")
+ .collect()
+ .map(_.getAs[String]("name"))
+ val location = getLocation(spark, targetTable, partitionsColumns)
+ val fileFormat = getTableFileFormat(spark, targetTable)
+
+ log.info(
+ s"Write $fileFormat into target table: $targetTable, location:
$location, file format: $fileFormat"
+ )
+ val writer = getSaveWriter(
+ ds,
+ targetFields.filter(field => !partitionsColumns.contains(field.name)),
+ targetTable
+ )
+ fileFormat match {
+ case FileFormat.PARQUET => writer.parquet(location)
+ case FileFormat.ORC => writer.orc(location)
+ case _ =>
+ }
+
+ val partition = partitionsColumns
+ .map(colName => s"$colName='${config.getVariables.get(colName)}'")
+ .mkString(",")
+ if (StringUtils.isNotBlank(partition)) {
+ log.info(s"Refresh table partition: $partition")
+ refreshPartition(spark, targetTable, partition)
+ }
+ } else {
+ val writer = getSaveWriter(ds, targetFields, targetTable)
+ log.info(s"InsertInto data to hive table: $targetTable")
+ writer.format("hive").insertInto(targetTable)
+ }
+ }
+
+ def getSaveWriter(
+ ds: Dataset[Row],
+ targetFields: Array[StructField],
+ targetTable: String
+ ): DataFrameWriter[Row] = {
+ val dsSource = sequenceFields(ds, ds.schema.fields, targetFields,
targetTable)
+ val sourceFields = dsSource.schema.fields
+
+ // Compare column's data type when [strongCheck] is true
+ if (config.getStrongCheck != null && config.getStrongCheck) {
+ for (i <- sourceFields.indices) {
+ val targetField = targetFields(i)
+ val sourceField = sourceFields(i)
+ if (!targetField.dataType.equals(sourceField.dataType)) {
+ logFields(sourceFields, targetFields)
+ throw new HiveSinkException(
+ SparkErrorCodeSummary.DATA_CALC_COLUMN_NOT_MATCH.getErrorCode,
+ s"${i + 1}st column (${sourceField.name}[${sourceField.dataType}])
name or data type does not match target table column
(${targetField.name}[${targetField.dataType}])"
+ )
+ }
+ }
+ }
+
+ val writer =
dsSource.repartition(config.getNumPartitions).write.mode(config.getSaveMode)
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ writer.options(config.getOptions)
+ }
+ writer
+ }
+
+ def logFields(sourceFields: Array[StructField], targetFields:
Array[StructField]): Unit = {
+ log.info(s"sourceFields: ${sourceFields.mkString("Array(", ", ", ")")}")
+ log.info(s"targetFields: ${targetFields.mkString("Array(", ", ", ")")}")
+ }
+
+ def sequenceFields(
+ dsSource: Dataset[Row],
+ sourceFields: Array[StructField],
+ targetFields: Array[StructField],
+ targetTable: String
+ ): DataFrame = {
+ if (targetFields.length != sourceFields.length) {
+ logFields(sourceFields, targetFields)
+ throw new HiveSinkException(
+ SparkErrorCodeSummary.DATA_CALC_COLUMN_NUM_NOT_MATCH.getErrorCode,
+ s"$targetTable requires that the data to be inserted have the same
number of columns as the target table: target table has ${targetFields.length}
column(s) but the inserted data has ${sourceFields.length} column(s)"
+ )
+ }
+
+ // hive columns is lowercase
+ val sourceFieldMap = sourceFields.map(field => field.name.toLowerCase ->
field).toMap
+ val targetFieldMap = targetFields.map(field => field.name.toLowerCase ->
field).toMap
+
+ val subSet = targetFieldMap.keySet -- sourceFieldMap.keySet
+ if (subSet.isEmpty) {
+ // sort column
+ dsSource.select(targetFields.map(field => col(field.name)): _*)
+ } else if (subSet.size == targetFieldMap.size) {
+ log.info("None target table fields match with source fields, write in
order")
+ dsSource.toDF(targetFields.map(field => field.name): _*)
+ } else {
+ throw new HiveSinkException(
+ SparkErrorCodeSummary.DATA_CALC_FIELD_NOT_EXIST.getErrorCode,
+ s"$targetTable fields(${subSet.mkString(",")}) are not exist in source
fields"
+ )
+ }
+ }
+
+  /**
+   * Get the storage location of the target hive table, appending one path segment per partition column.
+   *
+   * @param spark the active SparkSession
+   * @param targetTable the fully qualified target table name
+   * @param partitionsColumns the table's partition columns; each must have a value in variables
+   * @return
+   *   hive table location, extended with /col=value for every partition column
+   */
+ def getLocation(
+ spark: SparkSession,
+ targetTable: String,
+ partitionsColumns: Array[String]
+ ): String = {
+ val locations =
+ spark.sql(s"desc formatted $targetTable").filter(col("col_name") ===
"Location").collect()
+ var location: String = locations(0).getString(1)
+ for (partitionColName <- partitionsColumns) {
+ if (
+ !config.getVariables.containsKey(partitionColName) ||
+ StringUtils.isBlank(config.getVariables.get(partitionColName))
+ ) {
+ throw new HiveSinkException(
+ SparkErrorCodeSummary.DATA_CALC_VARIABLE_NOT_EXIST.getErrorCode,
+ s"Please set [${partitionsColumns.mkString(", ")}] in variables"
+ )
+ }
+ location +=
s"/$partitionColName=${config.getVariables.get(partitionColName)}"
+ }
+ location
+ }
+
+ def getTableFileFormat(spark: SparkSession, targetTable: String):
FileFormat.Value = {
+ try {
+ var fileFormat: FileFormat.FileFormat = FileFormat.OTHER
+ spark.table(targetTable).queryExecution.optimizedPlan match {
+ case logicalRelation: LogicalRelation =>
+ logicalRelation.relation match {
+ case hadoopFsRelation: HadoopFsRelation =>
+ hadoopFsRelation.fileFormat match {
+ case _:
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat =>
+ fileFormat = FileFormat.ORC
+ case _:
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat =>
+ fileFormat = FileFormat.PARQUET
+ case dataSourceRegister: DataSourceRegister =>
+ fileFormat =
FileFormat.withName(dataSourceRegister.shortName.toUpperCase)
+ case _ =>
+ }
+ }
+        case _: HiveTableRelation =>
+        // TODO: derive the file format from the Hive table relation
+ }
+ fileFormat
+ } catch {
+ case _: Exception => FileFormat.OTHER
+ }
+ }
+
+ def refreshPartition(spark: SparkSession, targetTable: String, partition:
String): Unit = {
+ spark.sql(s"ALTER TABLE $targetTable DROP IF EXISTS partition($partition)")
+ spark.sql(s"ALTER TABLE $targetTable ADD IF NOT EXISTS
partition($partition)")
+ }
+
+}
+
+object FileFormat extends Enumeration {
+ type FileFormat = Value
+ val ORC, PARQUET, OTHER = Value
+}
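
Note: a sink entry exercising the writeAsFile path might look like the sketch below; database and table names are invented, and the partition value for a table partitioned by ds comes from the job-level variables map rather than this entry, as getLocation shows:

    // Illustrative "hive" sink entry; field names mirror the getters used above.
    val hiveSinkEntry: String =
      """{
        |  "name": "hive",
        |  "config": {
        |    "sourceTable": "tf",
        |    "targetDatabase": "demo_db",
        |    "targetTable": "demo_table",
        |    "saveMode": "overwrite",
        |    "strongCheck": true,
        |    "writeAsFile": true,
        |    "numPartitions": 10
        |  }
        |}""".stripMargin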
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
new file mode 100644
index 000000000..498c000ae
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/JdbcSink.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
+
+import java.sql.Connection
+
+import scala.collection.JavaConverters._
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class JdbcSink extends DataCalcSink[JdbcSinkConfig] {
+
+ private val log: Logger = LoggerFactory.getLogger(classOf[JdbcSink])
+
+ def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+ val targetTable =
+ if (StringUtils.isBlank(config.getTargetDatabase)) config.getTargetTable
+ else config.getTargetDatabase + "." + config.getTargetTable
+ var options = Map(
+ "url" -> config.getUrl,
+ "driver" -> config.getDriver,
+ "user" -> config.getUser,
+ "password" -> config.getPassword,
+ "dbtable" -> targetTable,
+ "connectionCollation" -> "utf8mb4_unicode_ci"
+ )
+
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ options = config.getOptions.asScala.toMap ++ options
+ }
+
+ options = options ++ Map(
+ "isolationLevel" -> options.getOrElse("isolationLevel", "NONE"),
+ "batchsize" -> options.getOrElse("batchsize", "5000")
+ )
+
+ if (config.getPreQueries != null && !config.getPreQueries.isEmpty) {
+ spark
+ .sql("select 1")
+ .repartition(1)
+ .foreachPartition(_ => {
+ val jdbcOptions = new JDBCOptions(options)
+ val conn: Connection =
JdbcUtils.createConnectionFactory(jdbcOptions)()
+ try {
+ config.getPreQueries.asScala.foreach(query => {
+ log.info(s"Execute pre query: $query")
+ execute(conn, jdbcOptions, query)
+ })
+ } catch {
+ case e: Exception => log.error("Execute preQueries failed. ", e)
+ } finally {
+ conn.close()
+ }
+ })
+ }
+
+ val writer = ds.repartition(config.getNumPartitions).write.format("jdbc")
+ if (StringUtils.isNotBlank(config.getSaveMode)) {
+ writer.mode(config.getSaveMode)
+ }
+ log.info(
+ s"Save data to jdbc url: ${config.getUrl}, driver: ${config.getDriver},
username: ${config.getUser}, table: $targetTable"
+ )
+ writer.options(options).save()
+ }
+
+ private def execute(conn: Connection, jdbcOptions: JDBCOptions, query:
String): Unit = {
+ log.info("Execute query: {}", query)
+ val statement = conn.prepareStatement(query)
+ try {
+ statement.setQueryTimeout(jdbcOptions.queryTimeout)
+ val rows = statement.executeUpdate()
+ log.info("{} rows affected", rows)
+ } catch {
+ case e: Exception => log.error("Execute query failed. ", e)
+ } finally {
+ statement.close()
+ }
+ }
+
+}
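
Note: unless overridden in options, the sink defaults isolationLevel to NONE and batchsize to 5000, and preQueries run once on a single partition before the write. An illustrative entry with invented connection details:

    val jdbcSinkEntry: String =
      """{
        |  "name": "jdbc",
        |  "config": {
        |    "url": "jdbc:mysql://127.0.0.1:3306/demo",
        |    "driver": "com.mysql.jdbc.Driver",
        |    "user": "demo",
        |    "password": "demo_password",
        |    "targetTable": "t_result",
        |    "saveMode": "append",
        |    "preQueries": ["delete from t_result where ds = '2022-10-30'"],
        |    "numPartitions": 4
        |  }
        |}""".stripMargin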
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSink.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSink.scala
new file mode 100644
index 000000000..d64e0b6a5
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/sink/ManagedJdbcSink.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.sink
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSink
+import
org.apache.linkis.engineplugin.spark.datacalc.exception.DatabaseNotConfigException
+import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcDataSource
+import
org.apache.linkis.engineplugin.spark.datacalc.service.LinkisDataSourceService
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import java.text.MessageFormat
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class ManagedJdbcSink extends DataCalcSink[ManagedJdbcSinkConfig] {
+
+ private val log: Logger = LoggerFactory.getLogger(classOf[ManagedJdbcSink])
+
+ def output(spark: SparkSession, ds: Dataset[Row]): Unit = {
+ val db: DataCalcDataSource =
LinkisDataSourceService.getDatasource(config.getTargetDatasource)
+ if (db == null) {
+ throw new DatabaseNotConfigException(
+ SparkErrorCodeSummary.DATA_CALC_DATASOURCE_NOT_CONFIG.getErrorCode,
+ MessageFormat.format(
+ SparkErrorCodeSummary.DATA_CALC_DATASOURCE_NOT_CONFIG.getErrorDesc,
+ config.getTargetDatasource
+ )
+ )
+ }
+
+ val jdbcConfig = new JdbcSinkConfig()
+ jdbcConfig.setUrl(db.getUrl)
+ jdbcConfig.setDriver(db.getDriver)
+ jdbcConfig.setUser(db.getUser)
+ jdbcConfig.setPassword(db.getPassword)
+ jdbcConfig.setTargetDatabase(config.getTargetDatabase)
+ jdbcConfig.setTargetTable(config.getTargetTable)
+ jdbcConfig.setSaveMode(config.getSaveMode)
+ jdbcConfig.setPreQueries(config.getPreQueries)
+ jdbcConfig.setNumPartitions(config.getNumPartitions)
+ jdbcConfig.setOptions(config.getOptions)
+ jdbcConfig.setSourceTable(config.getSourceTable)
+ jdbcConfig.setSourceQuery(config.getSourceQuery)
+ jdbcConfig.setVariables(config.getVariables)
+
+ val sinkPlugin = new JdbcSink()
+ sinkPlugin.setConfig(jdbcConfig)
+ sinkPlugin.output(spark, ds)
+ }
+
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/FileSource.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/FileSource.scala
new file mode 100644
index 000000000..95f2e0535
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/FileSource.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.commons.text.StringSubstitutor
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class FileSource extends DataCalcSource[FileSourceConfig] {
+
+ private val log: Logger = LoggerFactory.getLogger(classOf[FileSource])
+
+ override def getData(spark: SparkSession): Dataset[Row] = {
+ val reader = spark.read
+
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ reader.options(config.getOptions)
+ }
+ val substitutor = new StringSubstitutor(config.getVariables)
+ val path = substitutor.replace(config.getPath)
+ log.info(s"Load data from file <$path>")
+
+ var df = config.getSerializer match {
+ case "csv" => reader.csv(path)
+ case "json" => reader.json(path)
+ case "parquet" => reader.parquet(path)
+ case "text" => reader.text(path)
+ case "orc" => reader.orc(path)
+ case _ => reader.format(config.getSerializer).load(path)
+ }
+ if (config.getColumnNames != null && config.getColumnNames.length > 0) {
+ df = df.toDF(config.getColumnNames: _*)
+ }
+ df
+ }
+
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSource.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSource.scala
new file mode 100644
index 000000000..7bdc43b38
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/JdbcSource.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class JdbcSource extends DataCalcSource[JdbcSourceConfig] {
+
+ private val log: Logger = LoggerFactory.getLogger(classOf[JdbcSource])
+
+ override def getData(spark: SparkSession): Dataset[Row] = {
+ val reader = spark.read.format("jdbc")
+ if (config.getOptions != null && !config.getOptions.isEmpty) {
+ reader.options(config.getOptions)
+ }
+
+ log.info(
+ s"Load data from jdbc url: ${config.getUrl}, driver:
${config.getDriver}, username: ${config.getUser}, query: ${config.getQuery}"
+ )
+
+ reader
+ .option("url", config.getUrl)
+ .option("driver", config.getDriver)
+ .option("user", config.getUser)
+ .option("password", config.getPassword)
+ .option("query", config.getQuery)
+ .load()
+ }
+
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSource.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSource.scala
new file mode 100644
index 000000000..47a0f55ba
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/source/ManagedJdbcSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc.source
+
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcSource
+import
org.apache.linkis.engineplugin.spark.datacalc.exception.DatabaseNotConfigException
+import
org.apache.linkis.engineplugin.spark.datacalc.service.LinkisDataSourceService
+import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import java.text.MessageFormat
+
+import org.slf4j.{Logger, LoggerFactory}
+
+class ManagedJdbcSource extends DataCalcSource[ManagedJdbcSourceConfig] {
+
+ private val log: Logger = LoggerFactory.getLogger(classOf[ManagedJdbcSource])
+
+ override def getData(spark: SparkSession): Dataset[Row] = {
+ val db = LinkisDataSourceService.getDatasource(config.getDatasource)
+ if (db == null) {
+ throw new DatabaseNotConfigException(
+ SparkErrorCodeSummary.DATA_CALC_DATASOURCE_NOT_CONFIG.getErrorCode,
+ MessageFormat.format(
+ SparkErrorCodeSummary.DATA_CALC_DATASOURCE_NOT_CONFIG.getErrorDesc,
+ config.getDatasource
+ )
+ )
+ }
+
+ val jdbcConfig = new JdbcSourceConfig()
+ jdbcConfig.setUrl(db.getUrl)
+ jdbcConfig.setDriver(db.getDriver)
+ jdbcConfig.setUser(db.getUser)
+ jdbcConfig.setPassword(db.getPassword)
+ jdbcConfig.setQuery(config.getQuery)
+ jdbcConfig.setPersist(config.getPersist)
+ jdbcConfig.setOptions(config.getOptions)
+ jdbcConfig.setResultTable(config.getResultTable)
+
+ val sourcePlugin = new JdbcSource()
+ sourcePlugin.setConfig(jdbcConfig)
+ sourcePlugin.getData(spark)
+ }
+
+}
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransform.scala
similarity index 51%
copy from
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
copy to
linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransform.scala
index ecb62db6a..bff241546 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/datacalc/transform/SqlTransform.scala
@@ -15,23 +15,21 @@
* limitations under the License.
*/
-package org.apache.linkis.engineplugin.spark.exception
+package org.apache.linkis.engineplugin.spark.datacalc.transform
-import org.apache.linkis.common.exception.ErrorException
-import org.apache.linkis.engineplugin.spark.errorcode.SparkErrorCodeSummary._
+import org.apache.linkis.engineplugin.spark.datacalc.api.DataCalcTransform
-/**
- */
-case class NoSupportEngineException(errCode: Int, desc: String)
- extends ErrorException(errCode, desc)
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+
+import org.slf4j.{Logger, LoggerFactory}
-case class NotSupportSparkTypeException(errorCode: Int, desc: String)
- extends ErrorException(errorCode, desc)
+class SqlTransform extends DataCalcTransform[SqlTransformConfig] {
-case class NotSupportSparkSqlTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKSQL.getErrorCode, desc)
+ private val log: Logger = LoggerFactory.getLogger(classOf[SqlTransform])
-case class NotSupportSparkPythonTypeException(desc: String)
- extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
+ override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {
+ log.info(s"Load data from query: ${config.getSql}")
+ spark.sql(config.getSql)
+ }
-case class NotSupportSparkScalaTypeException(desc: String) extends
ErrorException(420003, desc)
+}
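
Note: the matching transform entry is the simplest of the plugins: one NotBlank sql field plus the shared resultTable. For illustration, with invented table names:

    val sqlTransformEntry: String =
      """{
        |  "name": "sql",
        |  "config": {
        |    "sql": "select id, upper(name) as name from user_src",
        |    "resultTable": "user_tf"
        |  }
        |}""".stripMargin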
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
index ecb62db6a..db60850c5 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/exception/NoSupportEngineException.scala
@@ -35,3 +35,5 @@ case class NotSupportSparkPythonTypeException(desc: String)
extends ErrorException(INVALID_CREATE_SPARKPYTHON.getErrorCode, desc)
case class NotSupportSparkScalaTypeException(desc: String) extends
ErrorException(420003, desc)
+
+case class NotSupportSparkDataCalcTypeException(desc: String) extends
ErrorException(420004, desc)
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
new file mode 100644
index 000000000..81679dce0
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkDataCalcExecutor.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.executor
+
+import org.apache.linkis.common.exception.FatalException
+import org.apache.linkis.common.utils.Utils
+import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
+import org.apache.linkis.engineconn.core.executor.ExecutorManager
+import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc}
+import org.apache.linkis.engineplugin.spark.datacalc.DataCalcExecution
+import org.apache.linkis.engineplugin.spark.datacalc.model.{DataCalcArrayData, DataCalcGroupData}
+import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
+import org.apache.linkis.engineplugin.spark.utils.EngineUtils
+import org.apache.linkis.governance.common.paser.EmptyCodeParser
+import org.apache.linkis.scheduler.executer.{
+ CompletedExecuteResponse,
+ ErrorExecuteResponse,
+ ExecuteResponse,
+ SuccessExecuteResponse
+}
+
+import org.apache.arrow.memory.OutOfMemoryException
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.apache.orc.storage.common.io.Allocator.AllocatorOutOfMemoryException
+
+import java.lang.reflect.InvocationTargetException
+
+class SparkDataCalcExecutor(sparkEngineSession: SparkEngineSession, id: Long)
+ extends SparkEngineConnExecutor(sparkEngineSession.sparkContext, id) {
+
+ override def init(): Unit = {
+ setCodeParser(new EmptyCodeParser)
+ super.init()
+ logger.info("spark data-calc executor start")
+ }
+
+ override def runCode(
+ executor: SparkEngineConnExecutor,
+ code: String,
+ context: EngineExecutionContext,
+ jobGroup: String
+ ): ExecuteResponse = {
+ logger.info("DataCalcExecutor run query: " + code)
+ context.appendStdout(s"${EngineUtils.getName} >> $code")
+ Utils.tryCatch {
+      val execType = context.getProperties.getOrDefault("exec-type", "array").toString
+ if ("group" == execType) {
+ val (sources, transformations, sinks) =
+ DataCalcExecution.getPlugins(DataCalcGroupData.getData(code))
+        DataCalcExecution.execute(sparkEngineSession.sparkSession, sources, transformations, sinks)
+ } else {
+        val plugins = DataCalcExecution.getPlugins(DataCalcArrayData.getData(code))
+ DataCalcExecution.execute(sparkEngineSession.sparkSession, plugins)
+ }
+ SuccessExecuteResponse().asInstanceOf[CompletedExecuteResponse]
+ } {
+ case e: InvocationTargetException =>
+ logger.error("execute sparkDataCalc has InvocationTargetException!", e)
+ var cause = ExceptionUtils.getCause(e)
+ if (cause == null) cause = e
+ ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(e), cause)
+ case e: OutOfMemoryException =>
+ getErrorResponse(e, true)
+ case e: AllocatorOutOfMemoryException =>
+ getErrorResponse(e, true)
+ case e: FatalException =>
+ getErrorResponse(e, true)
+ case e: Exception =>
+ getErrorResponse(e, false)
+ case err: OutOfMemoryError =>
+ getErrorResponse(err, true)
+ case err: VirtualMachineError =>
+ getErrorResponse(err, true)
+ case err: Error =>
+ getErrorResponse(err, false)
+ }
+ }
+
+  def getErrorResponse(throwable: Throwable, needToStopEC: Boolean): ErrorExecuteResponse = {
+ if (needToStopEC) {
+ logger.error(
+        s"execute sparkDataCalc has ${throwable.getClass.getName}, now setting status to shutdown!",
+ throwable
+ )
+ ExecutorManager.getInstance.getReportExecutor.tryShutdown()
+ } else {
+      logger.error(s"execute sparkDataCalc has ${throwable.getClass.getName}!", throwable)
+ }
+    ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(throwable), throwable)
+ }
+
+ override protected def getExecutorIdPreFix: String = "SparkDataCalcExecutor_"
+
+ override protected def getKind: Kind = SparkDataCalc()
+}
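
The new executor branches on the job property "exec-type": "group" expects a {"sources": [...], "transformations": [...], "sinks": [...]} document, while the default "array" expects {"plugins": [...]}. A minimal sketch of driving the array path directly against a local SparkSession (the session setup and the one-plugin pipeline are illustrative assumptions; a real pipeline would also declare sources and sinks, as in the test configs later in this patch):

    import org.apache.linkis.engineplugin.spark.datacalc.DataCalcExecution
    import org.apache.linkis.engineplugin.spark.datacalc.model.DataCalcArrayData
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("data-calc").getOrCreate()
    val arrayConfigJson =
      """{"plugins": [
        |  {"name": "sql", "type": "transformation",
        |   "config": {"resultTable": "t1", "sql": "select 1 as id"}}
        |]}""".stripMargin
    val plugins = DataCalcExecution.getPlugins(DataCalcArrayData.getData(arrayConfigJson))
    DataCalcExecution.execute(spark, plugins)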
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
index 35cf6801a..f2a40dd06 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkEngineConnExecutor.scala
@@ -26,7 +26,7 @@ import org.apache.linkis.engineconn.computation.executor.execute.{
import org.apache.linkis.engineconn.computation.executor.utlis.ProgressUtils
import org.apache.linkis.engineconn.core.exception.ExecutorHookFatalException
import org.apache.linkis.engineconn.executor.entity.ResourceFetchExecutor
-import org.apache.linkis.engineplugin.spark.common.Kind
+import org.apache.linkis.engineplugin.spark.common.{Kind, SparkDataCalc}
import org.apache.linkis.engineplugin.spark.cs.CSSparkHelper
import org.apache.linkis.engineplugin.spark.extension.{
SparkPostExecutionHook,
@@ -112,7 +112,10 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)
logger.error(s"execute preExecution hook : ${hookName} failed.")
}
Utils.tryAndWarn(CSSparkHelper.setContextIDInfoToSparkConf(engineExecutorContext, sc))
- val _code = Kind.getRealCode(preCode)
+ val _code = kind match {
+ case _: SparkDataCalc => preCode
+ case _ => Kind.getRealCode(preCode)
+ }
logger.info(s"Ready to run code with kind $kind.")
val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
val jobGroupId = if (StringUtils.isNotBlank(jobId)) {
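
The bypass above exists because Kind.getRealCode strips the kind-marker line from ordinary code, whereas a data-calc payload is the raw JSON document with no marker to strip. A hedged illustration (the %-marker convention is inferred from the other Spark kinds; treat the literal strings as assumptions):

    import org.apache.linkis.engineplugin.spark.common.Kind

    // Ordinary jobs carry a kind marker that must be removed first:
    val sqlCode = Kind.getRealCode("%sql\nselect 1") // expected to yield "select 1"
    // A data-calc job's code is the plugin JSON itself, so it is passed through unchanged:
    val calcCode = """{"plugins": []}"""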
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkDataCalcExecutorFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkDataCalcExecutorFactory.scala
new file mode 100644
index 000000000..4b80a581e
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkDataCalcExecutorFactory.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.factory
+
+import org.apache.linkis.engineconn.common.creation.EngineCreationContext
+import org.apache.linkis.engineconn.common.engineconn.EngineConn
+import org.apache.linkis.engineconn.computation.executor.creation.ComputationExecutorFactory
+import org.apache.linkis.engineconn.computation.executor.execute.ComputationExecutor
+import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
+import org.apache.linkis.engineplugin.spark.exception.NotSupportSparkDataCalcTypeException
+import org.apache.linkis.engineplugin.spark.executor.SparkDataCalcExecutor
+import org.apache.linkis.manager.label.entity.Label
+import org.apache.linkis.manager.label.entity.engine.RunType
+import org.apache.linkis.manager.label.entity.engine.RunType.RunType
+
+class SparkDataCalcExecutorFactory extends ComputationExecutorFactory {
+
+ override protected def newExecutor(
+ id: Int,
+ engineCreationContext: EngineCreationContext,
+ engineConn: EngineConn,
+ label: Array[Label[_]]
+ ): ComputationExecutor = {
+ engineConn.getEngineConnSession match {
+ case sparkEngineSession: SparkEngineSession =>
+ new SparkDataCalcExecutor(sparkEngineSession, id)
+ case _ =>
+ throw NotSupportSparkDataCalcTypeException(
+          "Invalid EngineConn engine session obj, failed to create spark data-calc executor"
+ )
+ }
+ }
+
+ override protected def getRunType: RunType = RunType.DATA_CALC
+}
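
With getRunType returning RunType.DATA_CALC, routing to this factory is label-driven. A sketch of what a client submission could look like, assuming the usual Linkis 1.x entrance task shape and that the run-type string mirrors the enum name (all field values here are placeholders):

    // Hypothetical request body for /api/rest_j/v1/entrance/submit, kept as a string.
    val submitPayload =
      """{
        |  "executionContent": {"code": "<the plugins JSON>", "runType": "data_calc"},
        |  "labels": {"engineType": "spark-2.4.3", "userCreator": "hadoop-IDE"}
        |}""".stripMargin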
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index 32ba4d0cf..098dccb80 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -192,7 +192,8 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
private val executorFactoryArray = Array[ExecutorFactory](
new SparkSqlExecutorFactory,
new SparkPythonExecutorFactory,
- new SparkScalaExecutorFactory
+ new SparkScalaExecutorFactory,
+ new SparkDataCalcExecutorFactory
)
override def getExecutorFactories: Array[ExecutorFactory] = {
diff --git
a/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDataCalcPlugins.scala
b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDataCalcPlugins.scala
new file mode 100644
index 000000000..efd128666
--- /dev/null
+++
b/linkis-engineconn-plugins/spark/src/test/scala/org/apache/linkis/engineplugin/spark/datacalc/TestDataCalcPlugins.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.spark.datacalc
+
+import org.apache.linkis.engineplugin.spark.common.Kind
+import org.apache.linkis.engineplugin.spark.datacalc.model.{DataCalcArrayData, DataCalcGroupData}
+import org.apache.linkis.engineplugin.spark.datacalc.util.PluginUtil
+import org.apache.linkis.engineplugin.spark.extension.SparkPreExecutionHook
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+class TestDataCalcPlugins {
+
+ @Test
+ def testArrayBuild: Unit = {
+ val data = DataCalcArrayData.getData(arrayConfigJson)
+ Assertions.assertTrue(data != null)
+
+ val array = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(array != null)
+ }
+
+ @Test
+ def testGroupBuild: Unit = {
+ val data = DataCalcGroupData.getData(groupConfigJson)
+ Assertions.assertTrue(data != null)
+
+ val (sources, transforms, sinks) = DataCalcExecution.getPlugins(data)
+ Assertions.assertTrue(sources != null)
+ Assertions.assertTrue(transforms != null)
+ Assertions.assertTrue(sinks != null)
+ }
+
+ @Test
+ def testGetRealCode: Unit = {
+
+ var preCode = arrayConfigJson
+
+    val hooks = SparkPreExecutionHook.getSparkPreExecutionHooks()
+ hooks.foreach(hook => {
+ preCode = hook.callPreExecutionHook(null, preCode)
+ })
+ Assertions.assertTrue(preCode != null)
+ }
+
+ val arrayConfigJson =
+ """
+ |{
+ | "plugins": [
+ | {
+ | "name": "jdbc",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "test1",
+      |            "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
+ | "driver": "com.mysql.jdbc.Driver",
+ | "user": "root",
+ | "password": "123456",
+      |            "query": "select * from dip_linkis.linkis_ps_udf_baseinfo",
+ | "options": {
+ | }
+ | }
+ | },
+ | {
+ | "name": "sql",
+ | "type": "transformation",
+ | "config": {
+ | "resultTable": "T1654611700631",
+ | "sql": "select * from test1"
+ | }
+ | },
+ | {
+ | "name": "file",
+ | "type": "sink",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "path": "hdfs:///tmp/test_new",
+ | "partitionBy": ["create_user"],
+ | "saveMode": "overwrite",
+ | "serializer": "csv"
+ | }
+ | },
+ | {
+ | "name": "file",
+ | "type": "sink",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "path": "hdfs:///tmp/test_new_no_partition",
+ | "saveMode": "overwrite",
+ | "serializer": "csv"
+ | }
+ | },
+ | {
+ | "name": "file",
+ | "type": "sink",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "path": "hdfs:///tmp/test_new_partition",
+ | "partitionBy": ["create_user"],
+ | "saveMode": "overwrite",
+ | "serializer": "csv"
+ | }
+ | },
+ | {
+ | "name": "file",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "test2",
+ | "path": "hdfs:///tmp/test_new_no_partition",
+ | "serializer": "csv",
+      |            "columnNames": ["id", "create_user", "udf_name", "udf_type", "tree_id", "create_time", "update_time", "sys", "cluster_name", "is_expire", "is_shared"]
+ | }
+ | },
+ | {
+ | "name": "jdbc",
+ | "type": "sink",
+ | "config": {
+ | "sourceTable": "test2",
+      |            "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
+ | "driver": "com.mysql.jdbc.Driver",
+ | "user": "root",
+ | "password": "123456",
+ | "targetTable": "linkis_ps_udf_baseinfo2",
+ | "options": {
+ | }
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+ val groupConfigJson =
+ """
+ |{
+ | "sources": [
+ | {
+ | "name": "jdbc",
+ | "type": "",
+ | "config": {
+ | "resultTable": "test1",
+      |            "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
+ | "driver": "com.mysql.jdbc.Driver",
+ | "user": "root",
+ | "password": "123456",
+      |            "query": "select * from dip_linkis.linkis_ps_udf_baseinfo",
+ | "options": {
+ | }
+ | }
+ | },
+ | {
+ | "name": "file",
+ | "type": "source",
+ | "config": {
+ | "resultTable": "test2",
+ | "path": "hdfs:///tmp/test_new_no_partition",
+ | "serializer": "csv",
+      |            "columnNames": ["id", "create_user", "udf_name", "udf_type", "tree_id", "create_time", "update_time", "sys", "cluster_name", "is_expire", "is_shared"]
+ | }
+ | }
+ | ],
+ | "transformations": [
+ | {
+ | "name": "sql",
+ | "type": "transformation",
+ | "config": {
+ | "resultTable": "T1654611700631",
+ | "sql": "select * from test1"
+ | }
+ | }
+ | ],
+ | "sinks": [
+ | {
+ | "name": "file",
+ | "config": {
+ | "sourceTable": "T1654611700631",
+ | "path": "hdfs:///tmp/test_new",
+ | "partitionBy": ["create_user"],
+ | "saveMode": "overwrite",
+ | "serializer": "csv"
+ | }
+ | },
+ | {
+ | "name": "jdbc",
+ | "type": "sink",
+ | "config": {
+ | "sourceTable": "test2",
+      |            "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
+ | "driver": "com.mysql.jdbc.Driver",
+ | "user": "root",
+ | "password": "123456",
+ | "targetTable": "linkis_ps_udf_baseinfo2",
+ | "options": {
+ | }
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+
+}
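
Beyond null checks, the same configs can be exercised end to end. A minimal sketch of the group path reusing groupConfigJson above (the local-mode session is an assumption for illustration, and the run requires the MySQL and HDFS endpoints named in the config to be reachable):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("data-calc-test").getOrCreate()
    val data = DataCalcGroupData.getData(groupConfigJson)
    val (sources, transformations, sinks) = DataCalcExecution.getPlugins(data)
    DataCalcExecution.execute(spark, sources, transformations, sinks) // sources -> transforms -> sinks
    spark.stop()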
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]