This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 30af654ae [Feature][Core] Support adding third-party jars in env (#3576)
30af654ae is described below
commit 30af654aef3ab6bc56bbbce785258e869db9837a
Author: liugddx <[email protected]>
AuthorDate: Wed Nov 30 17:09:08 2022 +0800
[Feature][Core] Support adding third-party jars in env (#3576)
* support adding third-party jars in env
---
docs/en/connector-v2/EnvConf.md | 23 ++++++++++++++
docs/sidebars.js | 3 +-
.../apache/seatunnel/api/env/EnvCommonOptions.java | 6 ++++
.../apache/seatunnel/api/env/EnvOptionRule.java | 1 +
.../org/apache/seatunnel/common/config/Common.java | 14 +++++++++
.../starter/flink/execution/FlinkExecution.java | 12 ++++++--
.../seatunnel/core/starter/spark/SparkStarter.java | 36 ++++++++++++----------
.../connectors/seatunnel/jdbc/AbstractJdbcIT.java | 2 +-
.../resources/jdbc_gbase8a_source_to_assert.conf | 1 +
.../test/resources/jdbc_oracle_source_to_sink.conf | 1 +
.../resources/jdbc_starrocks_source_to_sink.conf | 1 +
.../engine/client/job/JobExecutionEnvironment.java | 14 +++++++++
12 files changed, 92 insertions(+), 22 deletions(-)
diff --git a/docs/en/connector-v2/EnvConf.md b/docs/en/connector-v2/EnvConf.md
new file mode 100644
index 000000000..d0616d039
--- /dev/null
+++ b/docs/en/connector-v2/EnvConf.md
@@ -0,0 +1,23 @@
+# EnvConf
+
+This document describes the `env` configuration block. `env` unifies the
environment settings of all engines.
+
+## job.name
+
+This parameter configures the task name.
+
+## jars
+
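+Third-party jars can be loaded via `jars`. Separate multiple entries with `;`; each entry must be an absolute URI ending in `.jar` (entries without the `.jar` suffix are ignored). For local files use three slashes (`file:///`), like
`jars="file:///path/to/jar1.jar;file:///path/to/jar2.jar"`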
+
+## job.mode
+
+You can configure whether the task is in batch mode or stream mode through
`job.mode`, like `job.mode = "BATCH"` or `job.mode = "STREAMING"`
+
+## checkpoint.interval
+
+Configures the interval (in milliseconds) between two consecutive checkpoints.
+
+## parallelism
+
+This parameter configures the parallelism of source and sink.
diff --git a/docs/sidebars.js b/docs/sidebars.js
index 470290d42..9951a4650 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -121,7 +121,8 @@ const sidebars = {
}
]
},
- "connector-v2/Error-Quick-Reference-Manual"
+ "connector-v2/Error-Quick-Reference-Manual",
+ "connector-v2/EnvConf"
]
},
{
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index 52a716add..c1a8db383 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -49,6 +49,12 @@ public class EnvCommonOptions {
.noDefaultValue()
.withDescription("The interval (in milliseconds) between two
consecutive checkpoints.");
+ public static final Option<String> JARS =
+ Options.key("jars")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("third-party packages can be loaded via `jars`");
+
public static final Option<Map<String, String>> CUSTOM_PARAMETERS =
Options.key("custom_parameters")
.mapType()
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index cbfd3d455..bc40247b3 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -26,6 +26,7 @@ public class EnvOptionRule {
.required(EnvCommonOptions.JOB_MODE)
.optional(EnvCommonOptions.JOB_NAME,
EnvCommonOptions.PARALLELISM,
+ EnvCommonOptions.JARS,
EnvCommonOptions.CHECKPOINT_INTERVAL,
EnvCommonOptions.CUSTOM_PARAMETERS)
.build();
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 871773732..02a145471 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -23,12 +23,15 @@ import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -160,6 +163,17 @@ public class Common {
}
}
+ /**
+ * Parses the {@code jars} option of the {@code env} block into jar paths.
+ *
+ * <p>The value is split on {@code ;}. Empty entries and entries that do not end
+ * with {@code .jar} are dropped silently. Every remaining entry is parsed with
+ * {@link URI#create(String)} and converted with {@link Paths#get(URI)}, so each
+ * entry is expected to be an absolute URI such as {@code file:///tmp/jars/a.jar}.
+ *
+ * @param paths the raw {@code jars} value; must not be {@code null} (an empty
+ *     string yields an empty set)
+ * @return the referenced jar paths; duplicates collapse and, because
+ *     {@link Collectors#toSet()} is used, no iteration order is guaranteed
+ * @throws IllegalArgumentException if an entry is not a syntactically valid URI,
+ *     or if {@link Paths#get(URI)} rejects it (per the JDK docs: e.g. a URI with
+ *     no scheme such as a plain {@code /tmp/a.jar}, or, on the default Unix file
+ *     system, a {@code file} URI with an authority such as {@code file://local/a.jar})
+ * @throws java.nio.file.FileSystemNotFoundException if an entry's URI scheme has
+ *     no installed file-system provider (per {@link Paths#get(URI)})
+ */
+ public static Set<Path> getThirdPartyJars(String paths) {
+
+ // NOTE(review): entries are not trimmed. "a.jar ;b.jar" drops "a.jar " silently
+ // (it fails the .jar filter), and "a.jar; b.jar" makes URI.create throw on the
+ // leading space. Consider trimming each entry before filtering.
+ return Arrays.stream(paths.split(";"))
+ .filter(s -> !"".equals(s))
+ // Only *.jar entries are kept; anything else is ignored without a warning.
+ .filter(it -> it.endsWith(".jar"))
+ // NOTE(review): toSet() gives no ordering guarantee; if classpath order
+ // matters, collect into a LinkedHashSet to preserve configuration order.
+ .map(path ->
Paths.get(URI.create(path))).collect(Collectors.toSet());
+ }
+
public static Path pluginTarball() {
return appRootDir().resolve("plugins.tar.gz");
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index a673199f9..82991a056 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
@@ -69,7 +70,7 @@ public class FlinkExecution implements TaskExecution {
} catch (MalformedURLException e) {
throw new SeaTunnelException("load flink starter error.", e);
}
- registerPlugin();
+ registerPlugin(config.getConfig("env"));
JobContext jobContext = new JobContext();
jobContext.setJobMode(FlinkEnvironmentFactory.getJobMode(config));
@@ -100,8 +101,13 @@ public class FlinkExecution implements TaskExecution {
}
}
- private void registerPlugin() {
- List<URL> jarDependencies =
Stream.concat(Common.getPluginsJarDependencies().stream(),
Common.getLibJars().stream())
+ private void registerPlugin(Config envConfig) {
+ List<Path> thirdPartyJars = new ArrayList<>();
+ if (envConfig.hasPath(EnvCommonOptions.JARS.key())) {
+ thirdPartyJars = new
ArrayList<>(Common.getThirdPartyJars(envConfig.getString(EnvCommonOptions.JARS.key())));
+ }
+ thirdPartyJars.addAll(Common.getPluginsJarDependencies());
+ List<URL> jarDependencies = Stream.concat(thirdPartyJars.stream(),
Common.getLibJars().stream())
.map(Path::toUri)
.map(uri -> {
try {
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 681de41d7..2f645fa1f 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.spark;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.Starter;
@@ -124,6 +125,7 @@ public class SparkStarter implements Starter {
this.jars.addAll(Common.getPluginsJarDependencies());
this.jars.addAll(Common.getLibJars());
this.jars.addAll(getConnectorJarDependencies());
+ this.jars.addAll(new
ArrayList<>(Common.getThirdPartyJars(sparkConf.getOrDefault(EnvCommonOptions.JARS.key(),
""))));
return buildFinal();
}
@@ -132,19 +134,19 @@ public class SparkStarter implements Starter {
*/
private void setSparkConf() throws FileNotFoundException {
commandArgs.getVariables()
- .stream()
- .filter(Objects::nonNull)
- .map(variable -> variable.split("=", 2))
- .filter(pair -> pair.length == 2)
- .forEach(pair -> System.setProperty(pair[0], pair[1]));
+ .stream()
+ .filter(Objects::nonNull)
+ .map(variable -> variable.split("=", 2))
+ .filter(pair -> pair.length == 2)
+ .forEach(pair -> System.setProperty(pair[0], pair[1]));
this.sparkConf = getSparkConf(commandArgs.getConfigFile());
String driverJavaOpts =
this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
String executorJavaOpts =
this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", "");
if (!commandArgs.getVariables().isEmpty()) {
String properties = commandArgs.getVariables()
- .stream()
- .map(v -> "-D" + v)
- .collect(Collectors.joining(" "));
+ .stream()
+ .map(v -> "-D" + v)
+ .collect(Collectors.joining(" "));
driverJavaOpts += " " + properties;
executorJavaOpts += " " + properties;
this.sparkConf.put("spark.driver.extraJavaOptions",
driverJavaOpts.trim());
@@ -161,13 +163,13 @@ public class SparkStarter implements Starter {
throw new FileNotFoundException("config file '" + file + "' does
not exists!");
}
Config appConfig = ConfigFactory.parseFile(file)
-
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
- .resolveWith(ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
return appConfig.getConfig("env")
- .entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().unwrapped().toString()));
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().unwrapped().toString()));
}
/**
@@ -235,8 +237,8 @@ public class SparkStarter implements Starter {
protected void appendPaths(List<String> commands, String option,
List<Path> paths) {
if (!paths.isEmpty()) {
String values = paths.stream()
- .map(Path::toString)
- .collect(Collectors.joining(","));
+ .map(Path::toString)
+ .collect(Collectors.joining(","));
appendOption(commands, option, values);
}
}
@@ -264,8 +266,8 @@ public class SparkStarter implements Starter {
return Arrays.stream(pluginTypes).flatMap((Function<PluginType,
Stream<PluginIdentifier>>) pluginType -> {
List<? extends Config> configList =
config.getConfigList(pluginType.getType());
return configList.stream()
- .map(pluginConfig -> PluginIdentifier.of("seatunnel",
pluginType.getType(),
- pluginConfig.getString("plugin_name")));
+ .map(pluginConfig -> PluginIdentifier.of("seatunnel",
pluginType.getType(),
+ pluginConfig.getString("plugin_name")));
}).collect(Collectors.toList());
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 99ec3b58e..602428e76 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -73,7 +73,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase
implements TestResour
@TestContainerExtension
private final ContainerExtendedFactory extendedFactory = container -> {
- Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd
/tmp/seatunnel/plugins/Jdbc/lib && curl -O " + jdbcCase.getDriverJar());
+ Container.ExecResult extraCommands = container.execInContainer("bash",
"-c", "mkdir -p /tmp/jars && cd /tmp/jars && curl -O " +
jdbcCase.getDriverJar());
Assertions.assertEquals(0, extraCommands.getExitCode());
};
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
index aff8f0096..4e65793f2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
@@ -22,6 +22,7 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
+ jars =
"file:///tmp/jars/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
index c67a4d2fe..1ba38aa04 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf
@@ -22,6 +22,7 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
+ jars = "file:///tmp/jars/ojdbc8-12.2.0.1.jar"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
index 54886d059..02ed19601 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
@@ -18,6 +18,7 @@
env {
execution.parallelism = 1
job.mode = "BATCH"
+ jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
}
source {
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 985b95cc8..94c80cabd 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
@@ -35,14 +36,17 @@ import com.hazelcast.logging.Logger;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.io.IOException;
+import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
public class JobExecutionEnvironment {
@@ -76,6 +80,16 @@ public class JobExecutionEnvironment {
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.jobConfig.setJobContext(new JobContext(jobClient.getNewJobId()));
this.commonPluginJars.addAll(searchPluginJars());
+ this.commonPluginJars.addAll(new
ArrayList<>(Common.getThirdPartyJars(jobConfig.getEnvOptions()
+ .getOrDefault(EnvCommonOptions.JARS.key(),
"").toString()).stream().map(Path::toUri)
+ .map(uri -> {
+ try {
+ return uri.toURL();
+ } catch (MalformedURLException e) {
+ throw new SeaTunnelEngineException("the uri of jar
illegal:" + uri, e);
+ }
+ })
+ .collect(Collectors.toList())));
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}