This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 8fbf1f60a Fix JobContext#jobId error (#2744)
8fbf1f60a is described below

commit 8fbf1f60a9469d28b7351a581dc7034a12ab1f73
Author: Eric <[email protected]>
AuthorDate: Fri Sep 16 17:11:48 2022 +0800

    Fix JobContext#jobId error (#2744)
---
 .../main/java/org/apache/seatunnel/api/common/JobContext.java  |  4 ++++
 .../apache/seatunnel/engine/client/job/JobConfigParser.java    | 10 ++++------
 .../seatunnel/engine/client/job/JobExecutionEnvironment.java   | 10 ++++++----
 .../apache/seatunnel/engine/client/JobConfigParserTest.java    |  9 +++++++--
 .../seatunnel/engine/client/LogicalDagGeneratorTest.java       |  4 +++-
 5 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
index 9e56de36a..854dd7ac7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
@@ -45,6 +45,10 @@ public final class JobContext implements Serializable {
         this.jobId = UUID.randomUUID().toString().replace("-", "");
     }
 
+    public JobContext(Long jobId) {
+        this.jobId = jobId + "";
+    }
+
     /**
      * Put table schema.
      *
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index b1e5aa1fb..51eba4d29 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.engine.client.job;
 
-import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -109,13 +108,11 @@ public class JobConfigParser {
     }
 
     private void jobConfigAnalyze(@NonNull Config envConfigs) {
-        JobContext jobContext = new JobContext();
         if (envConfigs.hasPath("job.mode")) {
-            jobContext.setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
+            jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class, "job.mode"));
         } else {
-            jobContext.setJobMode(JobMode.BATCH);
+            jobConfig.getJobContext().setJobMode(JobMode.BATCH);
         }
-        jobConfig.setJobContext(jobContext);
     }
 
     /**
@@ -293,7 +290,8 @@ public class JobConfigParser {
         }
 
         ImmutablePair<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>, Set<URL>>
-            sinkListImmutablePair = ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), jobConfig.getJobContext());
+            sinkListImmutablePair =
+            ConnectorInstanceLoader.loadSinkInstance(sinkConfigs.get(0), jobConfig.getJobContext());
         SinkAction sinkAction = createSinkAction(
             idGenerator.getNextId(),
             sinkListImmutablePair.getLeft().getPluginName(),
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 7176867a2..57b5a8e3e 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.client.job;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -35,8 +36,6 @@ import java.util.concurrent.ExecutionException;
 
 public class JobExecutionEnvironment {
 
-    private static String DEFAULT_JOB_NAME = "test_st_job";
-
     private JobConfig jobConfig;
 
     private int maxParallelism = 1;
@@ -51,12 +50,16 @@ public class JobExecutionEnvironment {
 
     private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
 
+    private final JobClient jobClient;
+
     public JobExecutionEnvironment(JobConfig jobConfig, String jobFilePath,
                                    SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
         this.jobConfig = jobConfig;
         this.jobFilePath = jobFilePath;
         this.idGenerator = new IdGenerator();
         this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
+        this.jobClient = new JobClient(seaTunnelHazelcastClient);
+        this.jobConfig.setJobContext(new JobContext(jobClient.getNewJobId()));
     }
 
     private JobConfigParser getJobConfigParser() {
@@ -76,9 +79,8 @@ public class JobExecutionEnvironment {
     }
 
     public ClientJobProxy execute() throws ExecutionException, InterruptedException {
-        JobClient jobClient = new JobClient(seaTunnelHazelcastClient);
         JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(
-            jobClient.getNewJobId(),
+            Long.valueOf(jobConfig.getJobContext().getJobId()),
             seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
             jobConfig,
             jarUrls);
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index a04932eeb..ba1f6d0d2 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.job.JobConfigParser;
@@ -42,7 +43,9 @@ public class JobConfigParserTest {
     public void testSimpleJobParse() {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/batch_fakesource_to_file.conf");
-        JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), new JobConfig());
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setJobContext(new JobContext());
+        JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), jobConfig);
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assert.assertEquals(1, actions.size());
@@ -59,7 +62,9 @@ public class JobConfigParserTest {
     public void testComplexJobParse() {
         Common.setDeployMode(DeployMode.CLIENT);
         String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
-        JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), new JobConfig());
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setJobContext(new JobContext());
+        JobConfigParser jobConfigParser = new JobConfigParser(filePath, new IdGenerator(), jobConfig);
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assert.assertEquals(1, actions.size());
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 1c0da5503..66928e0e9 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.client;
 
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.job.JobConfigParser;
@@ -45,10 +46,11 @@ public class LogicalDagGeneratorTest {
         String filePath = TestUtils.getResource("/batch_fakesource_to_file_complex.conf");
         JobConfig jobConfig = new JobConfig();
         jobConfig.setName("fake_to_file");
+        jobConfig.setJobContext(new JobContext());
 
         IdGenerator idGenerator = new IdGenerator();
         ImmutablePair<List<Action>, Set<URL>> immutablePair =
-            new JobConfigParser(filePath, idGenerator, new JobConfig()).parse();
+            new JobConfigParser(filePath, idGenerator, jobConfig).parse();
 
         LogicalDagGenerator logicalDagGenerator =
             new LogicalDagGenerator(immutablePair.getLeft(), jobConfig, idGenerator);

Reply via email to