This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dd2fe00761 [Feature][Zeta][REST-API]Add REST API To Submit Job (#5107)
dd2fe00761 is described below
commit dd2fe0076196f4bfb24b702889456791f788c500
Author: fang <[email protected]>
AuthorDate: Tue Aug 8 11:56:15 2023 +0800
[Feature][Zeta][REST-API]Add REST API To Submit Job (#5107)
---
docs/en/seatunnel-engine/rest-api.md | 58 +++++++++
.../core/starter/utils/ConfigBuilder.java | 6 +
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 72 +++++++++++
.../engine/client/job/JobExecutionEnvironment.java | 91 +-------------
.../engine/core/job/AbstractJobEnvironment.java} | 78 +++---------
.../core/parse/MultipleTableJobConfigParser.java | 16 +++
.../seatunnel/engine/server/NodeExtension.java | 2 +
.../server/job/JobImmutableInformationEnv.java | 80 ++++++++++++
.../seatunnel/engine/server/rest/RestConstant.java | 1 +
.../server/rest/RestHttpPostCommandProcessor.java | 135 +++++++++++++++++++++
.../seatunnel/engine/server/utils/RestUtil.java | 65 ++++++++++
11 files changed, 456 insertions(+), 148 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api.md
b/docs/en/seatunnel-engine/rest-api.md
index 2edec3496a..2f44421a3d 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -180,3 +180,61 @@ network:
------------------------------------------------------------------------------------------
+### Submit Job.
+
+<details>
+<summary><code>POST</code> <code><b>/hazelcast/rest/maps/submit-job</b></code> <code>(Returns jobId and jobName if the job is submitted successfully.)</code></summary>
+
+#### Parameters
+
+> | name                 | type     | data type | description                       |
+> |----------------------|----------|-----------|-----------------------------------|
+> | jobId                | optional | string    | job id                            |
+> | jobName              | optional | string    | job name                          |
+> | isStartWithSavePoint | optional | string    | if job is started with save point |
+
+#### Body
+
+```json
+{
+ "env": {
+ "job.mode": "batch"
+ },
+ "source": [
+ {
+ "plugin_name": "FakeSource",
+ "result_table_name": "fake",
+ "row.num": 100,
+ "schema": {
+ "fields": {
+ "name": "string",
+ "age": "int",
+ "card": "int"
+ }
+ }
+ }
+ ],
+ "transform": [
+ ],
+ "sink": [
+ {
+ "plugin_name": "Console",
+ "source_table_name": ["fake"]
+ }
+ ]
+}
+```
+
+#### Responses
+
+```json
+{
+ "jobId": 733584788375666689,
+ "jobName": "rest_api_test"
+}
+```
+
+</details>
+
+------------------------------------------------------------------------------------------
+
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
index ed66b550a0..ad063acac8 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
@@ -69,6 +69,12 @@ public class ConfigBuilder {
return config;
}
+    /**
+     * Builds a {@link Config} from an in-memory map rather than from a path on disk.
+     *
+     * @param objectMap configuration entries, handed to {@code ConfigFactory.parseMap}
+     * @return the parsed config after it has been passed through
+     *     {@code ConfigShadeUtils.decryptConfig}
+     */
+    public static Config of(@NonNull Map<String, Object> objectMap) {
+        log.info("Loading config file from objectMap");
+        return ConfigShadeUtils.decryptConfig(ConfigFactory.parseMap(objectMap));
+    }
+
public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull
Path filePath) {
log.info("With config adapter spi {}",
configAdapter.getClass().getName());
try {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 5f4e97ac8d..d38d1c732f 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -22,10 +22,12 @@ import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;
@@ -37,6 +39,7 @@ import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@@ -131,6 +134,75 @@ public class RestApiIT {
.statusCode(200);
}
+    /**
+     * Submits a FakeSource-to-Console batch job through the submit-job REST endpoint, then checks
+     * that the job is reported as RUNNING right after submission and reaches FINISHED within two
+     * minutes.
+     */
+    @Test
+    public void testSubmitJob() {
+        String requestBody =
+                "{\n"
+                        + " \"env\": {\n"
+                        + " \"job.mode\": \"batch\"\n"
+                        + " },\n"
+                        + " \"source\": [\n"
+                        + " {\n"
+                        + " \"plugin_name\": \"FakeSource\",\n"
+                        + " \"result_table_name\": \"fake\",\n"
+                        + " \"row.num\": 100,\n"
+                        + " \"schema\": {\n"
+                        + " \"fields\": {\n"
+                        + " \"name\": \"string\",\n"
+                        + " \"age\": \"int\",\n"
+                        + " \"card\": \"int\"\n"
+                        + " }\n"
+                        + " }\n"
+                        + " }\n"
+                        + " ],\n"
+                        + " \"transform\": [\n"
+                        + " ],\n"
+                        + " \"sink\": [\n"
+                        + " {\n"
+                        + " \"plugin_name\": \"Console\",\n"
+                        + " \"source_table_name\": [\"fake\"]\n"
+                        + " }\n"
+                        + " ]\n"
+                        + "}";
+        String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false";
+        int port = hazelcastInstance.getCluster().getLocalMember().getAddress().getPort();
+        String submitUrl = HOST + port + RestConstant.SUBMIT_JOB_URL + "?" + parameters;
+
+        // The jobId sent in the query string is not asserted: with isStartWithSavePoint=false
+        // the server generates its own id, so only jobName can be compared.
+        Response response = given().body(requestBody).post(submitUrl);
+        response.then().statusCode(200).body("jobName", equalTo("test"));
+
+        // Use the id returned by the server for all follow-up status checks.
+        long submittedJobId = Long.parseLong(response.getBody().jsonPath().getString("jobId"));
+        SeaTunnelServer seaTunnelServer =
+                (SeaTunnelServer)
+                        hazelcastInstance
+                                .node
+                                .getNodeExtension()
+                                .createExtensionServices()
+                                .get(Constant.SEATUNNEL_SERVICE_NAME);
+
+        Assertions.assertEquals(
+                JobStatus.RUNNING,
+                seaTunnelServer.getCoordinatorService().getJobStatus(submittedJobId));
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.FINISHED,
+                                        seaTunnelServer
+                                                .getCoordinatorService()
+                                                .getJobStatus(submittedJobId)));
+    }
+
@AfterAll
static void afterClass() {
if (hazelcastInstance != null) {
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index bf3169e4c8..3f870c6121 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -18,55 +18,19 @@
package org.apache.seatunnel.engine.client.job;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
-import org.apache.seatunnel.engine.common.utils.IdGenerator;
-import org.apache.seatunnel.engine.core.dag.actions.Action;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
-
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-public class JobExecutionEnvironment {
-
- private static final ILogger LOGGER =
Logger.getLogger(JobExecutionEnvironment.class);
-
- private final boolean isStartWithSavePoint;
-
- private final JobConfig jobConfig;
-
- private final List<Action> actions = new ArrayList<>();
-
- private final Set<URL> jarUrls = new HashSet<>();
- private final List<URL> commonPluginJars = new ArrayList<>();
+public class JobExecutionEnvironment extends AbstractJobEnvironment {
private final String jobFilePath;
- private final IdGenerator idGenerator;
-
private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
private final JobClient jobClient;
@@ -78,35 +42,12 @@ public class JobExecutionEnvironment {
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
boolean isStartWithSavePoint,
Long jobId) {
- this.jobConfig = jobConfig;
+ super(jobConfig, isStartWithSavePoint);
this.jobFilePath = jobFilePath;
- this.idGenerator = new IdGenerator();
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
- this.isStartWithSavePoint = isStartWithSavePoint;
this.jobConfig.setJobContext(
new JobContext(isStartWithSavePoint ? jobId :
jobClient.getNewJobId()));
- this.commonPluginJars.addAll(searchPluginJars());
- this.commonPluginJars.addAll(
- new ArrayList<>(
- Common.getThirdPartyJars(
- jobConfig
- .getEnvOptions()
-
.getOrDefault(EnvCommonOptions.JARS.key(), "")
- .toString())
- .stream()
- .map(Path::toUri)
- .map(
- uri -> {
- try {
- return uri.toURL();
- } catch (MalformedURLException e) {
- throw new
SeaTunnelEngineException(
- "the uri of jar
illegal:" + uri, e);
- }
- })
- .collect(Collectors.toList())));
- LOGGER.info("add common jar in plugins :" + commonPluginJars);
}
public JobExecutionEnvironment(
@@ -117,27 +58,12 @@ public class JobExecutionEnvironment {
}
/** Search all jars in SEATUNNEL_HOME/plugins */
- private Set<URL> searchPluginJars() {
- try {
- if (Files.exists(Common.pluginRootDir())) {
- return new
HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
- }
- } catch (IOException | SeaTunnelEngineException e) {
- LOGGER.warning(
- String.format("Can't search plugin jars in %s.",
Common.pluginRootDir()), e);
- }
- return Collections.emptySet();
- }
-
- private MultipleTableJobConfigParser getJobConfigParser() {
+ @Override
+ protected MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
jobFilePath, idGenerator, jobConfig, commonPluginJars,
isStartWithSavePoint);
}
- private LogicalDagGenerator getLogicalDagGenerator() {
- return new LogicalDagGenerator(actions, jobConfig, idGenerator);
- }
-
public ClientJobProxy execute() throws ExecutionException,
InterruptedException {
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
@@ -150,11 +76,4 @@ public class JobExecutionEnvironment {
return jobClient.createJobProxy(jobImmutableInformation);
}
-
- private LogicalDag getLogicalDag() {
- ImmutablePair<List<Action>, Set<URL>> immutablePair =
getJobConfigParser().parse();
- actions.addAll(immutablePair.getLeft());
- jarUrls.addAll(immutablePair.getRight());
- return getLogicalDagGenerator().generate();
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
similarity index 59%
copy from
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
copy to
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
index bf3169e4c8..3509903c08 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.client.job;
+package org.apache.seatunnel.engine.core.job;
-import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.FileUtils;
-import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -46,46 +43,27 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-public class JobExecutionEnvironment {
+public abstract class AbstractJobEnvironment {
+ protected static ILogger LOGGER = null;
- private static final ILogger LOGGER =
Logger.getLogger(JobExecutionEnvironment.class);
+ protected final boolean isStartWithSavePoint;
- private final boolean isStartWithSavePoint;
+ protected final List<Action> actions = new ArrayList<>();
+ protected final Set<URL> jarUrls = new HashSet<>();
- private final JobConfig jobConfig;
+ protected final JobConfig jobConfig;
- private final List<Action> actions = new ArrayList<>();
+ protected final IdGenerator idGenerator;
- private final Set<URL> jarUrls = new HashSet<>();
+ protected final List<URL> commonPluginJars = new ArrayList<>();
- private final List<URL> commonPluginJars = new ArrayList<>();
-
- private final String jobFilePath;
-
- private final IdGenerator idGenerator;
-
- private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
-
- private final JobClient jobClient;
-
- /** If the JobId is not empty, it is used to restore job from savePoint */
- public JobExecutionEnvironment(
- JobConfig jobConfig,
- String jobFilePath,
- SeaTunnelHazelcastClient seaTunnelHazelcastClient,
- boolean isStartWithSavePoint,
- Long jobId) {
+ public AbstractJobEnvironment(JobConfig jobConfig, boolean
isStartWithSavePoint) {
+ LOGGER = Logger.getLogger(getClass().getName());
this.jobConfig = jobConfig;
- this.jobFilePath = jobFilePath;
- this.idGenerator = new IdGenerator();
- this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
- this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.isStartWithSavePoint = isStartWithSavePoint;
- this.jobConfig.setJobContext(
- new JobContext(isStartWithSavePoint ? jobId :
jobClient.getNewJobId()));
+ this.idGenerator = new IdGenerator();
this.commonPluginJars.addAll(searchPluginJars());
this.commonPluginJars.addAll(
new ArrayList<>(
@@ -109,15 +87,7 @@ public class JobExecutionEnvironment {
LOGGER.info("add common jar in plugins :" + commonPluginJars);
}
- public JobExecutionEnvironment(
- JobConfig jobConfig,
- String jobFilePath,
- SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
- this(jobConfig, jobFilePath, seaTunnelHazelcastClient, false, null);
- }
-
- /** Search all jars in SEATUNNEL_HOME/plugins */
- private Set<URL> searchPluginJars() {
+ protected Set<URL> searchPluginJars() {
try {
if (Files.exists(Common.pluginRootDir())) {
return new
HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir()));
@@ -129,29 +99,13 @@ public class JobExecutionEnvironment {
return Collections.emptySet();
}
- private MultipleTableJobConfigParser getJobConfigParser() {
- return new MultipleTableJobConfigParser(
- jobFilePath, idGenerator, jobConfig, commonPluginJars,
isStartWithSavePoint);
- }
+ protected abstract MultipleTableJobConfigParser getJobConfigParser();
- private LogicalDagGenerator getLogicalDagGenerator() {
+ protected LogicalDagGenerator getLogicalDagGenerator() {
return new LogicalDagGenerator(actions, jobConfig, idGenerator);
}
- public ClientJobProxy execute() throws ExecutionException,
InterruptedException {
- JobImmutableInformation jobImmutableInformation =
- new JobImmutableInformation(
- Long.parseLong(jobConfig.getJobContext().getJobId()),
- jobConfig.getName(),
- isStartWithSavePoint,
-
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
- jobConfig,
- new ArrayList<>(jarUrls));
-
- return jobClient.createJobProxy(jobImmutableInformation);
- }
-
- private LogicalDag getLogicalDag() {
+ protected LogicalDag getLogicalDag() {
ImmutablePair<List<Action>, Set<URL>> immutablePair =
getJobConfigParser().parse();
actions.addAll(immutablePair.getLeft());
jarUrls.addAll(immutablePair.getRight());
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 86c0f3c94f..ee2505286f 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -130,6 +130,22 @@ public class MultipleTableJobConfigParser {
new JobConfigParser(idGenerator, commonPluginJars,
isStartWithSavePoint);
}
+    /**
+     * Creates a parser for a job definition that is already available as a {@link Config}
+     * object.
+     *
+     * @param seaTunnelJobConfig the job config; its {@code env} section is read for the env options
+     * @param idGenerator id generator, also passed on to the fallback {@link JobConfigParser}
+     * @param jobConfig engine job config associated with the job
+     * @param commonPluginJars plugin jar URLs, also passed on to the fallback parser
+     * @param isStartWithSavePoint savepoint flag, also passed on to the fallback parser
+     */
+    public MultipleTableJobConfigParser(
+            Config seaTunnelJobConfig,
+            IdGenerator idGenerator,
+            JobConfig jobConfig,
+            List<URL> commonPluginJars,
+            boolean isStartWithSavePoint) {
+        // State derived from the supplied Config.
+        this.seaTunnelJobConfig = seaTunnelJobConfig;
+        this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
+
+        // Collaborators shared with the fallback parser.
+        this.jobConfig = jobConfig;
+        this.idGenerator = idGenerator;
+        this.commonPluginJars = commonPluginJars;
+        this.isStartWithSavePoint = isStartWithSavePoint;
+        this.fallbackParser =
+                new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint);
+    }
+
public ImmutablePair<List<Action>, Set<URL>> parse() {
List<? extends Config> sourceConfigs =
TypesafeConfigUtils.getConfigList(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
index d4137955c8..37e00cffab 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
import org.apache.seatunnel.engine.server.rest.RestHttpGetCommandProcessor;
+import org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor;
import com.hazelcast.cluster.ClusterState;
import com.hazelcast.instance.impl.DefaultNodeExtension;
@@ -79,6 +80,7 @@ public class NodeExtension extends DefaultNodeExtension {
register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));
register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));
register(HTTP_GET, new RestHttpGetCommandProcessor(this));
+ register(HTTP_POST, new RestHttpPostCommandProcessor(this));
}
};
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java
new file mode 100644
index 0000000000..4dd72e31cb
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.job;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
+
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.util.ArrayList;
+
+/**
+ * Job environment that is built on a cluster node: new job ids come from the node's Hazelcast
+ * instance and the logical DAG is serialized with the node's serialization service.
+ */
+public class JobImmutableInformationEnv extends AbstractJobEnvironment {
+    private final Config seaTunnelJobConfig;
+
+    private final NodeEngineImpl nodeEngine;
+
+    private final Long jobId;
+
+    /**
+     * Creates the environment and fixes the job id in the job context.
+     *
+     * @param jobConfig engine job config; its job context is set by this constructor
+     * @param seaTunnelJobConfig the job definition used to create the config parser
+     * @param node the Hazelcast node this environment runs on
+     * @param isStartWithSavePoint when {@code true} the supplied {@code jobId} is used, otherwise
+     *     a new id is taken from the flake id generator
+     * @param jobId job id to reuse; only read when {@code isStartWithSavePoint} is {@code true}
+     * @throws IllegalArgumentException if {@code isStartWithSavePoint} is {@code true} and
+     *     {@code jobId} is {@code null}
+     */
+    public JobImmutableInformationEnv(
+            JobConfig jobConfig,
+            Config seaTunnelJobConfig,
+            Node node,
+            boolean isStartWithSavePoint,
+            Long jobId) {
+        super(jobConfig, isStartWithSavePoint);
+        // The conditional below unboxes jobId when the savepoint branch is taken; reject a
+        // missing id explicitly instead of failing with a bare NullPointerException.
+        if (isStartWithSavePoint && jobId == null) {
+            throw new IllegalArgumentException(
+                    "jobId is required when isStartWithSavePoint is true.");
+        }
+        this.seaTunnelJobConfig = seaTunnelJobConfig;
+        this.nodeEngine = node.getNodeEngine();
+        this.jobConfig.setJobContext(
+                new JobContext(
+                        isStartWithSavePoint
+                                ? jobId
+                                : nodeEngine
+                                        .getHazelcastInstance()
+                                        .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)
+                                        .newId()));
+        this.jobId = Long.valueOf(jobConfig.getJobContext().getJobId());
+    }
+
+    /** @return the job id that was stored in the job context by the constructor */
+    public Long getJobId() {
+        return jobId;
+    }
+
+    /** Creates a parser over the in-memory job config held by this environment. */
+    @Override
+    protected MultipleTableJobConfigParser getJobConfigParser() {
+        return new MultipleTableJobConfigParser(
+                seaTunnelJobConfig, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
+    }
+
+    /**
+     * Builds the logical DAG and wraps it, together with the job config and the collected jar
+     * URLs, into a {@link JobImmutableInformation}.
+     *
+     * @return the immutable job information for the job
+     */
+    public JobImmutableInformation build() {
+        return new JobImmutableInformation(
+                Long.parseLong(jobConfig.getJobContext().getJobId()),
+                jobConfig.getName(),
+                isStartWithSavePoint,
+                nodeEngine.getSerializationService().toData(getLogicalDag()),
+                jobConfig,
+                new ArrayList<>(jarUrls));
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 0a5d8437be..7776d592b8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -21,6 +21,7 @@ public class RestConstant {
public static final String RUNNING_JOBS_URL =
"/hazelcast/rest/maps/running-jobs";
public static final String RUNNING_JOB_URL =
"/hazelcast/rest/maps/running-job";
+ public static final String SUBMIT_JOB_URL =
"/hazelcast/rest/maps/submit-job";
public static final String SYSTEM_MONITORING_INFORMATION =
"/hazelcast/rest/maps/system-monitoring-information";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
new file mode 100644
index 0000000000..e0edd93203
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.CoordinatorService;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.job.JobImmutableInformationEnv;
+import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
+import org.apache.seatunnel.engine.server.utils.RestUtil;
+
+import com.hazelcast.internal.ascii.TextCommandService;
+import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
+import com.hazelcast.internal.ascii.rest.HttpPostCommand;
+import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.internal.serialization.Data;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400;
+import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL;
+
+/**
+ * Handles HTTP POST commands for the SeaTunnel REST API. Requests whose URI starts with
+ * {@code SUBMIT_JOB_URL} submit a job; every other request is delegated to the wrapped
+ * {@link Log4j2HttpPostCommandProcessor}.
+ */
+public class RestHttpPostCommandProcessor extends HttpCommandProcessor<HttpPostCommand> {
+    private final Log4j2HttpPostCommandProcessor original;
+
+    public RestHttpPostCommandProcessor(TextCommandService textCommandService) {
+        this(textCommandService, new Log4j2HttpPostCommandProcessor(textCommandService));
+    }
+
+    protected RestHttpPostCommandProcessor(
+            TextCommandService textCommandService,
+            Log4j2HttpPostCommandProcessor log4j2HttpPostCommandProcessor) {
+        super(
+                textCommandService,
+                textCommandService.getNode().getLogger(Log4j2HttpPostCommandProcessor.class));
+        this.original = log4j2HttpPostCommandProcessor;
+    }
+
+    /**
+     * Dispatches the command by URI and always sends a response afterwards.
+     * {@link IllegalArgumentException}s are answered with HTTP 400, any other failure is logged
+     * and answered with HTTP 500.
+     */
+    @Override
+    public void handle(HttpPostCommand httpPostCommand) {
+        String uri = httpPostCommand.getURI();
+        try {
+            if (uri.startsWith(SUBMIT_JOB_URL)) {
+                handleSubmitJob(httpPostCommand, uri);
+            } else {
+                original.handle(httpPostCommand);
+            }
+        } catch (IllegalArgumentException e) {
+            prepareResponse(SC_400, httpPostCommand, exceptionResponse(e));
+        } catch (Throwable e) {
+            logger.warning("An error occurred while handling request " + httpPostCommand, e);
+            prepareResponse(SC_500, httpPostCommand, exceptionResponse(e));
+        }
+
+        // NOTE(review): this also runs after delegating to `original`; confirm the delegate does
+        // not already send the response itself.
+        this.textCommandService.sendResponse(httpPostCommand);
+    }
+
+    /** Looks up the SeaTunnel service by name in the node extension's extension services. */
+    private SeaTunnelServer getSeaTunnelServer() {
+        Map<String, Object> extensionServices =
+                this.textCommandService.getNode().getNodeExtension().createExtensionServices();
+        return (SeaTunnelServer) extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+    }
+
+    /**
+     * Builds a job from the JSON request body and the query parameters, submits it to the
+     * coordinator service, waits for the submission future and answers with the job id and name.
+     *
+     * @param httpPostCommand the command carrying the JSON job definition as its body
+     * @param uri request URI, including the optional query string
+     * @throws IllegalArgumentException if the body is missing or not valid JSON, if the query
+     *     string is malformed, if {@code jobId} is not a number, or if
+     *     {@code isStartWithSavePoint} is true without a {@code jobId}
+     */
+    private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
+            throws IllegalArgumentException {
+        Map<String, String> requestParams = new HashMap<>();
+        RestUtil.buildRequestParams(requestParams, uri);
+
+        byte[] requestBody = httpPostCommand.getData();
+        if (requestBody == null || requestBody.length == 0) {
+            throw new IllegalArgumentException("Request body is empty.");
+        }
+        JsonNode requestBodyJsonNode;
+        try {
+            requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
+        } catch (IOException e) {
+            // Keep the parser failure as the cause so its details are not lost.
+            throw new IllegalArgumentException("Invalid JSON format in request body.", e);
+        }
+        Config config = RestUtil.buildConfig(requestBodyJsonNode);
+
+        boolean isStartWithSavePoint =
+                Boolean.parseBoolean(requestParams.get("isStartWithSavePoint"));
+        // jobId is optional: RestUtil leaves it null when the caller does not send it, and it is
+        // only needed when the job is restored from a savepoint.
+        String jobIdParam = requestParams.get("jobId");
+        if (isStartWithSavePoint && jobIdParam == null) {
+            throw new IllegalArgumentException(
+                    "jobId is required when isStartWithSavePoint is true.");
+        }
+        Long requestedJobId = jobIdParam == null ? null : Long.valueOf(jobIdParam);
+
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName(requestParams.get("jobName"));
+        JobImmutableInformationEnv jobImmutableInformationEnv =
+                new JobImmutableInformationEnv(
+                        jobConfig,
+                        config,
+                        textCommandService.getNode(),
+                        isStartWithSavePoint,
+                        requestedJobId);
+        JobImmutableInformation jobImmutableInformation = jobImmutableInformationEnv.build();
+        CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService();
+        Data data =
+                textCommandService
+                        .getNode()
+                        .nodeEngine
+                        .getSerializationService()
+                        .toData(jobImmutableInformation);
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+                coordinatorService.submitJob(
+                        Long.parseLong(jobConfig.getJobContext().getJobId()), data);
+        voidPassiveCompletableFuture.join();
+
+        Long jobId = jobImmutableInformationEnv.getJobId();
+        this.prepareResponse(
+                httpPostCommand,
+                new JsonObject().add("jobId", jobId).add("jobName", requestParams.get("jobName")));
+    }
+
+    /** Rejected commands are processed exactly like accepted ones. */
+    @Override
+    public void handleRejection(HttpPostCommand httpPostCommand) {
+        handle(httpPostCommand);
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
new file mode 100644
index 0000000000..d3761366d0
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.utils;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+
+import com.hazelcast.internal.util.StringUtil;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** Static helpers for the REST handlers: body parsing, query-string parsing, config building. */
+public class RestUtil {
+    private RestUtil() {}
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * Parses raw bytes as a JSON tree.
+     *
+     * @param byteData the bytes to parse
+     * @return the parsed JSON tree
+     * @throws IOException if the mapper fails to read the content
+     */
+    public static JsonNode convertByteToJsonNode(byte[] byteData) throws IOException {
+        return OBJECT_MAPPER.readTree(byteData);
+    }
+
+    /**
+     * Fills {@code requestParams} with the submit-job defaults ({@code jobId} = null,
+     * {@code jobName} = {@code Constants.LOGO}, {@code isStartWithSavePoint} = "false") and then
+     * overrides them with the {@code key=value} pairs found in the query string of {@code uri}.
+     * Values are stored as they appear in the URI.
+     *
+     * @param requestParams map that receives the parameters
+     * @param uri request URI, with or without a query string
+     * @throws IllegalArgumentException if a query-string entry has no {@code =} or an empty value
+     */
+    public static void buildRequestParams(Map<String, String> requestParams, String uri) {
+        requestParams.put("jobId", null);
+        requestParams.put("jobName", Constants.LOGO);
+        requestParams.put("isStartWithSavePoint", String.valueOf(false));
+        uri = StringUtil.stripTrailingSlash(uri);
+        int queryStart = uri.indexOf('?');
+        if (queryStart < 0) {
+            return;
+        }
+        for (String pair : uri.substring(queryStart + 1).split("&")) {
+            // Split on the first '=' only, so a value that itself contains '=' is kept whole
+            // instead of being silently truncated.
+            String[] keyValue = pair.split("=", 2);
+            if (keyValue.length < 2 || keyValue[1].isEmpty()) {
+                throw new IllegalArgumentException("Invalid Params format in Params.");
+            }
+            requestParams.put(keyValue[0], keyValue[1]);
+        }
+    }
+
+    /**
+     * Converts a JSON tree into a job {@link Config}.
+     *
+     * @param jsonNode the JSON job definition
+     * @return the config built by {@code ConfigBuilder.of} from the node's map form
+     */
+    public static Config buildConfig(JsonNode jsonNode) {
+        Map<String, Object> objectMap = JsonUtils.toMap(jsonNode);
+        return ConfigBuilder.of(objectMap);
+    }
+}