This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 12459d19 [Bug] [Seatunnel-web] When job execution initialization
fails, the job execution status remains unchanged. (#194)
12459d19 is described below
commit 12459d1998f7c6c8717bb9564a806b0f3d4a5de3
Author: Mohammad Arshad <[email protected]>
AuthorDate: Sun Aug 25 14:49:39 2024 +0530
[Bug] [Seatunnel-web] When job execution initialization fails, the job
execution status remains unchanged. (#194)
---
.../app/service/impl/JobExecutorServiceImpl.java | 59 +++++++++++++---------
.../server/common/SeatunnelErrorEnum.java | 1 +
.../app/test/JobExecutorControllerTest.java | 22 ++++++++
3 files changed, 59 insertions(+), 23 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
index fbcf0190..b8ca731e 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java
@@ -30,7 +30,6 @@ import
org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
@@ -39,6 +38,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.springframework.stereotype.Service;
@@ -52,6 +52,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.Date;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -73,9 +74,18 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId);
- Long jobInstanceId =
- executeJobBySeaTunnel(userId, configFile,
executeResource.getJobInstanceId());
- return Result.success(jobInstanceId);
+ try {
+ executeJobBySeaTunnel(userId, configFile,
executeResource.getJobInstanceId());
+ return Result.success(executeResource.getJobInstanceId());
+ } catch (RuntimeException e) {
+ Result<Long> failure =
+
Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage());
+ // Even though the job execution submission failed, we still need to return
+ // the jobInstanceId to the user, because the job instance has already been
+ // created in the database.
+ failure.setData(executeResource.getJobInstanceId());
+ return failure;
+ }
}
public String writeJobConfigIntoConfFile(String jobConfig, Long
jobDefineId) {
@@ -101,35 +111,38 @@ public class JobExecutorServiceImpl implements
IJobExecutorService {
return filePath;
}
- public Long executeJobBySeaTunnel(Integer userId, String filePath, Long
jobInstanceId) {
+ private void executeJobBySeaTunnel(Integer userId, String filePath, Long
jobInstanceId) {
Common.setDeployMode(DeployMode.CLIENT);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(jobInstanceId + "_job");
- SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ SeaTunnelClient seaTunnelClient;
+ ClientJobProxy clientJobProxy;
try {
+ seaTunnelClient = createSeaTunnelClient();
SeaTunnelConfig seaTunnelConfig = new
YamlSeaTunnelConfigBuilder().build();
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath,
jobConfig, seaTunnelConfig);
- final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ clientJobProxy = jobExecutionEnv.execute();
+ } catch (Throwable e) {
+ log.error("Job execution submission failed.", e);
JobInstance jobInstance =
jobInstanceDao.getJobInstance(jobInstanceId);
-
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
+ jobInstance.setJobStatus(JobStatus.FAILED.name());
+ jobInstance.setEndTime(new Date());
jobInstanceDao.update(jobInstance);
-
- CompletableFuture.runAsync(
- () -> {
- waitJobFinish(
- clientJobProxy,
- userId,
- jobInstanceId,
- Long.toString(clientJobProxy.getJobId()),
- seaTunnelClient);
- });
-
- } catch (ExecutionException | InterruptedException e) {
- ExceptionUtils.getMessage(e);
- throw new RuntimeException(e);
+ throw new RuntimeException(e.getMessage(), e);
}
- return jobInstanceId;
+ JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
+ jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
+ jobInstanceDao.update(jobInstance);
+ CompletableFuture.runAsync(
+ () -> {
+ waitJobFinish(
+ clientJobProxy,
+ userId,
+ jobInstanceId,
+ Long.toString(clientJobProxy.getJobId()),
+ seaTunnelClient);
+ });
}
public void waitJobFinish(
diff --git
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
index b81cec09..5f4b218e 100644
---
a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
+++
b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java
@@ -82,6 +82,7 @@ public enum SeatunnelErrorEnum {
"load job state from engine error",
"load job statue from engine [%s] error, error msg is [%s]"),
UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s]
version [%s]"),
+ JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid
error"),
/* datasource and virtual table */
diff --git
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
index 7c942e1d..965ad200 100644
---
a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
+++
b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java
@@ -243,6 +243,28 @@ public class JobExecutorControllerTest {
assertTrue(result.isSuccess());
}
+ @Test
+ public void executeJob_JobStatusUpdate_WhenSubmissionFailed() {
+ String jobName = "execJobStatus" + uniqueId;
+ JobCreateReq jobCreateReq =
JobUtils.populateMySQLJobCreateReqFromFile();
+ jobCreateReq.getJobConfig().setName(jobName);
+ jobCreateReq.getJobConfig().setDescription(jobName + " description");
+ String datasourceName = "execJobStatus_db_1" + uniqueId;
+ String mysqlDatasourceId =
+
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
+ for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
+ pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
+ }
+ Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
+ assertTrue(job.isSuccess());
+ Long jobVersionId = job.getData();
+ Result<Long> result =
jobExecutorControllerWrapper.jobExecutor(jobVersionId);
+ // Fails because of the wrong database credentials.
+ assertFalse(result.isSuccess());
+ // Even though the job failed, the job instance has still been created in the
+ // database.
+ assertTrue(result.getData() > 0);
+ }
+
@AfterAll
public static void tearDown() {
seaTunnelWebCluster.stop();