This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 77f61408b7 [Improve][Engine] Support custom job id for rest-api named submit-job (#7053)
77f61408b7 is described below
commit 77f61408b7e45d18979f6db410c64162514a218d
Author: dailai <[email protected]>
AuthorDate: Tue Jun 25 13:35:52 2024 +0800
[Improve][Engine] Support custom job id for rest-api named submit-job (#7053)
---
.../engine/e2e/ClusterSeaTunnelContainer.java | 189 ++++++++++-----------
.../server/rest/RestHttpPostCommandProcessor.java | 12 +-
.../server/rest/RestJobExecutionEnvironment.java | 3 +-
3 files changed, 100 insertions(+), 104 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
index f3b5922b3b..2967d4227f 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
@@ -66,6 +66,8 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer {
private static final Path hadoopJar =
Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");
+ private static final long CUSTOM_JOB_ID = 123456789;
+
@Override
@BeforeEach
public void startUp() throws Exception {
@@ -101,106 +103,24 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer {
}
@Test
- public void testSubmitJob() {
+ public void testSubmitJobWithCustomJobId() {
AtomicInteger i = new AtomicInteger();
-
Arrays.asList(server, secondServer)
.forEach(
- container -> {
- Response response =
- i.get() == 0
- ? submitJob(container, "BATCH", jobName, paramJobName)
- : submitJob(container, "BATCH", jobName, null);
- if (i.get() == 0) {
- response.then()
- .statusCode(200)
- .body("jobName", equalTo(paramJobName));
- } else {
- response.then().statusCode(200).body("jobName", equalTo(jobName));
- }
- String jobId = response.getBody().jsonPath().getString("jobId");
+ container ->
+ submitJobAndAssertResponse(
+ container,
+ i,
+ paramJobName + "&jobId=" + CUSTOM_JOB_ID,
+ true));
+ }
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () -> {
- given().get(
- http
- + container.getHost()
- + colon
- + container
- .getFirstMappedPort()
- + RestConstant
- .FINISHED_JOBS_INFO
- + "/FINISHED")
- .then()
- .statusCode(200)
- .body(
- "[" + i.get() + "].jobName",
- equalTo(
- i.get() == 0
- ? paramJobName
- : jobName))
- .body(
- "[" + i.get() + "].errorMsg",
- equalTo(null))
- .body(
- "[" + i.get() + "].jobDag.jobId",
- equalTo(Long.parseLong(jobId)))
- .body(
- "["
- + i.get()
- + "].metrics.SourceReceivedCount",
- equalTo("100"))
- .body(
- "["
- + i.get()
- + "].metrics.SinkWriteCount",
- equalTo("100"))
- .body(
- "[" + i.get() + "].jobStatus",
- equalTo("FINISHED"));
-
- // test for without status parameter.
- given().get(
- http
- + container.getHost()
- + colon
- + container
- .getFirstMappedPort()
- + RestConstant
- .FINISHED_JOBS_INFO)
- .then()
- .statusCode(200)
- .body(
- "[" + i.get() + "].jobName",
- equalTo(
- i.get() == 0
- ? paramJobName
- : jobName))
- .body(
- "[" + i.get() + "].errorMsg",
- equalTo(null))
- .body(
- "[" + i.get() + "].jobDag.jobId",
- equalTo(Long.parseLong(jobId)))
- .body(
- "["
- + i.get()
- + "].metrics.SourceReceivedCount",
- equalTo("100"))
- .body(
- "["
- + i.get()
- + "].metrics.SinkWriteCount",
- equalTo("100"))
- .body(
- "[" + i.get() + "].jobStatus",
- equalTo("FINISHED"));
- });
-
- i.getAndIncrement();
- });
+ @Test
+ public void testSubmitJobWithoutCustomJobId() {
+ AtomicInteger i = new AtomicInteger();
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> submitJobAndAssertResponse(container, i, paramJobName, false));
}
@Test
@@ -459,4 +379,81 @@ public class ClusterSeaTunnelContainer extends SeaTunnelContainer {
return server;
}
+
+ private void submitJobAndAssertResponse(
+ GenericContainer<? extends GenericContainer<?>> container,
+ AtomicInteger i,
+ String customParam,
+ boolean isCustomJobId) {
+ Response response = submitJobAndResponse(container, i, customParam);
+ String jobId = response.getBody().jsonPath().getString("jobId");
+ assertResponse(container, i, jobId, isCustomJobId);
+ i.getAndIncrement();
+ }
+
+ private Response submitJobAndResponse(
+ GenericContainer<? extends GenericContainer<?>> container,
+ AtomicInteger i,
+ String customParam) {
+ Response response =
+ i.get() == 0
+ ? submitJob(container, "BATCH", jobName, customParam)
+ : submitJob(container, "BATCH", jobName, null);
+ if (i.get() == 0) {
+ response.then().statusCode(200).body("jobName", equalTo(paramJobName));
+ } else {
+ response.then().statusCode(200).body("jobName", equalTo(jobName));
+ }
+ return response;
+ }
+
+ private void assertResponse(
+ GenericContainer<? extends GenericContainer<?>> container,
+ AtomicInteger i,
+ String jobId,
+ boolean isCustomJobId) {
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ assertWithStatusParameterOrNot(
+ container, i, jobId, isCustomJobId, true);
+
+ // test for without status parameter.
+ assertWithStatusParameterOrNot(
+ container, i, jobId, isCustomJobId, false);
+ });
+ }
+
+ private void assertWithStatusParameterOrNot(
+ GenericContainer<? extends GenericContainer<?>> container,
+ AtomicInteger i,
+ String jobId,
+ boolean isCustomJobId,
+ boolean isStatusWithSubmitJob) {
+ String baseRestUrl = getBaseRestUrl(container);
+ String restUrl = isStatusWithSubmitJob ? baseRestUrl + "/FINISHED" : baseRestUrl;
+ given().get(restUrl)
+ .then()
+ .statusCode(200)
+ .body("[" + i.get() + "].jobName", equalTo(i.get() == 0 ? paramJobName : jobName))
+ .body("[" + i.get() + "].errorMsg", equalTo(null))
+ .body(
+ "[" + i.get() + "].jobId",
+ equalTo(
+ i.get() == 0 && isCustomJobId
+ ? Long.toString(CUSTOM_JOB_ID)
+ : jobId))
+ .body("[" + i.get() + "].metrics.SourceReceivedCount", equalTo("100"))
+ .body("[" + i.get() + "].metrics.SinkWriteCount", equalTo("100"))
+ .body("[" + i.get() + "].jobStatus", equalTo("FINISHED"));
+ }
+
+ private String getBaseRestUrl(GenericContainer<? extends GenericContainer<?>> container) {
+ return http
+ + container.getHost()
+ + colon
+ + container.getFirstMappedPort()
+ + RestConstant.FINISHED_JOBS_INFO;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index 6b822520c3..e250fdf936 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -38,7 +38,7 @@ import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.RestUtil;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
@@ -119,6 +119,8 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC
boolean startWithSavePoint =
Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT));
+ String jobIdStr = requestParams.get(RestConstant.JOB_ID);
+ Long finalJobId = StringUtils.isNotBlank(jobIdStr) ? Long.parseLong(jobIdStr) : null;
SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
RestJobExecutionEnvironment restJobExecutionEnvironment =
new RestJobExecutionEnvironment(
@@ -127,9 +129,7 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC
config,
textCommandService.getNode(),
startWithSavePoint,
- startWithSavePoint
- ? Long.parseLong(requestParams.get(RestConstant.JOB_ID))
- : null);
+ finalJobId);
JobImmutableInformation jobImmutableInformation =
restJobExecutionEnvironment.build();
Long jobId = jobImmutableInformation.getJobId();
if (!seaTunnelServer.isMasterNode()) {
@@ -137,12 +137,10 @@ public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostC
NodeEngineUtil.sendOperationToMasterNode(
getNode().nodeEngine,
new SubmitJobOperation(
- jobImmutableInformation.getJobId(),
- getNode().nodeEngine.toData(jobImmutableInformation)))
+ jobId, getNode().nodeEngine.toData(jobImmutableInformation)))
.join();
} else {
-
submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
index a166d0a4d5..d13f1a49d8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
public class RestJobExecutionEnvironment extends AbstractJobEnvironment {
@@ -63,7 +64,7 @@ public class RestJobExecutionEnvironment extends AbstractJobEnvironment {
this.nodeEngine = node.getNodeEngine();
this.jobConfig.setJobContext(
new JobContext(
- isStartWithSavePoint
+ Objects.nonNull(jobId)
? jobId
: nodeEngine
.getHazelcastInstance()