This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8a894ac063 [Feature][Zeta] Support SQL format in REST API (#9802)
8a894ac063 is described below
commit 8a894ac0631c17c956d351e13d80adaddb3a7708
Author: xiaochen <[email protected]>
AuthorDate: Mon Sep 1 09:32:17 2025 +0800
[Feature][Zeta] Support SQL format in REST API (#9802)
---
docs/en/seatunnel-engine/rest-api-v2.md | 64 +++++++--
docs/zh/seatunnel-engine/rest-api-v2.md | 56 +++++++-
.../seatunnel/config/sql/SqlConfigBuilder.java | 20 +++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 146 +++++++++++++++++++++
seatunnel-engine/seatunnel-engine-server/pom.xml | 5 +
.../seatunnel/engine/server/rest/ConfigFormat.java | 48 +++++++
.../seatunnel/engine/server/rest/RestConstant.java | 2 -
.../engine/server/rest/service/JobInfoService.java | 22 +++-
.../engine/server/rest/servlet/BaseServlet.java | 21 ++-
.../rest/servlet/SubmitJobByUploadFileServlet.java | 41 +++++-
.../server/rest/servlet/SubmitJobServlet.java | 9 +-
.../engine/server/rest/BaseServletTest.java | 70 +++++++++-
12 files changed, 456 insertions(+), 48 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md
b/docs/en/seatunnel-engine/rest-api-v2.md
index 00aa288b3d..5f62c23c95 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -400,16 +400,16 @@ When we can't get the job info, the response will be:
#### Parameters
-> | name | type | data type | description
|
->
|----------------------|----------|-----------|-----------------------------------|
-> | jobId | optional | string | job id
|
-> | jobName | optional | string | job name
|
-> | isStartWithSavePoint | optional | string | if job is started with save
point |
-> | format | optional | string | config format, support json
and hocon, default json |
+> | name | type | data type | description
|
+>
|----------------------|----------|-----------|----------------------------------------------------------|
+> | jobId | optional | string | job id
|
+> | jobName | optional | string | job name
|
+> | isStartWithSavePoint | optional | string | if job is started with save
point |
+> | format | optional | string | config format, support json,
hocon and sql, default json |
#### Body
-You can choose json or hocon to pass request body.
+You can choose json, hocon or sql to pass request body.
The json format example:
``` json
{
@@ -471,6 +471,46 @@ sink {
```
+The SQL format example:
+```sql
+/* config
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+*/
+
+CREATE TABLE fake_source (
+ id INT,
+ name STRING,
+ age INT
+) WITH (
+ 'connector' = 'FakeSource',
+ 'rows' = '[
+ { fields = [1, "Alice", 25], kind = INSERT },
+ { fields = [2, "Bob", 30], kind = INSERT }
+ ]',
+ 'schema' = '{
+ fields {
+ id = "int",
+ name = "string",
+ age = "int"
+ }
+ }',
+ 'type' = 'source'
+);
+
+CREATE TABLE console_sink (
+ id INT,
+ name STRING,
+ age INT
+) WITH (
+ 'connector' = 'Console',
+ 'type' = 'sink'
+);
+
+INSERT INTO console_sink SELECT * FROM fake_source;
+```
#### Responses
@@ -499,12 +539,18 @@ sink {
> | isStartWithSavePoint | optional | string | if job is started with save
> point |
#### Request Body
-The name of the uploaded file key is config_file, and the file suffix json is
parsed in json format. The conf or config file suffix is parsed in hocon format
+The name of the uploaded file key is config_file, and supports the following
formats:
+- `.json` files: parsed in JSON format
+- `.conf` or `.config` files: parsed in HOCON format
+- `.sql` files: parsed in SQL format, supports CREATE TABLE and INSERT INTO
syntax
curl Example :
-```
+```bash
+# Upload HOCON config file
curl --location 'http://127.0.0.1:8080/submit-job/upload' --form
'config_file=@"/temp/fake_to_console.conf"'
+# Upload SQL config file
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form
'config_file=@"/temp/job.sql"'
```
#### Responses
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 124fa5a99a..dccce2640b 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -386,11 +386,11 @@ seatunnel:
> | jobId | optional | string | job id
> |
> | jobName | optional | string | job name
> |
> | isStartWithSavePoint | optional | string | if job is started with save
> point |
-> | format | optional | string | 配置风格,支持json和hocon,默认 json
|
+> | format | optional | string | 配置风格,支持json、hocon 和 sql,默认
json |
#### 请求体
-你可以选择用json或者hocon的方式来传递请求体。
+你可以选择用json、hocon或者sql的方式来传递请求体。
Json请求示例:
```json
{
@@ -451,6 +451,48 @@ sink {
}
}
+```
+
+SQL请求示例:
+
+```sql
+/* config
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+}
+*/
+
+CREATE TABLE fake_source (
+ id INT,
+ name STRING,
+ age INT
+) WITH (
+ 'connector' = 'FakeSource',
+ 'rows' = '[
+ { fields = [1, "Alice", 25], kind = INSERT },
+ { fields = [2, "Bob", 30], kind = INSERT }
+ ]',
+ 'schema' = '{
+ fields {
+ id = "int",
+ name = "string",
+ age = "int"
+ }
+ }',
+ 'type' = 'source'
+);
+
+CREATE TABLE console_sink (
+ id INT,
+ name STRING,
+ age INT
+) WITH (
+ 'connector' = 'Console',
+ 'type' = 'sink'
+);
+
+INSERT INTO console_sink SELECT * FROM fake_source;
```
#### 响应
@@ -478,13 +520,19 @@ sink {
> | isStartWithSavePoint | optional | string | if job is started with save
> point |
#### 请求体
-上传文件key的名称是config_file,文件后缀json的按照json格式来解析,conf或config文件后缀按照hocon格式解析
+上传文件key的名称是config_file,支持以下格式:
+- `.json` 文件:按照 JSON 格式解析
+- `.conf` 或 `.config` 文件:按照 HOCON 格式解析
+- `.sql` 文件:按照 SQL 格式解析,支持 CREATE TABLE 和 INSERT INTO 语法
curl Example
-```
+```bash
+# 上传 HOCON 配置文件
curl --location 'http://127.0.0.1:8080/submit-job/upload' --form
'config_file=@"/temp/fake_to_console.conf"'
+# 上传 SQL 配置文件
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form
'config_file=@"/temp/job.sql"'
```
#### 响应
diff --git
a/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
b/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
index 2e5fc7777a..e526c39e3b 100644
---
a/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
+++
b/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
@@ -49,6 +49,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -81,6 +82,25 @@ public class SqlConfigBuilder {
public static Config of(@NonNull Path sqlFilePath) {
try {
List<String> lines = Files.readAllLines(sqlFilePath);
+ return of(lines);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to parse job config file: " +
sqlFilePath, e);
+ }
+ }
+
+ public static Config of(@NonNull String sqlContent) {
+ try {
+ List<String> lines = new ArrayList<>();
+ String[] lineArray = sqlContent.split("\\r?\\n");
+ Collections.addAll(lines, lineArray);
+ return of(lines);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to parse job config: ", e);
+ }
+ }
+
+ private static Config of(@NonNull List<String> lines) {
+ try {
Map<String, BaseConfig> sqlTables = new LinkedHashMap<>();
SeaTunnelConfig seaTunnelConfig = new SeaTunnelConfig();
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index b03163c769..8c4c606d48 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -1022,6 +1022,152 @@ public class RestApiIT {
});
}
+ @Test
+ public void testSubmitJobWithSqlFormat() {
+ String sqlConfig =
+ "/* config\n"
+ + "env {\n"
+ + " parallelism = 1\n"
+ + " job.mode = \"BATCH\"\n"
+ + "}\n"
+ + "*/\n"
+ + "\n"
+ + "CREATE TABLE test_source (\n"
+ + " id INT,\n"
+ + " name STRING,\n"
+ + " c_time TIMESTAMP\n"
+ + ") WITH (\n"
+ + " 'connector' = 'FakeSource',\n"
+ + " 'schema' = '{ \n"
+ + " fields { \n"
+ + " id = \"int\", \n"
+ + " name = \"string\",\n"
+ + " c_time = \"timestamp\"\n"
+ + " } \n"
+ + " }',\n"
+ + " 'rows' = '[ \n"
+ + " { fields = [1, \"test\", null], kind = INSERT
}\n"
+ + " ]',\n"
+ + " 'type' = 'source'\n"
+ + ");\n"
+ + "\n"
+ + "CREATE TABLE test_sink (\n"
+ + " id INT,\n"
+ + " name STRING,\n"
+ + " c_time TIMESTAMP\n"
+ + ") WITH (\n"
+ + " 'connector' = 'Console',\n"
+ + " 'type' = 'sink'\n"
+ + ");\n"
+ + "\n"
+ + "INSERT INTO test_sink SELECT * FROM test_source;";
+
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ ports.forEach(
+ (key, value) -> {
+ given().body(sqlConfig)
+ .queryParam("format", "sql")
+ .queryParam("jobName",
"test-sql-job")
+ .post(HOST + key +
CONTEXT_PATH + "/submit-job")
+ .then()
+ .statusCode(200)
+ .body("jobId", notNullValue())
+ .body("jobName",
equalTo("test-sql-job"));
+ });
+ });
+ }
+
+ @Test
+ public void testSubmitJobWithJsonFormat() {
+ String jsonConfig =
+ "{\n"
+ + " \"env\": {\n"
+ + " \"parallelism\": 1,\n"
+ + " \"job.mode\": \"BATCH\"\n"
+ + " },\n"
+ + " \"source\": [\n"
+ + " {\n"
+ + " \"plugin_name\": \"FakeSource\",\n"
+ + " \"plugin_output\": \"fake\",\n"
+ + " \"row.num\": 2,\n"
+ + " \"schema\": {\n"
+ + " \"fields\": {\n"
+ + " \"name\": \"string\",\n"
+ + " \"age\": \"int\"\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " ],\n"
+ + " \"sink\": [\n"
+ + " {\n"
+ + " \"plugin_name\": \"Console\",\n"
+ + " \"plugin_input\": [\"fake\"]\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ ports.forEach(
+ (key, value) -> {
+ given().body(jsonConfig)
+ .queryParam("jobName",
"test-json-job")
+ .post(HOST + key +
CONTEXT_PATH + "/submit-job")
+ .then()
+ .statusCode(200)
+ .body("jobId", notNullValue())
+ .body("jobName",
equalTo("test-json-job"));
+ });
+ });
+ }
+
+ @Test
+ public void testSubmitJobWithHoconFormat() {
+ String hoconConfig =
+ "env {\n"
+ + " parallelism = 1\n"
+ + " job.mode = \"BATCH\"\n"
+ + "}\n"
+ + "\n"
+ + "source {\n"
+ + " FakeSource {\n"
+ + " plugin_output = \"fake\"\n"
+ + " row.num = 2\n"
+ + " schema = {\n"
+ + " fields {\n"
+ + " name = \"string\"\n"
+ + " age = \"int\"\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + "}\n"
+ + "\n"
+ + "sink {\n"
+ + " Console {\n"
+ + " plugin_input = \"fake\"\n"
+ + " }\n"
+ + "}";
+
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ ports.forEach(
+ (key, value) -> {
+ given().body(hoconConfig)
+ .queryParam("format", "hocon")
+ .queryParam("jobName",
"test-hocon-job")
+ .post(HOST + key +
CONTEXT_PATH + "/submit-job")
+ .then()
+ .statusCode(200)
+ .body("jobId", notNullValue())
+ .body("jobName",
equalTo("test-hocon-job"));
+ });
+ });
+ }
+
@AfterEach
void afterClass() {
if (engineClient != null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml
b/seatunnel-engine/seatunnel-engine-server/pom.xml
index 7e36f2c98e..d9e1c860c5 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -66,6 +66,11 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-config-sql</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/ConfigFormat.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/ConfigFormat.java
new file mode 100644
index 0000000000..e830defea4
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/ConfigFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest;
+
+public enum ConfigFormat {
+ JSON("json"),
+ HOCON("hocon"),
+ SQL("sql");
+
+ private final String value;
+
+ ConfigFormat(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public static ConfigFormat fromString(String value) {
+ if (value == null) {
+ return JSON;
+ }
+
+ for (ConfigFormat format : ConfigFormat.values()) {
+ if (format.value.equalsIgnoreCase(value)) {
+ return format;
+ }
+ }
+
+ return JSON;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 2e756308e1..f3e28e1329 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -49,8 +49,6 @@ public class RestConstant {
public static final String METRICS = "metrics";
- public static final String HOCON = "hocon";
-
public static final String TABLE_SOURCE_RECEIVED_COUNT =
"TableSourceReceivedCount";
public static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
public static final String TABLE_SOURCE_RECEIVED_QPS =
"TableSourceReceivedQPS";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
index 22d3138aee..977aa14e19 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
@@ -22,12 +22,14 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.config.sql.SqlConfigBuilder;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
+import org.apache.seatunnel.engine.server.rest.ConfigFormat;
import org.apache.seatunnel.engine.server.rest.RestConstant;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.RestUtil;
@@ -36,6 +38,7 @@ import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
import java.nio.charset.StandardCharsets;
@@ -45,8 +48,8 @@ import java.util.List;
import java.util.Map;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.CONFIG_FORMAT;
-import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON;
+@Slf4j
public class JobInfoService extends BaseService {
public JobInfoService(NodeEngineImpl nodeEngine) {
@@ -163,11 +166,18 @@ public class JobInfoService extends BaseService {
throw new IllegalArgumentException("Please provide jobId when
start with save point.");
}
Config config;
- if (HOCON.equalsIgnoreCase(requestParams.get(CONFIG_FORMAT))) {
- String requestBodyStr = new String(requestBody,
StandardCharsets.UTF_8);
- config = ConfigFactory.parseString(requestBodyStr);
- } else {
- config = RestUtil.buildConfig(requestHandle(requestBody), false);
+ ConfigFormat configFormat =
ConfigFormat.fromString(requestParams.get(CONFIG_FORMAT));
+ switch (configFormat) {
+ case HOCON:
+ config = ConfigFactory.parseString(new String(requestBody,
StandardCharsets.UTF_8));
+ break;
+ case SQL:
+ config = SqlConfigBuilder.of(new String(requestBody,
StandardCharsets.UTF_8));
+ break;
+ case JSON:
+ default:
+ config = RestUtil.buildConfig(requestHandle(requestBody),
false);
+ break;
}
SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false);
return submitJobInternal(config, requestParams, seaTunnelServer,
nodeEngine.getNode());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
index b373211f28..b8fdcb12a4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.rest.servlet;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.rest.ConfigFormat;
import com.google.gson.Gson;
import com.hazelcast.internal.json.JsonArray;
@@ -110,13 +111,17 @@ public class BaseServlet extends HttpServlet {
return seaTunnelServer;
}
- protected byte[] requestBody(HttpServletRequest req) throws IOException {
+ protected byte[] requestBody(HttpServletRequest req, ConfigFormat
configFormat)
+ throws IOException {
StringBuilder stringBuilder = new StringBuilder();
String line;
try (BufferedReader reader = req.getReader()) {
while ((line = reader.readLine()) != null) {
stringBuilder.append(line);
+ if (ConfigFormat.JSON != configFormat) {
+ stringBuilder.append("\n");
+ }
}
}
@@ -124,18 +129,8 @@ public class BaseServlet extends HttpServlet {
return requestBody.getBytes(StandardCharsets.UTF_8);
}
- public byte[] requestHoconBody(HttpServletRequest req) throws IOException {
- StringBuilder stringBuilder = new StringBuilder();
- String line;
-
- try (BufferedReader reader = req.getReader()) {
- while ((line = reader.readLine()) != null) {
- stringBuilder.append(line).append("\n");
- }
- }
-
- String requestBody = stringBuilder.toString();
- return requestBody.getBytes(StandardCharsets.UTF_8);
+ protected byte[] requestBody(HttpServletRequest req) throws IOException {
+ return requestBody(req, ConfigFormat.JSON);
}
protected Map<String, String> getParameterMap(HttpServletRequest req) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
index 2db376da06..3818ae94fc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
@@ -22,11 +22,14 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
+import org.apache.seatunnel.config.sql.SqlConfigBuilder;
+import org.apache.seatunnel.engine.server.rest.ConfigFormat;
import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
import org.apache.commons.io.IOUtils;
import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -36,6 +39,7 @@ import javax.servlet.http.Part;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+@Slf4j
public class SubmitJobByUploadFileServlet extends BaseServlet {
private final JobInfoService jobInfoService;
@@ -52,13 +56,38 @@ public class SubmitJobByUploadFileServlet extends
BaseServlet {
String submittedFileName = filePart.getSubmittedFileName();
String content = IOUtils.toString(filePart.getInputStream(),
StandardCharsets.UTF_8);
Config config;
- if (submittedFileName.endsWith(".json")) {
- config =
- ConfigFactory.parseString(
- content,
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
- } else {
- config = ConfigFactory.parseString(content);
+
+ log.info("Processing uploaded config file: {}", submittedFileName);
+ ConfigFormat configFormat = detectConfigFormat(submittedFileName);
+ switch (configFormat) {
+ case JSON:
+ config =
+ ConfigFactory.parseString(
+ content,
+
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
+ break;
+ case SQL:
+ config = SqlConfigBuilder.of(content);
+ break;
+ case HOCON:
+ default:
+ config = ConfigFactory.parseString(content);
+ break;
}
writeJson(resp, jobInfoService.submitJob(getParameterMap(req),
config));
}
+
+ private ConfigFormat detectConfigFormat(String fileName) {
+ if (fileName == null) {
+ return ConfigFormat.JSON;
+ }
+
+ if (fileName.endsWith(".json")) {
+ return ConfigFormat.JSON;
+ } else if (fileName.endsWith(".sql")) {
+ return ConfigFormat.SQL;
+ } else {
+ return ConfigFormat.HOCON;
+ }
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
index 31b9d60721..fa00693cfb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.rest.servlet;
+import org.apache.seatunnel.engine.server.rest.ConfigFormat;
import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -28,7 +29,6 @@ import java.io.IOException;
import java.util.Map;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.CONFIG_FORMAT;
-import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON;
public class SubmitJobServlet extends BaseServlet {
private final JobInfoService jobInfoService;
@@ -42,10 +42,7 @@ public class SubmitJobServlet extends BaseServlet {
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws IOException {
Map<String, String> requestParams = getParameterMap(req);
- if (HOCON.equalsIgnoreCase(requestParams.get(CONFIG_FORMAT))) {
- writeJson(resp, jobInfoService.submitJob(requestParams,
requestHoconBody(req)));
- } else {
- writeJson(resp, jobInfoService.submitJob(requestParams,
requestBody(req)));
- }
+ ConfigFormat configFormat =
ConfigFormat.fromString(requestParams.get(CONFIG_FORMAT));
+ writeJson(resp, jobInfoService.submitJob(requestParams,
requestBody(req, configFormat)));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/BaseServletTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/BaseServletTest.java
index 3fec682dba..8cbf7bc7f3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/BaseServletTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/BaseServletTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.rest;
+import org.apache.seatunnel.config.sql.SqlConfigBuilder;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
@@ -29,7 +30,7 @@ import
org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.TestUtils;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.hazelcast.config.Config;
@@ -45,7 +46,7 @@ class BaseServletTest extends AbstractSeaTunnelServerTest {
private static final Long JOB_1 = System.currentTimeMillis() + 1L;
- @BeforeEach
+ @BeforeAll
void setUp() {
String name = this.getClass().getName();
Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
@@ -71,6 +72,71 @@ class BaseServletTest extends AbstractSeaTunnelServerTest {
testLogRestApiResponse("JSON");
}
+ @Test
+ void testSqlConfigParsing() throws Exception {
+ String sqlContent =
+ "/* config\n"
+ + "env {\n"
+ + " parallelism = 1\n"
+ + " job.mode = \"BATCH\"\n"
+ + "}\n"
+ + "*/\n"
+ + "\n"
+ + "CREATE TABLE test_source (\n"
+ + " id INT,\n"
+ + " name STRING\n"
+ + ") WITH (\n"
+ + " 'connector' = 'FakeSource',\n"
+ + " 'rows' = '[{ fields = [1, \"test\"], kind =
INSERT }]',\n"
+ + " 'schema' = '{ fields { id = \"int\", name =
\"string\" } }',\n"
+ + " 'type' = 'source'\n"
+ + ");\n"
+ + "\n"
+ + "CREATE TABLE test_sink (\n"
+ + " id INT,\n"
+ + " name STRING\n"
+ + ") WITH (\n"
+ + " 'connector' = 'Console',\n"
+ + " 'type' = 'sink'\n"
+ + ");\n"
+ + "\n"
+ + "INSERT INTO test_sink SELECT * FROM test_source;";
+
+ org.apache.seatunnel.shade.com.typesafe.config.Config config =
+ SqlConfigBuilder.of(sqlContent);
+
+ Assertions.assertNotNull(config);
+ Assertions.assertTrue(config.hasPath("source"));
+ Assertions.assertTrue(config.hasPath("transform"));
+ Assertions.assertTrue(config.hasPath("sink"));
+
+ // SQL with INSERT INTO ... SELECT FROM ... will create a transform
step
+ Assertions.assertTrue(
+ config.hasPath("transform"),
+ "Transform should be created for INSERT INTO ... SELECT FROM
... statement");
+
+ // Verify source configuration
+ org.apache.seatunnel.shade.com.typesafe.config.Config sourceConfig =
+ config.getConfigList("source").get(0);
+ Assertions.assertEquals("FakeSource",
sourceConfig.getString("plugin_name"));
+ Assertions.assertEquals("test_source",
sourceConfig.getString("plugin_output"));
+
+ // Verify transform configuration (created by INSERT statement)
+ org.apache.seatunnel.shade.com.typesafe.config.Config transformConfig =
+ config.getConfigList("transform").get(0);
+ Assertions.assertEquals("test_source",
transformConfig.getString("plugin_input"));
+ Assertions.assertTrue(
+
transformConfig.getString("plugin_output").startsWith("test_source__temp"));
+ Assertions.assertEquals("SELECT * FROM test_source",
transformConfig.getString("query"));
+
+ // Verify sink configuration
+ org.apache.seatunnel.shade.com.typesafe.config.Config sinkConfig =
+ config.getConfigList("sink").get(0);
+ Assertions.assertEquals("Console",
sinkConfig.getString("plugin_name"));
+ Assertions.assertEquals(
+ transformConfig.getString("plugin_output"),
sinkConfig.getString("plugin_input"));
+ }
+
public void testLogRestApiResponse(String format) throws IOException {
HttpURLConnection conn = null;
try {