This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new 3ef1758d8 Upgrade seatunnel to 2.3.1 (#4673)
3ef1758d8 is described below
commit 3ef1758d875f162e2edc7117365e0735bd2d287f
Author: xiutao <[email protected]>
AuthorDate: Tue Jun 20 17:25:43 2023 +0800
Upgrade seatunnel to 2.3.1 (#4673)
---
.github/workflows/publish-docker.yaml | 1 +
.../manager/label/entity/engine/RunType.scala | 3 +-
.../release-docs/licenses/LICENSE-hazelcast.txt | 202 ++++++++++++++++++
linkis-engineconn-plugins/seatunnel/pom.xml | 21 +-
...inkClient.java => LinkSeatunnelZetaClient.java} | 19 +-
...ient.java => LinkisSeatunnelFlinkV2Client.java} | 17 +-
...ient.java => LinkisSeatunnelSparkV2Client.java} | 14 +-
.../errorcode/SeatunnelErrorCodeSummary.java | 2 +-
.../org/apache/seatunnel/common/config/Common.java | 130 +++++++----
.../seatunnel/core/base/config/ConfigBuilder.java | 26 +--
.../seatunnel/core/base/config/PluginFactory.java | 237 ---------------------
.../apache/seatunnel/core/flink/FlinkStarter.java | 74 -------
.../seatunnel/core/flink/FlinkV2Starter.java | 99 +++++++++
.../{SparkStarter.java => SparkV2Starter.java} | 138 ++++++++----
.../apache/seatunnel/core/sql/FlinkSqlStarter.java | 69 ------
.../apache/seatunnel/core/zeta/ZetaStarter.java | 90 ++++++++
.../config/SeatunnelFlinkEnvConfiguration.scala | 9 +-
...n.scala => SeatunnelZetaEnvConfiguration.scala} | 26 +--
.../executor/SeatunnelFlinkOnceCodeExecutor.scala | 21 +-
.../executor/SeatunnelSparkOnceCodeExecutor.scala | 4 +-
...r.scala => SeatunnelZetaOnceCodeExecutor.scala} | 57 ++---
.../factory/SeatunnelEngineConnFactory.scala | 11 +-
...ry.scala => SeatunnelZetaExecutorFactory.scala} | 9 +-
.../seatunnel/util/SeatunnelUtils.scala | 15 +-
.../linkis-metadata-query/service/jdbc/pom.xml | 2 +-
tool/dependencies/known-dependencies.txt | 21 +-
26 files changed, 734 insertions(+), 583 deletions(-)
diff --git a/.github/workflows/publish-docker.yaml b/.github/workflows/publish-docker.yaml
index fdd998b1e..f9e2a5f7b 100644
--- a/.github/workflows/publish-docker.yaml
+++ b/.github/workflows/publish-docker.yaml
@@ -20,6 +20,7 @@ on:
push:
branches:
- dev-1.4.0
+
env:
MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryHandler.class=standard -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index 0dee150d2..a24445269 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -43,7 +43,8 @@ object RunType extends Enumeration {
val TRINO_SQL = Value("tsql")
- val SEATUNNEL_FLINK_SQL = Value("sfsql")
+
+ val SEATUNNEL_ZETA = Value("szeta")
val SEATUNNEL_FLINK = Value("sflink")
val SEATUNNEL_SPARK = Value("sspark")
diff --git a/linkis-dist/release-docs/licenses/LICENSE-hazelcast.txt b/linkis-dist/release-docs/licenses/LICENSE-hazelcast.txt
new file mode 100644
index 000000000..d64569567
--- /dev/null
+++ b/linkis-dist/release-docs/licenses/LICENSE-hazelcast.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/linkis-engineconn-plugins/seatunnel/pom.xml b/linkis-engineconn-plugins/seatunnel/pom.xml
index ed843f9f2..a04c71f29 100644
--- a/linkis-engineconn-plugins/seatunnel/pom.xml
+++ b/linkis-engineconn-plugins/seatunnel/pom.xml
@@ -26,6 +26,10 @@
<artifactId>linkis-engineplugin-seatunnel</artifactId>
+ <properties>
+ <seatunnel.version>2.3.1</seatunnel.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.linkis</groupId>
@@ -79,7 +83,17 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-spark</artifactId>
+ <artifactId>seatunnel-engine-core</artifactId>
+ <version>${seatunnel.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ <version>5.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-starter</artifactId>
<version>${seatunnel.version}</version>
<exclusions>
<exclusion>
@@ -90,7 +104,7 @@
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-flink-sql</artifactId>
+ <artifactId>seatunnel-spark-2-starter</artifactId>
<version>${seatunnel.version}</version>
<exclusions>
<exclusion>
@@ -101,7 +115,7 @@
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-flink</artifactId>
+ <artifactId>seatunnel-flink-13-starter</artifactId>
<version>${seatunnel.version}</version>
<exclusions>
<exclusion>
@@ -110,7 +124,6 @@
</exclusion>
</exclusions>
</dependency>
-
</dependencies>
<build>
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkSeatunnelZetaClient.java
similarity index 77%
rename from linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java
rename to linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkSeatunnelZetaClient.java
index 54c2ec27f..be9fdb048 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkClient.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkSeatunnelZetaClient.java
@@ -19,7 +19,7 @@ package org.apache.linkis.engineconnplugin.seatunnel.client;
import org.apache.linkis.engineconn.computation.executor.utlis.JarLoader;
-import org.apache.seatunnel.core.flink.SeatunnelFlink;
+import org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -29,8 +29,8 @@ import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LinkisSeatunnelFlinkClient {
- private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelFlinkClient.class);
+public class LinkSeatunnelZetaClient {
+ private static Logger logger =
LoggerFactory.getLogger(LinkSeatunnelZetaClient.class);
private static Class<?> seatunnelEngineClass;
private static JarLoader jarLoader;
@@ -39,16 +39,23 @@ public class LinkisSeatunnelFlinkClient {
jarLoader =
new JarLoader(
new String[] {
- LinkisSeatunnelFlinkClient.class
+ LinkSeatunnelZetaClient.class
.getProtectionDomain()
.getCodeSource()
.getLocation()
.getPath()
});
jarLoader.loadClass("org.apache.seatunnel.common.config.Common", false);
- seatunnelEngineClass =
jarLoader.loadClass("org.apache.seatunnel.core.flink.FlinkStarter");
+
jarLoader.loadClass("org.apache.seatunnel.core.base.config.ConfigBuilder",
false);
+ //
jarLoader.loadClass("org.apache.seatunnel.core.base.config.PluginFactory",
false);
+ seatunnelEngineClass =
jarLoader.loadClass("org.apache.seatunnel.core.zeta.ZetaStarter");
jarLoader.addJarURL(
-
SeatunnelFlink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+ SeaTunnelClient.class
+ .getProtectionDomain()
+ .getCodeSource()
+ .getLocation()
+ .toURI()
+ .getPath());
Thread.currentThread().setContextClassLoader(jarLoader);
Method method = seatunnelEngineClass.getDeclaredMethod("main",
String[].class);
return (Integer) method.invoke(null, (Object) args);
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkV2Client.java
similarity index 84%
rename from linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java
rename to linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkV2Client.java
index a9526b104..34300d584 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkSQLClient.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelFlinkV2Client.java
@@ -19,7 +19,7 @@ package org.apache.linkis.engineconnplugin.seatunnel.client;
import org.apache.linkis.engineconn.computation.executor.utlis.JarLoader;
-import org.apache.seatunnel.core.sql.SeatunnelSql;
+import org.apache.seatunnel.core.starter.flink.SeaTunnelFlink;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -29,8 +29,8 @@ import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LinkisSeatunnelFlinkSQLClient {
- private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelFlinkSQLClient.class);
+public class LinkisSeatunnelFlinkV2Client {
+ private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelFlinkV2Client.class);
private static Class<?> seatunnelEngineClass;
private static JarLoader jarLoader;
@@ -39,16 +39,21 @@ public class LinkisSeatunnelFlinkSQLClient {
jarLoader =
new JarLoader(
new String[] {
- LinkisSeatunnelFlinkSQLClient.class
+ LinkisSeatunnelFlinkV2Client.class
.getProtectionDomain()
.getCodeSource()
.getLocation()
.getPath()
});
jarLoader.loadClass("org.apache.seatunnel.common.config.Common", false);
- seatunnelEngineClass =
jarLoader.loadClass("org.apache.seatunnel.core.sql.FlinkSqlStarter");
+ seatunnelEngineClass =
jarLoader.loadClass("org.apache.seatunnel.core.flink.FlinkV2Starter");
jarLoader.addJarURL(
-
SeatunnelSql.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath());
+ SeaTunnelFlink.class
+ .getProtectionDomain()
+ .getCodeSource()
+ .getLocation()
+ .toURI()
+ .getPath());
Thread.currentThread().setContextClassLoader(jarLoader);
Method method = seatunnelEngineClass.getDeclaredMethod("main",
String[].class);
return (Integer) method.invoke(null, (Object) args);
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkClient.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkV2Client.java
similarity index 87%
rename from linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkClient.java
rename to linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkV2Client.java
index 7beacd4bd..8ec8661d4 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkClient.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/LinkisSeatunnelSparkV2Client.java
@@ -19,7 +19,7 @@ package org.apache.linkis.engineconnplugin.seatunnel.client;
import org.apache.linkis.engineconn.computation.executor.utlis.JarLoader;
-import org.apache.seatunnel.core.spark.SeatunnelSpark;
+import org.apache.seatunnel.core.starter.spark.SeaTunnelSpark;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -29,8 +29,8 @@ import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class LinkisSeatunnelSparkClient {
- private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelSparkClient.class);
+public class LinkisSeatunnelSparkV2Client {
+ private static Logger logger =
LoggerFactory.getLogger(LinkisSeatunnelSparkV2Client.class);
private static Class<?> seatunnelEngineClass;
private static JarLoader jarLoader;
@@ -39,7 +39,7 @@ public class LinkisSeatunnelSparkClient {
jarLoader =
new JarLoader(
new String[] {
- LinkisSeatunnelSparkClient.class
+ LinkisSeatunnelSparkV2Client.class
.getProtectionDomain()
.getCodeSource()
.getLocation()
@@ -47,10 +47,10 @@ public class LinkisSeatunnelSparkClient {
});
jarLoader.loadClass("org.apache.seatunnel.common.config.Common", false);
jarLoader.loadClass("org.apache.seatunnel.core.base.config.ConfigBuilder",
false);
-
jarLoader.loadClass("org.apache.seatunnel.core.base.config.PluginFactory",
false);
- seatunnelEngineClass =
jarLoader.loadClass("org.apache.seatunnel.core.spark.SparkStarter");
+ //
jarLoader.loadClass("org.apache.seatunnel.core.base.config.PluginFactory",
false);
+ seatunnelEngineClass =
jarLoader.loadClass("org.apache.seatunnel.core.spark.SparkV2Starter");
jarLoader.addJarURL(
- SeatunnelSpark.class
+ SeaTunnelSpark.class
.getProtectionDomain()
.getCodeSource()
.getLocation()
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/errorcode/SeatunnelErrorCodeSummary.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/errorcode/SeatunnelErrorCodeSummary.java
index 059914997..66275c178 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/errorcode/SeatunnelErrorCodeSummary.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/linkis/engineconnplugin/seatunnel/client/errorcode/SeatunnelErrorCodeSummary.java
@@ -26,7 +26,7 @@ public enum SeatunnelErrorCodeSummary implements LinkisErrorCode {
17023, "Not support method for requestExpectedResource.(不支持
requestExpectedResource 的方法)"),
EXEC_SPARK_CODE_ERROR(17023, "Exec Seatunnel-Spark Code Error(执行
Seatunnel-Spark 代码错误)"),
EXEC_FLINK_CODE_ERROR(17023, "Exec Seatunnel-Flink Code Error(执行
Seatunnel-Flink 代码错误)"),
- EXEC_FLINKSQL_CODE_ERROR(17023, "Exec Seatunnel-FlinkSQL Code Error(执行
Seatunnel-FlinkSQL 代码错误)");
+ EXEC_SEATUNNEL_CODE_ERROR(17023, "Exec Seatunnel-Zeta Code Error(执行
Seatunnel-Zeta 代码错误)");
/** (errorCode)错误码 */
private final int errorCode;
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
index 739af25de..04d10e412 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -17,64 +17,90 @@
package org.apache.seatunnel.common.config;
+import
org.apache.linkis.engineconnplugin.seatunnel.client.exception.JobExecutionException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
public class Common {
+ public static final Log logger = LogFactory.getLog(Common.class.getName());
private Common() {
throw new IllegalStateException("Utility class");
}
- private static final List<String> ALLOWED_MODES =
-
Arrays.stream(DeployMode.values()).map(DeployMode::getName).collect(Collectors.toList());
+ public static final int COLLECTION_SIZE = 16;
+
+ private static final int APP_LIB_DIR_DEPTH = 2;
+
+ private static final int PLUGIN_LIB_DIR_DEPTH = 3;
+
+ private static DeployMode MODE;
- private static Optional<String> MODE = Optional.empty();
+ private static String SEATUNNEL_HOME;
- public static boolean isModeAllowed(String mode) {
- return ALLOWED_MODES.contains(mode.toLowerCase());
+ private static boolean STARTER = false;
+
+ public static void setDeployMode(DeployMode mode) {
+ MODE = mode;
}
- /** Set mode. return false in case of failure */
- public static Boolean setDeployMode(String m) {
- if (isModeAllowed(m)) {
- MODE = Optional.of(m);
- return true;
- } else {
- return false;
- }
+ public static void setStarter(boolean inStarter) {
+ STARTER = inStarter;
}
- public static Optional<String> getDeployMode() {
+ public static DeployMode getDeployMode() {
return MODE;
}
- /**
- * Root dir varies between different spark master and deploy mode, it also
varies between relative
- * and absolute path. When running seatunnel in --master local, you can put
plugins related files
- * in $project_dir/plugins, then these files will be automatically copied to
- * $project_dir/seatunnel-core/target and token in effect if you start
seatunnel in IDE tools such
- * as IDEA. When running seatunnel in --master yarn or --master mesos, you
can put plugins related
- * files in plugins dir.
- */
+ public static Path appStarterDir() {
+ return appRootDir().resolve("starter");
+ }
+
+ private static String getSeaTunnelHome() {
+
+ if (StringUtils.isNotEmpty(SEATUNNEL_HOME)) {
+ return SEATUNNEL_HOME;
+ }
+ String seatunnelHome = System.getProperty("SEATUNNEL_HOME");
+ if (StringUtils.isBlank(seatunnelHome)) {
+ seatunnelHome = System.getenv("SEATUNNEL_HOME");
+ }
+ if (StringUtils.isBlank(seatunnelHome)) {
+ seatunnelHome = appRootDir().toString();
+ }
+ SEATUNNEL_HOME = seatunnelHome;
+ return SEATUNNEL_HOME;
+ }
+
public static Path appRootDir() {
- if (MODE.equals(Optional.of(DeployMode.CLIENT.getName()))) {
+ logger.info("Mode:" + MODE + ",Starter:" + STARTER);
+ if (DeployMode.CLIENT == MODE || DeployMode.RUN == MODE || STARTER) {
try {
String path = System.getProperty("SEATUNNEL_HOME") + "/seatunnel";
path = new File(path).getPath();
+ logger.info("appRootDir:" + path);
return Paths.get(path);
} catch (Exception e) {
throw new RuntimeException(e);
}
- } else if (MODE.equals(Optional.of(DeployMode.CLUSTER.getName()))) {
+ } else if (DeployMode.CLUSTER == MODE || DeployMode.RUN_APPLICATION ==
MODE) {
return Paths.get("");
} else {
- throw new IllegalStateException("MODE not support : " +
MODE.orElse("null"));
+ throw new IllegalStateException("deploy mode not support : " + MODE);
}
}
@@ -97,7 +123,6 @@ public class Common {
return Paths.get(appRootDir().toString(), "connectors",
engine.toLowerCase());
}
- /** Plugin Connector Dir */
public static Path connectorDir() {
return Paths.get(appRootDir().toString(), "connectors");
}
@@ -106,18 +131,47 @@ public class Common {
return appRootDir().resolve("plugins.tar.gz");
}
- /** Get specific plugin dir */
- public static Path pluginDir(String pluginName) {
- return Paths.get(pluginRootDir().toString(), pluginName);
+ public static List<Path> getPluginsJarDependencies() {
+ Path pluginRootDir = Common.pluginRootDir();
+ if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
+ return Collections.emptyList();
+ }
+ try (Stream<Path> stream = Files.walk(pluginRootDir, PLUGIN_LIB_DIR_DEPTH,
FOLLOW_LINKS)) {
+ return stream
+ .filter(it -> pluginRootDir.relativize(it).getNameCount() ==
PLUGIN_LIB_DIR_DEPTH)
+ .filter(it -> it.getParent().endsWith("lib"))
+ .filter(it -> it.getFileName().toString().endsWith(".jar"))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new JobExecutionException(e.getMessage(), e);
+ }
+ }
+
+ public static Path libDir() {
+ return appRootDir().resolve("lib");
}
- /** Get files dir of specific plugin */
- public static Path pluginFilesDir(String pluginName) {
- return Paths.get(pluginDir(pluginName).toString(), "files");
+ public static List<Path> getLibJars() {
+ Path libRootDir = Common.libDir();
+ if (!Files.exists(libRootDir) || !Files.isDirectory(libRootDir)) {
+ return Collections.emptyList();
+ }
+ try (Stream<Path> stream = Files.walk(libRootDir, APP_LIB_DIR_DEPTH,
FOLLOW_LINKS)) {
+ return stream
+ .filter(it -> !it.toFile().isDirectory())
+ .filter(it -> it.getFileName().toString().endsWith(".jar"))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new JobExecutionException(e.getMessage(), e);
+ }
}
- /** Get lib dir of specific plugin */
- public static Path pluginLibDir(String pluginName) {
- return Paths.get(pluginDir(pluginName).toString(), "lib");
+ public static Set<Path> getThirdPartyJars(String paths) {
+ logger.info("getThirdPartyJars path:" + paths);
+ return Arrays.stream(paths.split(";"))
+ .filter(s -> !"".equals(s))
+ .filter(it -> it.endsWith(".jar"))
+ .map(path -> Paths.get(URI.create(path)))
+ .collect(Collectors.toSet());
}
}
diff --git a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 1aa357966..7cd367b23 100644
--- a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++ b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.core.base.config;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -28,22 +27,18 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import java.nio.file.Path;
-/**
- * Used to build the {@link Config} from file.
- *
- * @param <ENVIRONMENT> environment type.
- */
-public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
+public class ConfigBuilder {
public static final Log LOGGER =
LogFactory.getLog(ConfigBuilder.class.getName());
+
private static final String PLUGIN_NAME_KEY = "plugin_name";
+
private final Path configFile;
- private final EngineType engine;
+
private final Config config;
- public ConfigBuilder(Path configFile, EngineType engine) {
+ public ConfigBuilder(Path configFile) {
this.configFile = configFile;
- this.engine = engine;
this.config = load();
}
@@ -72,15 +67,4 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
public Config getConfig() {
return config;
}
-
- /** check if config is valid. */
- public void checkConfig() {
- // check environment
- ENVIRONMENT environment = new EnvironmentFactory<ENVIRONMENT>(config,
engine).getEnvironment();
- // check plugins
- PluginFactory<ENVIRONMENT> pluginFactory = new PluginFactory<>(config,
engine);
- pluginFactory.createPlugins(PluginType.SOURCE);
- pluginFactory.createPlugins(PluginType.TRANSFORM);
- pluginFactory.createPlugins(PluginType.SINK);
- }
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
deleted file mode 100644
index 2070106e8..000000000
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/base/config/PluginFactory.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.base.config;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.apis.base.plugin.Plugin;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.flink.BaseFlinkSink;
-import org.apache.seatunnel.flink.BaseFlinkSource;
-import org.apache.seatunnel.flink.BaseFlinkTransform;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-import org.apache.seatunnel.spark.BaseSparkSink;
-import org.apache.seatunnel.spark.BaseSparkSource;
-import org.apache.seatunnel.spark.BaseSparkTransform;
-
-import javax.annotation.Nonnull;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * Used to load the plugins.
- *
- * @param <ENVIRONMENT> environment
- */
-public class PluginFactory<ENVIRONMENT extends RuntimeEnv> {
-
- public static final Log LOGGER =
LogFactory.getLog(PluginFactory.class.getName());
- private final Config config;
- private final EngineType engineType;
- private static final Map<EngineType, Map<PluginType, Class<?>>>
PLUGIN_BASE_CLASS_MAP;
-
- private static final String PLUGIN_NAME_KEY = "plugin_name";
- private static final String PLUGIN_MAPPING_FILE =
"plugin-mapping.properties";
-
- private final List<URL> pluginJarPaths;
- private final ClassLoader defaultClassLoader;
-
- static {
- PLUGIN_BASE_CLASS_MAP = new HashMap<>();
- Map<PluginType, Class<?>> sparkBaseClassMap = new HashMap<>();
- sparkBaseClassMap.put(PluginType.SOURCE, BaseSparkSource.class);
- sparkBaseClassMap.put(PluginType.TRANSFORM, BaseSparkTransform.class);
- sparkBaseClassMap.put(PluginType.SINK, BaseSparkSink.class);
- PLUGIN_BASE_CLASS_MAP.put(EngineType.SPARK, sparkBaseClassMap);
-
- Map<PluginType, Class<?>> flinkBaseClassMap = new HashMap<>();
- flinkBaseClassMap.put(PluginType.SOURCE, BaseFlinkSource.class);
- flinkBaseClassMap.put(PluginType.TRANSFORM, BaseFlinkTransform.class);
- flinkBaseClassMap.put(PluginType.SINK, BaseFlinkSink.class);
- PLUGIN_BASE_CLASS_MAP.put(EngineType.FLINK, flinkBaseClassMap);
- }
-
- public PluginFactory(Config config, EngineType engineType) {
- this.config = config;
- this.engineType = engineType;
- this.pluginJarPaths = searchPluginJar();
- this.defaultClassLoader = initClassLoaderWithPaths(this.pluginJarPaths);
- }
-
- private ClassLoader initClassLoaderWithPaths(List<URL> pluginJarPaths) {
- return new URLClassLoader(
- pluginJarPaths.toArray(new URL[0]),
Thread.currentThread().getContextClassLoader());
- }
-
- @Nonnull
- private List<URL> searchPluginJar() {
-
- File pluginDir =
Common.connectorJarDir(this.engineType.getEngine()).toFile();
- if (!pluginDir.exists() || pluginDir.listFiles() == null) {
- return new ArrayList<>();
- }
- Config pluginMapping =
- ConfigFactory.parseFile(new File(getPluginMappingPath()))
- .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(
- ConfigFactory.systemProperties(),
- ConfigResolveOptions.defaults().setAllowUnresolved(true));
- File[] plugins =
- Arrays.stream(pluginDir.listFiles())
- .filter(f -> f.getName().endsWith(".jar"))
- .toArray(File[]::new);
-
- return Arrays.stream(PluginType.values())
- .filter(type -> !PluginType.TRANSFORM.equals(type))
- .flatMap(
- type -> {
- List<URL> pluginList = new ArrayList<>();
- List<? extends Config> configList =
config.getConfigList(type.getType());
- configList.forEach(
- pluginConfig -> {
- Optional<String> mappingValue =
- getPluginMappingValue(
- pluginMapping, type,
pluginConfig.getString(PLUGIN_NAME_KEY));
- if (mappingValue.isPresent()) {
- try {
- for (File plugin : plugins) {
- if (plugin.getName().startsWith(mappingValue.get()))
{
- pluginList.add(plugin.toURI().toURL());
- break;
- }
- }
- } catch (MalformedURLException e) {
- LOGGER.warn("can get plugin url", e);
- }
- } else {
- throw new IllegalArgumentException(
- String.format(
- "can't find connector %s in "
- + "%s. If you add connector to connectors
dictionary, please modify this "
- + "file.",
- getPluginMappingKey(type,
pluginConfig.getString(PLUGIN_NAME_KEY)),
- getPluginMappingPath()));
- }
- });
- return pluginList.stream();
- })
- .collect(Collectors.toList());
- }
-
- public List<URL> getPluginJarPaths() {
- return this.pluginJarPaths;
- }
-
- private String getPluginMappingPath() {
- return Common.connectorDir() + "/" + PLUGIN_MAPPING_FILE;
- }
-
- private String getPluginMappingKey(PluginType type, String pluginName) {
- return this.engineType.getEngine() + "." + type.getType() + "." +
pluginName;
- }
-
- Optional<String> getPluginMappingValue(Config pluginMapping, PluginType
type, String pluginName) {
-
- return
pluginMapping.getConfig(this.engineType.getEngine()).getConfig(type.getType()).entrySet()
- .stream()
- .filter(entry -> entry.getKey().equalsIgnoreCase(pluginName))
- .map(entry -> entry.getValue().unwrapped().toString())
- .findAny();
- }
-
- /**
- * Create the plugins by plugin type.
- *
- * @param type plugin type
- * @param <T> plugin
- * @return plugin list.
- */
- @SuppressWarnings("unchecked")
- public <T extends Plugin<ENVIRONMENT>> List<T> createPlugins(PluginType
type) {
- Objects.requireNonNull(type, "PluginType can not be null when create
plugins!");
- List<T> basePluginList = new ArrayList<>();
- List<? extends Config> configList = config.getConfigList(type.getType());
- configList.forEach(
- plugin -> {
- try {
- T t =
- (T)
- createPluginInstanceIgnoreCase(
- type, plugin.getString(PLUGIN_NAME_KEY),
this.defaultClassLoader);
- t.setConfig(plugin);
- basePluginList.add(t);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
-
- return basePluginList;
- }
-
- /** create plugin class instance, ignore case. */
- @SuppressWarnings("unchecked")
- private Plugin<?> createPluginInstanceIgnoreCase(
- PluginType pluginType, String pluginName, ClassLoader classLoader)
throws Exception {
- Class<Plugin<?>> pluginBaseClass =
- (Class<Plugin<?>>) getPluginBaseClass(engineType, pluginType);
-
- if (pluginName.split("\\.").length != 1) {
- // canonical class name
- Class<Plugin<?>> pluginClass = (Class<Plugin<?>>)
Class.forName(pluginName);
- if (pluginClass.isAssignableFrom(pluginBaseClass)) {
- throw new IllegalArgumentException(
- "plugin: " + pluginName + " is not extends from " +
pluginBaseClass);
- }
- return pluginClass.getDeclaredConstructor().newInstance();
- }
- ServiceLoader<Plugin<?>> plugins = ServiceLoader.load(pluginBaseClass,
classLoader);
- for (Iterator<Plugin<?>> it = plugins.iterator(); it.hasNext(); ) {
- try {
- Plugin<?> plugin = it.next();
- if (StringUtils.equalsIgnoreCase(plugin.getPluginName(), pluginName)) {
- return plugin;
- }
- } catch (ServiceConfigurationError e) {
- // Iterator.next() may throw ServiceConfigurationError,
- // but maybe caused by a not used plugin in this job
- LOGGER.warn("Error when load plugin:" + pluginName, e);
- }
- }
- throw new ClassNotFoundException("Plugin class not found by name :[" +
pluginName + "]");
- }
-
- private Class<?> getPluginBaseClass(EngineType engineType, PluginType
pluginType) {
- if (!PLUGIN_BASE_CLASS_MAP.containsKey(engineType)) {
- throw new IllegalStateException("PluginType not support : [" +
pluginType + "]");
- }
- Map<PluginType, Class<?>> pluginTypeClassMap =
PLUGIN_BASE_CLASS_MAP.get(engineType);
- if (!pluginTypeClassMap.containsKey(pluginType)) {
- throw new IllegalStateException(pluginType + " is not supported in
engine " + engineType);
- }
- return pluginTypeClassMap.get(pluginType);
- }
-}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
deleted file mode 100644
index ffa7af7bf..000000000
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkStarter.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.flink;
-
-import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.core.base.Starter;
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
-
-import java.util.List;
-
-/**
- * The SeaTunnel flink starter. This class is responsible for generate the
final flink job execute
- * command.
- */
-public class FlinkStarter implements Starter {
- public static final Log logger =
LogFactory.getLog(FlinkStarter.class.getName());
- private static final String APP_NAME = SeatunnelFlink.class.getName();
- private static final String APP_JAR_NAME = "seatunnel-core-flink.jar";
-
- /** SeaTunnel parameters, used by SeaTunnel application. e.g. `-c
config.conf` */
- private final FlinkCommandArgs flinkCommandArgs;
-
- /** SeaTunnel flink job jar. */
- private final String appJar;
-
- FlinkStarter(String[] args) {
- this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args,
FlinkJobType.JAR);
- // set the deployment mode, used to get the job jar path.
- Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
- this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
- }
-
- @SuppressWarnings("checkstyle:RegexpSingleline")
- public static int main(String[] args) {
- logger.info("FlinkStarter start");
- int exitCode = 0;
- try {
- FlinkStarter flinkStarter = new FlinkStarter(args);
- String commandVal = String.join(" ", flinkStarter.buildCommands());
- logger.info("commandVal:" + commandVal);
- exitCode = SeatunnelUtils.executeLine(commandVal);
- } catch (Exception e) {
- exitCode = 1;
- logger.error("\n\n该任务最可能的错误原因是:\n" + e);
- }
- return exitCode;
- }
-
- @Override
- public List<String> buildCommands() {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, APP_NAME,
appJar);
- }
-}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkV2Starter.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkV2Starter.java
new file mode 100644
index 000000000..dde6f6c3b
--- /dev/null
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/flink/FlinkV2Starter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink;
+
+import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.enums.EngineType;
+import org.apache.seatunnel.core.starter.flink.SeaTunnelFlink;
+import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Starter that assembles the Flink CLI command line for a SeaTunnel job and executes it through
+ * {@code SeatunnelUtils.executeLine}.
+ */
+public class FlinkV2Starter implements Starter {
+  public static final Log logger = LogFactory.getLog(FlinkV2Starter.class.getName());
+  private static final String APP_NAME = SeaTunnelFlink.class.getName();
+  private static final String APP_JAR_NAME = EngineType.FLINK13.getStarterJarName();
+  private static final String SHELL_NAME = EngineType.FLINK13.getStarterShellName();
+
+  /** SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf` */
+  private final FlinkCommandArgs flinkCommandArgs;
+
+  /** SeaTunnel flink job jar. */
+  private final String appJar;
+
+  /**
+   * Parses the command-line arguments and resolves the location of the starter jar.
+   *
+   * @param args raw SeaTunnel command-line arguments
+   */
+  FlinkV2Starter(String[] args) {
+    this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), SHELL_NAME, true);
+    logger.info("this.flinkCommandArgs = " + this.flinkCommandArgs);
+    // set the deployment mode, used to get the job jar path.
+    Common.setDeployMode(flinkCommandArgs.getDeployMode());
+    Common.setStarter(true);
+    this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
+  }
+
+  /**
+   * Builds the Flink submit command from {@code args} and executes it.
+   *
+   * <p>Any exception raised while parsing the arguments, building the command or executing it is
+   * logged together with its stack trace and reported through the return value instead of being
+   * rethrown.
+   *
+   * @param args raw SeaTunnel command-line arguments
+   * @return the value returned by {@code SeatunnelUtils.executeLine}, or {@code 1} when an
+   *     exception was caught
+   */
+  @SuppressWarnings("checkstyle:RegexpSingleline")
+  public static int main(String[] args) {
+    logger.info("FlinkV2Starter start:" + Arrays.toString(args));
+    int exitCode = 0;
+    try {
+      FlinkV2Starter flinkStarter = new FlinkV2Starter(args);
+      String commandVal = String.join(" ", flinkStarter.buildCommands());
+      logger.info("FlinkV2Starter commandVal:" + commandVal);
+      exitCode = SeatunnelUtils.executeLine(commandVal);
+    } catch (Exception e) {
+      exitCode = 1;
+      // Keep the original message text, but also hand over the throwable so that the stack
+      // trace is logged instead of only e.toString().
+      logger.error("\n\nFlinkV2Starter error:\n" + e, e);
+    }
+    return exitCode;
+  }
+
+  /**
+   * Assembles the tokens of the Flink CLI invocation.
+   *
+   * <p>Order of the tokens: the flink executable, the deploy mode, an optional {@code --target},
+   * the main class ({@code -c}), the starter jar, {@code --config}, {@code --name} and finally
+   * one {@code -D} entry per non-null variable.
+   *
+   * @return the command tokens in the order they must appear on the command line
+   */
+  @Override
+  public List<String> buildCommands() {
+    List<String> command = new ArrayList<>();
+    // The placeholder is emitted literally; presumably it is expanded by the shell that runs the
+    // joined command line - confirm against SeatunnelUtils.executeLine.
+    command.add("${FLINK_HOME}/bin/flink");
+    // set deploy mode, run or run-application
+    command.add(flinkCommandArgs.getDeployMode().getDeployMode());
+    // set submitted target master
+    if (flinkCommandArgs.getMasterType() != null) {
+      command.add("--target");
+      command.add(flinkCommandArgs.getMasterType().getMaster());
+    }
+    logger.info("FlinkV2Starter OriginalParameters:" + flinkCommandArgs.getOriginalParameters());
+    command.add("-c");
+    command.add(APP_NAME);
+    command.add(appJar);
+    command.add("--config");
+    command.add(flinkCommandArgs.getConfigFile());
+    command.add("--name");
+    command.add(flinkCommandArgs.getJobName());
+    // set System properties
+    flinkCommandArgs.getVariables().stream()
+        .filter(Objects::nonNull)
+        .map(String::trim)
+        .forEach(variable -> command.add("-D" + variable));
+    return command;
+  }
+}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkV2Starter.java
similarity index 68%
rename from
linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
rename to
linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkV2Starter.java
index 60a0204c7..5f905731b 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/spark/SparkV2Starter.java
@@ -22,26 +22,34 @@ import
org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.base.Starter;
-import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.ConfigParser;
-import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.PluginFactory;
-import org.apache.seatunnel.core.base.utils.CompressionUtils;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.enums.EngineType;
+import org.apache.seatunnel.core.starter.enums.PluginType;
+import org.apache.seatunnel.core.starter.spark.SeaTunnelSpark;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+import org.apache.seatunnel.core.starter.utils.CompressionUtils;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -50,8 +58,8 @@ import com.beust.jcommander.UnixStyleUsageFormatter;
import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
-public class SparkStarter implements Starter {
- public static final Log logger =
LogFactory.getLog(SparkStarter.class.getName());
+public class SparkV2Starter implements Starter {
+ public static final Log logger =
LogFactory.getLog(SparkV2Starter.class.getName());
private static final int USAGE_EXIT_CODE = 234;
@@ -75,7 +83,7 @@ public class SparkStarter implements Starter {
/** spark configuration properties */
protected Map<String, String> sparkConf;
- private SparkStarter(String[] args, SparkCommandArgs commandArgs) {
+ private SparkV2Starter(String[] args, SparkCommandArgs commandArgs) {
this.args = args;
this.commandArgs = commandArgs;
}
@@ -85,14 +93,14 @@ public class SparkStarter implements Starter {
int exitCode = 0;
logger.info("starter start");
try {
- SparkStarter starter = getInstance(args);
+ SparkV2Starter starter = getInstance(args);
List<String> command = starter.buildCommands();
String commandVal = String.join(" ", command);
- logger.info("commandVal:" + commandVal);
+ logger.info("sparkV2starter commandVal:" + commandVal);
exitCode = SeatunnelUtils.executeLine(commandVal);
} catch (Exception e) {
exitCode = 1;
- logger.error("\n\n该任务最可能的错误原因是:\n" + e);
+ logger.error("\n\nsparkV2Starter error:\n" + e);
}
return exitCode;
}
@@ -101,8 +109,10 @@ public class SparkStarter implements Starter {
* method to get SparkStarter instance, will return {@link
ClusterModeSparkStarter} or {@link
* ClientModeSparkStarter} depending on deploy mode.
*/
- static SparkStarter getInstance(String[] args) {
- SparkCommandArgs commandArgs = parseCommandArgs(args);
+ static SparkV2Starter getInstance(String[] args) {
+ SparkCommandArgs commandArgs =
+ CommandLineUtils.parse(
+ args, new SparkCommandArgs(),
EngineType.SPARK2.getStarterShellName(), true);
DeployMode deployMode = commandArgs.getDeployMode();
switch (deployMode) {
case CLUSTER:
@@ -135,11 +145,15 @@ public class SparkStarter implements Starter {
public List<String> buildCommands() throws IOException {
setSparkConf();
logger.info("setSparkConf start");
- logger.info(commandArgs.getDeployMode().getName());
- Common.setDeployMode(commandArgs.getDeployMode().getName());
- this.jars.addAll(getPluginsJarDependencies());
- this.jars.addAll(listJars(Common.appLibDir()));
+ logger.info(commandArgs.getDeployMode().toString());
+ Common.setDeployMode(commandArgs.getDeployMode());
+ Common.setStarter(true);
+ this.jars.addAll(Common.getPluginsJarDependencies());
+ this.jars.addAll(Common.getLibJars());
this.jars.addAll(getConnectorJarDependencies());
+ this.jars.addAll(
+ new ArrayList<>(
+
Common.getThirdPartyJars(sparkConf.getOrDefault(EnvCommonOptions.JARS.key(),
""))));
this.appName = this.sparkConf.getOrDefault("spark.app.name",
Constants.LOGO);
logger.info("buildFinal end");
return buildFinal();
@@ -167,7 +181,19 @@ public class SparkStarter implements Starter {
/** Get spark configurations from SeaTunnel job config file. */
static Map<String, String> getSparkConf(String configFile) throws
FileNotFoundException {
- return ConfigParser.getConfigEnvValues(configFile);
+ File file = new File(configFile);
+ if (!file.exists()) {
+ throw new FileNotFoundException("config file '" + file + "' does not
exists!");
+ }
+ Config appConfig =
+ ConfigFactory.parseFile(file)
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(
+ ConfigFactory.systemProperties(),
+ ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+ return appConfig.getConfig("env").entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().unwrapped().toString()));
}
/** return plugin's dependent jars, which located in
'plugins/${pluginName}/lib/*'. */
@@ -185,18 +211,45 @@ public class SparkStarter implements Starter {
}
}
- /** return connector's jars, which located in 'connectors/spark/*'. */
+ /**
+ * Returns the connector jars. As of 2.3.0 they are resolved from the "seatunnel" connector
+ * directory rather than 'connectors/spark/*'.
+ */
private List<Path> getConnectorJarDependencies() {
- Path pluginRootDir = Common.connectorJarDir("SPARK");
+ Path pluginRootDir = Common.connectorJarDir("seatunnel");
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
return Collections.emptyList();
}
- // Config config =
ConfigFactory.parseFile(Paths.get(commandArgs.getConfigFile()).toFile());
- Config config =
- new ConfigBuilder<>(Paths.get(commandArgs.getConfigFile()),
EngineType.SPARK).getConfig();
- PluginFactory<RuntimeEnv> pluginFactory = new PluginFactory<>(config,
EngineType.SPARK);
- return pluginFactory.getPluginJarPaths().stream()
- .map(url -> new File(url.getPath()).toPath())
+ Config config = ConfigBuilder.of(commandArgs.getConfigFile());
+ Set<URL> pluginJars = new HashSet<>();
+ SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
+ new SeaTunnelSourcePluginDiscovery();
+ SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new
SeaTunnelSinkPluginDiscovery();
+ pluginJars.addAll(
+ seaTunnelSourcePluginDiscovery.getPluginJarPaths(
+ getPluginIdentifiers(config, PluginType.SOURCE)));
+ pluginJars.addAll(
+ seaTunnelSinkPluginDiscovery.getPluginJarPaths(
+ getPluginIdentifiers(config, PluginType.SINK)));
+ List<Path> connectPaths =
+ pluginJars.stream()
+ .map(url -> new File(url.getPath()).toPath())
+ .collect(Collectors.toList());
+ logger.info("getConnector jar paths:" + connectPaths.toString());
+ return connectPaths;
+ }
+
+ /**
+ * Builds one {@link PluginIdentifier} per plugin entry configured for the given plugin types.
+ *
+ * <p>For every type, the config list stored under {@code pluginType.getType()} is read and each
+ * entry is mapped to {@code PluginIdentifier.of("seatunnel", type, name)}, where the name is the
+ * value of the entry's {@code plugin_name} key.
+ *
+ * @param config job configuration to read the plugin lists from
+ * @param pluginTypes plugin types whose entries should be collected
+ * @return the identifiers, in the order the types and their entries are encountered
+ */
+ private List<PluginIdentifier> getPluginIdentifiers(Config config,
PluginType... pluginTypes) {
+ return Arrays.stream(pluginTypes)
+ .flatMap(
+ (Function<PluginType, Stream<PluginIdentifier>>)
+ pluginType -> {
+ List<? extends Config> configList =
config.getConfigList(pluginType.getType());
+ return configList.stream()
+ .map(
+ pluginConfig ->
+ PluginIdentifier.of(
+ "seatunnel",
+ pluginType.getType(),
+ pluginConfig.getString("plugin_name")));
+ })
.collect(Collectors.toList());
}
@@ -213,16 +266,20 @@ public class SparkStarter implements Starter {
/** build final spark-submit commands */
protected List<String> buildFinal() {
List<String> commands = new ArrayList<>();
- commands.add("${SPARK_HOME}/bin/spark-submit");
- appendOption(commands, "--class", SeatunnelSpark.class.getName());
+ commands.add(System.getenv("SPARK_HOME") + "/bin/spark-submit");
+ appendOption(commands, "--class", SeaTunnelSpark.class.getName());
appendOption(commands, "--name", this.appName);
appendOption(commands, "--master", this.commandArgs.getMaster());
- appendOption(commands, "--deploy-mode",
this.commandArgs.getDeployMode().getName());
+ appendOption(commands, "--deploy-mode",
this.commandArgs.getDeployMode().getDeployMode());
appendJars(commands, this.jars);
appendFiles(commands, this.files);
appendSparkConf(commands, this.sparkConf);
appendAppJar(commands);
appendArgs(commands, args);
+ if (this.commandArgs.isCheckConfig()) {
+ commands.add("--check");
+ }
+ logger.info("build command:" + commands);
return commands;
}
@@ -266,11 +323,15 @@ public class SparkStarter implements Starter {
/** append appJar to StringBuilder */
protected void appendAppJar(List<String> commands) {
-
commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
+ //
commands.add(Common.appLibDir().resolve("seatunnel-spark-starter.jar").toString());
+ String appJarPath =
+
Common.appStarterDir().resolve(EngineType.SPARK2.getStarterJarName()).toString();
+ logger.info("spark appJarPath:" + appJarPath);
+ commands.add(appJarPath);
}
/** a Starter for building spark-submit commands with client mode options */
- private static class ClientModeSparkStarter extends SparkStarter {
+ private static class ClientModeSparkStarter extends SparkV2Starter {
/** client mode specified spark options */
private enum ClientModeSparkConfigs {
@@ -329,7 +390,7 @@ public class SparkStarter implements Starter {
}
/** a Starter for building spark-submit commands with cluster mode options */
- private static class ClusterModeSparkStarter extends SparkStarter {
+ private static class ClusterModeSparkStarter extends SparkV2Starter {
private ClusterModeSparkStarter(String[] args, SparkCommandArgs
commandArgs) {
super(args, commandArgs);
@@ -337,11 +398,10 @@ public class SparkStarter implements Starter {
@Override
public List<String> buildCommands() throws IOException {
- Common.setDeployMode(commandArgs.getDeployMode().getName());
+ Common.setDeployMode(commandArgs.getDeployMode());
+ Common.setStarter(true);
Path pluginTarball = Common.pluginTarball();
- if (Files.notExists(pluginTarball)) {
- CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
- }
+ CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
this.files.add(pluginTarball);
this.files.add(Paths.get(commandArgs.getConfigFile()));
return super.buildCommands();
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
deleted file mode 100644
index b7fd2fec6..000000000
---
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/sql/FlinkSqlStarter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.sql;
-
-import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.core.base.Starter;
-import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.flink.config.FlinkJobType;
-import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
-
-import java.util.List;
-
-public class FlinkSqlStarter implements Starter {
- public static final Log logger =
LogFactory.getLog(FlinkSqlStarter.class.getName());
- private static final String APP_JAR_NAME = "seatunnel-core-flink-sql.jar";
- private static final String CLASS_NAME = SeatunnelSql.class.getName();
-
- private final FlinkCommandArgs flinkCommandArgs;
- /** SeaTunnel flink sql job jar. */
- private final String appJar;
-
- FlinkSqlStarter(String[] args) {
- this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args,
FlinkJobType.SQL);
- // set the deployment mode, used to get the job jar path.
- Common.setDeployMode(flinkCommandArgs.getDeployMode().getName());
- this.appJar = Common.appLibDir().resolve(APP_JAR_NAME).toString();
- }
-
- @Override
- public List<String> buildCommands() throws Exception {
- return CommandLineUtils.buildFlinkCommand(flinkCommandArgs, CLASS_NAME,
appJar);
- }
-
- @SuppressWarnings("checkstyle:RegexpSingleline")
- public static int main(String[] args) {
- int exitCode = 0;
- logger.info("FlinkSqlStarter start");
- try {
- FlinkSqlStarter flinkSqlStarter = new FlinkSqlStarter(args);
- String commandVal = String.join(" ", flinkSqlStarter.buildCommands());
- logger.info("commandVal:" + commandVal);
- exitCode = SeatunnelUtils.executeLine(commandVal);
- } catch (Exception e) {
- exitCode = 1;
- logger.error("\n\n该任务最可能的错误原因是:\n" + e);
- }
-
- return exitCode;
- }
-}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/zeta/ZetaStarter.java
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/zeta/ZetaStarter.java
new file mode 100644
index 000000000..59d90b9d9
--- /dev/null
+++
b/linkis-engineconn-plugins/seatunnel/src/main/java/org/apache/seatunnel/core/zeta/ZetaStarter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.zeta;
+
+import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.enums.EngineType;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Starter that turns parsed {@link ClientCommandArgs} into the SeaTunnel engine shell command
+ * line and hands the joined command to {@code SeatunnelUtils.executeLine} for execution.
+ */
+public class ZetaStarter implements Starter {
+  public static final Log logger = LogFactory.getLog(ZetaStarter.class.getName());
+
+  private static final String APP_JAR_NAME = EngineType.SEATUNNEL.getStarterJarName();
+  private static final String SHELL_NAME = EngineType.SEATUNNEL.getStarterShellName();
+  private static final String namePrefix = "seaTunnel";
+  private final ClientCommandArgs commandArgs;
+  private final String appJar;
+
+  /**
+   * Parses the raw arguments and records the deploy mode on {@link Common} before resolving the
+   * starter jar path.
+   *
+   * @param args raw command-line arguments, parsed into a {@link ClientCommandArgs}
+   */
+  ZetaStarter(String[] args) {
+    this.commandArgs = CommandLineUtils.parse(args, new ClientCommandArgs(), SHELL_NAME, true);
+    logger.info("this.commandArgs = " + this.commandArgs);
+    // set the deployment mode, used to get the job jar path.
+    Common.setDeployMode(commandArgs.getDeployMode());
+    Common.setStarter(true);
+    this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
+  }
+
+  /**
+   * Builds the command, joins its parts with single spaces and executes it.
+   *
+   * @param args raw command-line arguments
+   * @return the exit code returned by {@code SeatunnelUtils.executeLine}, or 1 when any exception
+   *     is thrown while building or running the command
+   */
+  public static int main(String[] args) {
+    int exitCode = 0;
+    try {
+      logger.info("seaTunnel Zeta process..");
+      ZetaStarter zetaStarter = new ZetaStarter(args);
+      String commandVal = String.join(" ", zetaStarter.buildCommands());
+      logger.info("ZetaStarter commandVal:" + commandVal);
+      exitCode = SeatunnelUtils.executeLine(commandVal);
+    } catch (Exception e) {
+      exitCode = 1;
+      // Pass the throwable itself so the stack trace is logged, not only e.toString().
+      logger.error("\n\nZetaStarter error:\n", e);
+    }
+    return exitCode;
+  }
+
+  /**
+   * Assembles the shell invocation: {@code ${SEATUNNEL_HOME}/bin/<shell> --master <masterType>
+   * --cluster <clusterName> --config <configFile> --name <jobName>}.
+   *
+   * @return the command parts in order; the cluster name falls back to {@link
+   *     #randomClusterName()} when the parsed one is blank
+   */
+  @Override
+  public List<String> buildCommands() {
+    List<String> command = new ArrayList<>();
+    command.add("${SEATUNNEL_HOME}/bin/" + SHELL_NAME);
+    command.add("--master");
+    command.add(this.commandArgs.getMasterType().name());
+    command.add("--cluster");
+    command.add(
+        StringUtils.isNotBlank(this.commandArgs.getClusterName())
+            ? this.commandArgs.getClusterName()
+            : randomClusterName());
+    command.add("--config");
+    command.add(this.commandArgs.getConfigFile());
+    command.add("--name");
+    command.add(this.commandArgs.getJobName());
+    return command;
+  }
+
+  /**
+   * Generates a fallback cluster name.
+   *
+   * @return {@code "seaTunnel-"} followed by a random integer in the range [0, 1000000)
+   */
+  public String randomClusterName() {
+    Random random = new Random();
+    return namePrefix + "-" + random.nextInt(1000000);
+  }
+}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
index bce3a0d38..5f42fa6aa 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
@@ -24,16 +24,23 @@ object SeatunnelFlinkEnvConfiguration {
val LINKIS_FLINK_RUNMODE: CommonVars[String] =
CommonVars[String]("linkis.flink.run.mode", "run-mode")
+ val LINKIS_FLINK_DEPLOY_MODE: CommonVars[String] =
+ CommonVars[String]("linkis.flink.delpoy.mode", "deploy-mode")
+
val LINKIS_FLINK_CONFIG: CommonVars[String] =
CommonVars[String]("linkis.flink.config", "config")
val LINKIS_FLINK_VARIABLE: CommonVars[String] =
CommonVars[String]("linkis.flink.variable", "variable")
+ val LINKIS_FLINK_MASTER: CommonVars[String] =
+ CommonVars[String]("linkis.flink.master", "master")
+
val LINKIS_FLINK_CHECK: CommonVars[String] =
CommonVars[String]("linkis.flink.check", "check")
- val GET_LINKIS_FLINK_RUNMODE = "--" + LINKIS_FLINK_RUNMODE.getValue
+ val GET_LINKIS_FLINK_DEPLOY_MODE = "--" + LINKIS_FLINK_DEPLOY_MODE.getValue
val GET_LINKIS_FLINK_CONFIG = "--" + LINKIS_FLINK_CONFIG.getValue
val GET_LINKIS_FLINK_VARIABLE = "--" + LINKIS_FLINK_VARIABLE.getValue
val GET_LINKIS_FLINK_CHECK = "--" + LINKIS_FLINK_CHECK.getValue
+ val GET_LINKIS_FLINK_MASTER = "--" + LINKIS_FLINK_MASTER.getValue
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelZetaEnvConfiguration.scala
similarity index 52%
copy from
linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
copy to
linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelZetaEnvConfiguration.scala
index bce3a0d38..335f81d19 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelFlinkEnvConfiguration.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/config/SeatunnelZetaEnvConfiguration.scala
@@ -19,21 +19,23 @@ package org.apache.linkis.engineconnplugin.seatunnel.config
import org.apache.linkis.common.conf.CommonVars
-object SeatunnelFlinkEnvConfiguration {
+object SeatunnelZetaEnvConfiguration {
- val LINKIS_FLINK_RUNMODE: CommonVars[String] =
- CommonVars[String]("linkis.flink.run.mode", "run-mode")
+ val LINKIS_SEATUNNEL_MASTER: CommonVars[String] =
+ CommonVars[String]("linkis.seatunnel.master", "master")
- val LINKIS_FLINK_CONFIG: CommonVars[String] =
- CommonVars[String]("linkis.flink.config", "config")
+ val LINKIS_SEATUNNEL_VARIABLE: CommonVars[String] =
+ CommonVars[String]("linkis.seatunnel.variable", "variable")
- val LINKIS_FLINK_VARIABLE: CommonVars[String] =
- CommonVars[String]("linkis.flink.variable", "variable")
+ val LINKIS_SEATUNNEL_CONFIG: CommonVars[String] =
+ CommonVars[String]("linkis.seatunnel.config", "config")
- val LINKIS_FLINK_CHECK: CommonVars[String] =
CommonVars[String]("linkis.flink.check", "check")
+ val LINKIS_SEATUNNEL_CLUSTER_NAME: CommonVars[String] =
+ CommonVars[String]("linkis.seatunnel.cluster", "cluster")
+
+ val GET_LINKIS_SEATUNNEL_MASTER = "--" + LINKIS_SEATUNNEL_MASTER.getValue
+ val GET_LINKIS_SEATUNNEL_VARIABLE = "--" + LINKIS_SEATUNNEL_VARIABLE.getValue
+ val GET_LINKIS_SEATUNNEL_CONFIG = "--" + LINKIS_SEATUNNEL_CONFIG.getValue
+ val GET_LINKIS_SEATUNNEL_CLUSTER_NAME = "--" +
LINKIS_SEATUNNEL_CLUSTER_NAME.getValue
- val GET_LINKIS_FLINK_RUNMODE = "--" + LINKIS_FLINK_RUNMODE.getValue
- val GET_LINKIS_FLINK_CONFIG = "--" + LINKIS_FLINK_CONFIG.getValue
- val GET_LINKIS_FLINK_VARIABLE = "--" + LINKIS_FLINK_VARIABLE.getValue
- val GET_LINKIS_FLINK_CHECK = "--" + LINKIS_FLINK_CHECK.getValue
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
index 2a0109d12..b9adf97d9 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkOnceCodeExecutor.scala
@@ -25,18 +25,20 @@ import org.apache.linkis.engineconn.once.executor.{
OnceExecutorExecutionContext,
OperableOnceExecutor
}
-import
org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelFlinkClient
+import
org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelFlinkV2Client
import
org.apache.linkis.engineconnplugin.seatunnel.client.errorcode.SeatunnelErrorCodeSummary.EXEC_FLINK_CODE_ERROR
import
org.apache.linkis.engineconnplugin.seatunnel.client.exception.JobExecutionException
import
org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelEnvConfiguration
import
org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelFlinkEnvConfiguration.{
GET_LINKIS_FLINK_CHECK,
GET_LINKIS_FLINK_CONFIG,
- GET_LINKIS_FLINK_RUNMODE,
+ GET_LINKIS_FLINK_DEPLOY_MODE,
+ GET_LINKIS_FLINK_MASTER,
GET_LINKIS_FLINK_VARIABLE,
LINKIS_FLINK_CHECK,
LINKIS_FLINK_CONFIG,
- LINKIS_FLINK_RUNMODE,
+ LINKIS_FLINK_DEPLOY_MODE,
+ LINKIS_FLINK_MASTER,
LINKIS_FLINK_VARIABLE
}
import
org.apache.linkis.engineconnplugin.seatunnel.context.SeatunnelEngineConnContext
@@ -79,6 +81,7 @@ class SeatunnelFlinkOnceCodeExecutor(
val code: String = options(TaskConstant.CODE)
params = onceExecutorExecutionContext.getOnceExecutorContent.getJobContent
.asInstanceOf[util.Map[String, String]]
+ logger.info("flink doSubmit args:" + params)
future = Utils.defaultScheduler.submit(new Runnable {
override def run(): Unit = {
logger.info("Try to execute codes." + code)
@@ -106,21 +109,24 @@ class SeatunnelFlinkOnceCodeExecutor(
logger.info("Execute SeatunnelFlink Process")
var args: Array[String] = Array.empty
- val flinkRunMode = LINKIS_FLINK_RUNMODE.getValue
+ val flinkRunMode = LINKIS_FLINK_DEPLOY_MODE.getValue
if (params != null && params.containsKey(flinkRunMode)) {
val config = LINKIS_FLINK_CONFIG.getValue
val variable = LINKIS_FLINK_VARIABLE.getValue
val check = LINKIS_FLINK_CHECK.getValue
+ val master = LINKIS_FLINK_MASTER.getValue
args = Array(
- GET_LINKIS_FLINK_RUNMODE,
+ GET_LINKIS_FLINK_DEPLOY_MODE,
params.getOrDefault(flinkRunMode, "run"),
GET_LINKIS_FLINK_CHECK,
params.getOrDefault(check, "false"),
+ GET_LINKIS_FLINK_MASTER,
+ params.getOrDefault(master, "local"),
GET_LINKIS_FLINK_CONFIG,
generateExecFile(code)
)
-
+ logger.info("runCode args:" + args.mkString("Array(", ", ", ")"))
if (params.containsKey(variable)) {
val variableMap = GSON.fromJson(params.get(variable),
classOf[util.HashMap[String, String]])
variableMap.asScala.foreach(f => {
@@ -130,6 +136,7 @@ class SeatunnelFlinkOnceCodeExecutor(
} else {
args = localArray(code)
+ logger.info("runCode no args:" + args.mkString("Array(", ", ", ")"))
}
System.setProperty("SEATUNNEL_HOME",
System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue))
Files.createSymbolicLink(
@@ -137,7 +144,7 @@ class SeatunnelFlinkOnceCodeExecutor(
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
logger.info(s"Execute SeatunnelFlink Process end args:${args.mkString("
")}")
- LinkisSeatunnelFlinkClient.main(args)
+ LinkisSeatunnelFlinkV2Client.main(args)
}
override protected def waitToRunning(): Unit = {
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
index b0cf6ece7..d938db412 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelSparkOnceCodeExecutor.scala
@@ -24,7 +24,7 @@ import org.apache.linkis.engineconn.once.executor.{
OnceExecutorExecutionContext,
OperableOnceExecutor
}
-import
org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelSparkClient
+import
org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelSparkV2Client
import
org.apache.linkis.engineconnplugin.seatunnel.client.errorcode.SeatunnelErrorCodeSummary.EXEC_SPARK_CODE_ERROR
import
org.apache.linkis.engineconnplugin.seatunnel.client.exception.JobExecutionException
import
org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelEnvConfiguration
@@ -116,7 +116,7 @@ class SeatunnelSparkOnceCodeExecutor(
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
logger.info(s"Execute SeatunnelSpark Process end args:${args.mkString("
")}")
- LinkisSeatunnelSparkClient.main(args)
+ LinkisSeatunnelSparkV2Client.main(args)
}
override protected def waitToRunning(): Unit = {
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelZetaOnceCodeExecutor.scala
similarity index 78%
rename from
linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
rename to
linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelZetaOnceCodeExecutor.scala
index 784ec92a4..e4fd89198 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelFlinkSQLOnceCodeExecutor.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/executor/SeatunnelZetaOnceCodeExecutor.scala
@@ -25,31 +25,17 @@ import org.apache.linkis.engineconn.once.executor.{
OnceExecutorExecutionContext,
OperableOnceExecutor
}
-import
org.apache.linkis.engineconnplugin.seatunnel.client.LinkisSeatunnelFlinkSQLClient
-import
org.apache.linkis.engineconnplugin.seatunnel.client.errorcode.SeatunnelErrorCodeSummary.EXEC_FLINKSQL_CODE_ERROR
+import
org.apache.linkis.engineconnplugin.seatunnel.client.LinkSeatunnelZetaClient
+import
org.apache.linkis.engineconnplugin.seatunnel.client.errorcode.SeatunnelErrorCodeSummary.EXEC_SEATUNNEL_CODE_ERROR
import
org.apache.linkis.engineconnplugin.seatunnel.client.exception.JobExecutionException
import
org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelEnvConfiguration
-import
org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelFlinkEnvConfiguration.{
- GET_LINKIS_FLINK_CHECK,
- GET_LINKIS_FLINK_CONFIG,
- GET_LINKIS_FLINK_RUNMODE,
- GET_LINKIS_FLINK_VARIABLE,
- LINKIS_FLINK_CHECK,
- LINKIS_FLINK_CONFIG,
- LINKIS_FLINK_RUNMODE,
- LINKIS_FLINK_VARIABLE
-}
+import
org.apache.linkis.engineconnplugin.seatunnel.config.SeatunnelZetaEnvConfiguration._
import
org.apache.linkis.engineconnplugin.seatunnel.context.SeatunnelEngineConnContext
import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils.{
generateExecFile,
localArray
}
-import org.apache.linkis.manager.common.entity.resource.{
- CommonNodeResource,
- LoadInstanceResource,
- NodeResource
-}
-import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
+import org.apache.linkis.manager.common.entity.resource.{CommonNodeResource,
NodeResource}
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.engine.JobProgressInfo
@@ -62,7 +48,7 @@ import java.util.concurrent.{Future, TimeUnit}
import scala.collection.JavaConverters._
-class SeatunnelFlinkSQLOnceCodeExecutor(
+class SeatunnelZetaOnceCodeExecutor(
override val id: Long,
override protected val seatunnelEngineConnContext:
SeatunnelEngineConnContext
) extends SeatunnelOnceExecutor
@@ -87,7 +73,7 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
setResponse(
ErrorExecuteResponse(
"Run code failed!",
- new JobExecutionException(EXEC_FLINKSQL_CODE_ERROR.getErrorDesc)
+ new JobExecutionException(EXEC_SEATUNNEL_CODE_ERROR.getErrorDesc)
)
)
tryFailed()
@@ -103,28 +89,27 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
}
protected def runCode(code: String): Int = {
- logger.info("Execute SeatunnelFlink Process")
+ logger.info("Execute SeaTunnelZeta Process")
var args: Array[String] = Array.empty
- val flinkRunMode = LINKIS_FLINK_RUNMODE.getValue
- if (params != null && params.containsKey(flinkRunMode)) {
- val config = LINKIS_FLINK_CONFIG.getValue
- val variable = LINKIS_FLINK_VARIABLE.getValue
- val check = LINKIS_FLINK_CHECK.getValue
-
+ if (params != null) {
+ val config = LINKIS_SEATUNNEL_CONFIG.getValue
+ val variable = LINKIS_SEATUNNEL_VARIABLE.getValue
+ val masterKey = LINKIS_SEATUNNEL_MASTER.getValue
+ val clusterName = LINKIS_SEATUNNEL_CLUSTER_NAME.getValue
args = Array(
- GET_LINKIS_FLINK_RUNMODE,
- params.getOrDefault(flinkRunMode, "run"),
- GET_LINKIS_FLINK_CHECK,
- params.getOrDefault(check, "false"),
- GET_LINKIS_FLINK_CONFIG,
+ GET_LINKIS_SEATUNNEL_MASTER,
+ params.getOrDefault(masterKey, "cluster"),
+ GET_LINKIS_SEATUNNEL_CLUSTER_NAME,
+ params.getOrDefault(clusterName, "linkis_seatunnel_cluster"),
+ GET_LINKIS_SEATUNNEL_CONFIG,
generateExecFile(code)
)
if (params.containsKey(variable)) {
val variableMap = GSON.fromJson(params.get(variable),
classOf[util.HashMap[String, String]])
variableMap.asScala.foreach(f => {
- args ++ Array(GET_LINKIS_FLINK_VARIABLE, s"${f._1}=${f._2}")
+ // `++` returns a new array instead of appending in place; assign it back so the
+ // variable pairs are not silently dropped from the arguments passed to the client.
+ args = args ++ Array(GET_LINKIS_SEATUNNEL_VARIABLE, s"${f._1}=${f._2}")
})
}
@@ -136,8 +121,8 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
new File(System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) +
"/seatunnel").toPath,
new File(SeatunnelEnvConfiguration.SEATUNNEL_HOME.getValue).toPath
)
- logger.info(s"Execute SeatunnelFlinkSQL Process end args:${args.mkString("
")}")
- LinkisSeatunnelFlinkSQLClient.main(args)
+ logger.info(s"Execute SeatunnelZeta Process end args:${args.mkString("
")}")
+ LinkSeatunnelZetaClient.main(args)
}
override protected def waitToRunning(): Unit = {
@@ -146,7 +131,7 @@ class SeatunnelFlinkSQLOnceCodeExecutor(
new Runnable {
override def run(): Unit = {
if (!(future.isDone || future.isCancelled)) {
- logger.info("The SeatunnelFlinkSQL Process In Running")
+ logger.info("The Seatunnel Zeta Process In Running")
}
}
},
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelEngineConnFactory.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelEngineConnFactory.scala
index 66bcdd9db..a71944a51 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelEngineConnFactory.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelEngineConnFactory.scala
@@ -36,11 +36,12 @@ class SeatunnelEngineConnFactory extends
MultiExecutorEngineConnFactory with Log
override protected def getEngineConnType: EngineType = EngineType.SEATUNNEL
- private val executorFactoryArray = Array[ExecutorFactory](
- new SeatunnelSparkExecutorFactory,
- new SeatunnelFlinkSQLExecutorFactory,
- new SeatunnelFlinkExecutorFactory
- )
+ private val executorFactoryArray =
+ Array[ExecutorFactory](
+ new SeatunnelSparkExecutorFactory,
+ new SeatunnelFlinkExecutorFactory,
+ new SeatunnelZetaExecutorFactory
+ )
override protected def createEngineConnSession(
engineCreationContext: EngineCreationContext
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelFlinkSQLExecutorFactory.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelZetaExecutorFactory.scala
similarity index 91%
rename from
linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelFlinkSQLExecutorFactory.scala
rename to
linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelZetaExecutorFactory.scala
index e5258f3d0..82aa36d4d 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelFlinkSQLExecutorFactory.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/factory/SeatunnelZetaExecutorFactory.scala
@@ -22,14 +22,12 @@ import
org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.once.executor.OnceExecutor
import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorFactory
import
org.apache.linkis.engineconnplugin.seatunnel.context.SeatunnelEngineConnContext
-import
org.apache.linkis.engineconnplugin.seatunnel.executor.SeatunnelFlinkSQLOnceCodeExecutor
+import
org.apache.linkis.engineconnplugin.seatunnel.executor.SeatunnelZetaOnceCodeExecutor
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.RunType
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
-class SeatunnelFlinkSQLExecutorFactory extends OnceExecutorFactory {
-
- override protected def getRunType: RunType = RunType.SEATUNNEL_FLINK_SQL
+class SeatunnelZetaExecutorFactory extends OnceExecutorFactory {
override protected def newExecutor(
id: Int,
@@ -39,8 +37,9 @@ class SeatunnelFlinkSQLExecutorFactory extends
OnceExecutorFactory {
): OnceExecutor = {
engineConn.getEngineConnSession match {
case context: SeatunnelEngineConnContext =>
- new SeatunnelFlinkSQLOnceCodeExecutor(id, context)
+ new SeatunnelZetaOnceCodeExecutor(id, context)
}
}
+ override protected def getRunType: RunType = RunType.SEATUNNEL_ZETA
}
diff --git
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
index 8999e93a4..3296babac 100644
---
a/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
+++
b/linkis-engineconn-plugins/seatunnel/src/main/scala/org/apache/linkis/engineconnplugin/seatunnel/util/SeatunnelUtils.scala
@@ -24,6 +24,7 @@ import org.apache.commons.io.IOUtils
import org.apache.commons.logging.{Log, LogFactory}
import java.io.{BufferedReader, File, InputStreamReader, PrintWriter}
+import java.lang.ProcessBuilder.Redirect
object SeatunnelUtils {
val LOGGER: Log = LogFactory.getLog(SeatunnelUtils.getClass)
@@ -48,14 +49,14 @@ object SeatunnelUtils {
var bufferedReader: BufferedReader = null
try {
val processBuilder: ProcessBuilder = new
ProcessBuilder(generateRunCode(code): _*)
+ val file = new File(
+ System.getenv(ENGINE_CONN_LOCAL_PATH_PWD_KEY.getValue) +
"/logs/yarnApp.log"
+ )
+ processBuilder.redirectErrorStream(true)
+ processBuilder.redirectOutput(Redirect.appendTo(file))
+ LOGGER.info("process ready start.")
process = processBuilder.start()
- bufferedReader = new BufferedReader(new
InputStreamReader(process.getInputStream))
- var line: String = null
- while ({
- line = bufferedReader.readLine(); line != null
- }) {
- LOGGER.info(line)
- }
+ LOGGER.info(s"process start: $code")
val exitcode = process.waitFor()
exitcode
} finally {
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/pom.xml
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/pom.xml
index 7c4da6950..1449c4855 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/pom.xml
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/pom.xml
@@ -28,7 +28,7 @@
<properties>
<postgresql.version>42.3.8</postgresql.version>
- <clickhouse.version>0.3.2-patch11</clickhouse.version>
+ <clickhouse.version>0.4.6</clickhouse.version>
</properties>
<dependencies>
diff --git a/tool/dependencies/known-dependencies.txt
b/tool/dependencies/known-dependencies.txt
index 493b44629..705082a96 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -56,7 +56,7 @@ chill-java-0.7.6.jar
chill_2.12-0.7.6.jar
classgraph-4.1.7.jar
classmate-1.5.1.jar
-clickhouse-jdbc-0.3.2-patch11.jar
+clickhouse-jdbc-0.4.6.jar
commons-beanutils-1.9.4.jar
commons-cli-1.3.1.jar
commons-codec-1.10.jar
@@ -180,6 +180,7 @@ hadoop-yarn-api-3.3.4.jar
hadoop-yarn-client-3.3.4.jar
hadoop-yarn-common-3.3.4.jar
hadoop-yarn-registry-3.3.4.jar
+hazelcast-5.1.jar
hibernate-validator-5.1.2.Final.jar
hibernate-validator-6.1.7.Final.jar
hive-classification-3.1.3.jar
@@ -476,9 +477,21 @@ scala-reflect-2.12.17.jar
scala-xml_2.12-2.1.0.jar
scalap-2.12.17.jar
scopt_2.12-3.5.0.jar
-seatunnel-core-flink-2.1.2.jar
-seatunnel-core-flink-sql-2.1.2.jar
-seatunnel-core-spark-2.1.2.jar
+jackson-dataformat-properties-2.13.4.jar
+jcl-over-slf4j-1.7.30.jar
+jcommander-1.81.jar
+seatunnel-api-2.3.1.jar
+seatunnel-common-2.3.1.jar
+seatunnel-config-base-2.3.1.jar
+seatunnel-config-shade-2.3.1.jar
+seatunnel-core-starter-2.3.1.jar
+seatunnel-engine-common-2.3.1.jar
+seatunnel-engine-core-2.3.1.jar
+seatunnel-flink-13-starter-2.3.1.jar
+seatunnel-jackson-2.3.1-optional.jar
+seatunnel-plugin-discovery-2.3.1.jar
+seatunnel-spark-2-starter-2.3.1.jar
+seatunnel-starter-2.3.1.jar
security-0.191.jar
security-0.193.jar
servo-core-0.12.21.jar
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]