Hisoka-X commented on code in PR #2982:
URL:
https://github.com/apache/incubator-seatunnel/pull/2982#discussion_r1003041191
##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
pluginsJarDependencies.forEach(url ->
FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(),
url));
- flinkEnvironment.registerPlugin(pluginsJarDependencies);
+ jarPaths.addAll(pluginsJarDependencies);
+ }
+
+ private Config registerPlugin(Config config, List<URL> jars) {
+ config = this.parseConfig(config, ConfigUtil.joinPath("env",
"pipeline", "jars"), jars);
+ return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline",
"classpaths"), jars);
+ }
+
+ private Config parseConfig(Config config, String path, List<URL> jars) {
Review Comment:
Should change name too, Can't know it want to do what if just read name.
##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -50,15 +57,29 @@ public class FlinkExecution implements TaskExecution {
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
+ private final List<URL> jarPaths;
public FlinkExecution(Config config) {
- this.flinkEnvironment = new
FlinkEnvironmentFactory(config).getEnvironment();
- JobContext jobContext = new JobContext();
- jobContext.setJobMode(flinkEnvironment.getJobMode());
+ try {
+ jarPaths = new ArrayList<>(Collections.singletonList(
+ new
File(Common.appLibDir().resolve(FlinkStarter.APP_JAR_NAME).toString()).toURI().toURL()));
+ } catch (MalformedURLException e) {
+ throw new SeaTunnelException("load flink starter error.", e);
+ }
registerPlugin();
- this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList(Constants.SOURCE));
- this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList(Constants.TRANSFORM));
- this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(flinkEnvironment, jobContext,
config.getConfigList(Constants.SINK));
+ JobContext jobContext = new JobContext();
+ jobContext.setJobMode(new
FlinkEnvironmentFactory(config).getJobMode(config.getConfig("env")));
+
+ this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(jarPaths, config.getConfigList(Constants.SOURCE),
jobContext);
+ this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(jarPaths, config.getConfigList(Constants.TRANSFORM),
jobContext);
+ this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(jarPaths,
config.getConfigList(Constants.SINK), jobContext);
+
+ this.flinkEnvironment = new
FlinkEnvironmentFactory(this.registerPlugin(config, jarPaths)).getEnvironment();
Review Comment:
Don't create `FlinkEnvironmentFactory` twice
##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -50,15 +57,29 @@ public class FlinkExecution implements TaskExecution {
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
+ private final List<URL> jarPaths;
public FlinkExecution(Config config) {
- this.flinkEnvironment = new
FlinkEnvironmentFactory(config).getEnvironment();
- JobContext jobContext = new JobContext();
- jobContext.setJobMode(flinkEnvironment.getJobMode());
+ try {
+ jarPaths = new ArrayList<>(Collections.singletonList(
Review Comment:
Use `Set` to aovid repeated jar path.
##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
pluginsJarDependencies.forEach(url ->
FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(),
url));
- flinkEnvironment.registerPlugin(pluginsJarDependencies);
+ jarPaths.addAll(pluginsJarDependencies);
+ }
+
+ private Config registerPlugin(Config config, List<URL> jars) {
+ config = this.parseConfig(config, ConfigUtil.joinPath("env",
"pipeline", "jars"), jars);
+ return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline",
"classpaths"), jars);
+ }
+
+ private Config parseConfig(Config config, String path, List<URL> jars) {
+
+ if (config.hasPath(path)) {
+ List<URL> paths =
Arrays.stream(config.getString(path).split(";")).map(uri -> {
Review Comment:
Use `Set` to aovid repeated jar path.
##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
pluginsJarDependencies.forEach(url ->
FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(),
url));
- flinkEnvironment.registerPlugin(pluginsJarDependencies);
+ jarPaths.addAll(pluginsJarDependencies);
+ }
+
+ private Config registerPlugin(Config config, List<URL> jars) {
Review Comment:
Maybe should named `injectJarsToConfig`
##########
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -90,6 +111,33 @@ private void registerPlugin() {
pluginsJarDependencies.forEach(url ->
FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(),
url));
- flinkEnvironment.registerPlugin(pluginsJarDependencies);
+ jarPaths.addAll(pluginsJarDependencies);
+ }
+
+ private Config registerPlugin(Config config, List<URL> jars) {
+ config = this.parseConfig(config, ConfigUtil.joinPath("env",
"pipeline", "jars"), jars);
+ return this.parseConfig(config, ConfigUtil.joinPath("env", "pipeline",
"classpaths"), jars);
Review Comment:
Change the doc to tell user the new way add jar. Also this function should
support spark later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]