This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8a894ac063 [Feature][Zeta] Support SQL format in REST API (#9802)
8a894ac063 is described below

commit 8a894ac0631c17c956d351e13d80adaddb3a7708
Author: xiaochen <[email protected]>
AuthorDate: Mon Sep 1 09:32:17 2025 +0800

    [Feature][Zeta] Support SQL format in REST API (#9802)
---
 docs/en/seatunnel-engine/rest-api-v2.md            |  64 +++++++--
 docs/zh/seatunnel-engine/rest-api-v2.md            |  56 +++++++-
 .../seatunnel/config/sql/SqlConfigBuilder.java     |  20 +++
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 146 +++++++++++++++++++++
 seatunnel-engine/seatunnel-engine-server/pom.xml   |   5 +
 .../seatunnel/engine/server/rest/ConfigFormat.java |  48 +++++++
 .../seatunnel/engine/server/rest/RestConstant.java |   2 -
 .../engine/server/rest/service/JobInfoService.java |  22 +++-
 .../engine/server/rest/servlet/BaseServlet.java    |  21 ++-
 .../rest/servlet/SubmitJobByUploadFileServlet.java |  41 +++++-
 .../server/rest/servlet/SubmitJobServlet.java      |   9 +-
 .../engine/server/rest/BaseServletTest.java        |  70 +++++++++-
 12 files changed, 456 insertions(+), 48 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api-v2.md 
b/docs/en/seatunnel-engine/rest-api-v2.md
index 00aa288b3d..5f62c23c95 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -400,16 +400,16 @@ When we can't get the job info, the response will be:
 
 #### Parameters
 
-> | name                 |   type   | data type |            description       
     |
-> 
|----------------------|----------|-----------|-----------------------------------|
-> | jobId                | optional | string    | job id                       
     |
-> | jobName              | optional | string    | job name                     
     |
-> | isStartWithSavePoint | optional | string    | if job is started with save 
point |
-> | format               | optional | string    | config format, support json 
and hocon, default json |
+> | name                 |   type   | data type | description                  
                            |
+> 
|----------------------|----------|-----------|----------------------------------------------------------|
+> | jobId                | optional | string    | job id                       
                            |
+> | jobName              | optional | string    | job name                     
                            |
+> | isStartWithSavePoint | optional | string    | if job is started with save 
point                        |
+> | format               | optional | string    | config format, support json, 
hocon and sql, default json |
 
 #### Body
 
-You can choose json or hocon to pass request body.
+You can choose json, hocon or sql to pass request body.
 The json format example:
 ``` json
 {
@@ -471,6 +471,46 @@ sink {
 
 ```
 
+The SQL format example:
+```sql
+/* config
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+*/
+
+CREATE TABLE fake_source (
+    id INT,
+    name STRING,
+    age INT
+) WITH (
+    'connector' = 'FakeSource',
+    'rows' = '[
+        { fields = [1, "Alice", 25], kind = INSERT },
+        { fields = [2, "Bob", 30], kind = INSERT }
+    ]',
+    'schema' = '{
+        fields {
+            id = "int",
+            name = "string",
+            age = "int"
+        }
+    }',
+    'type' = 'source'
+);
+
+CREATE TABLE console_sink (
+    id INT,
+    name STRING,
+    age INT
+) WITH (
+    'connector' = 'Console',
+    'type' = 'sink'
+);
+
+INSERT INTO console_sink SELECT * FROM fake_source;
+```
 
 #### Responses
 
@@ -499,12 +539,18 @@ sink {
 > | isStartWithSavePoint | optional | string    | if job is started with save 
 > point |
 
 #### Request Body
-The name of the uploaded file key is config_file, and the file suffix json is 
parsed in json format. The conf or config file suffix is parsed in hocon format
+The name of the uploaded file key is config_file, and supports the following 
formats:
+- `.json` files: parsed in JSON format
+- `.conf` or `.config` files: parsed in HOCON format
+- `.sql` files: parsed in SQL format, supports CREATE TABLE and INSERT INTO 
syntax
 
 curl Example :
-```
+```bash
+# Upload HOCON config file
 curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 
'config_file=@"/temp/fake_to_console.conf"'
 
+# Upload SQL config file
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 
'config_file=@"/temp/job.sql"'
 ```
 #### Responses
 
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md 
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 124fa5a99a..dccce2640b 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -386,11 +386,11 @@ seatunnel:
 > | jobId                | optional | string | job id                          
 >   |
 > | jobName              | optional | string | job name                        
 >   |
 > | isStartWithSavePoint | optional | string | if job is started with save 
 > point |
-> | format               | optional | string    | 配置风格,支持json和hocon,默认 json    
     |
+> | format               | optional | string    | 配置风格,支持json、hocon 和 sql,默认 
json   |
 
 #### 请求体
 
-你可以选择用json或者hocon的方式来传递请求体。
+你可以选择用json、hocon或者sql的方式来传递请求体。
 Json请求示例:
 ```json
 {
@@ -451,6 +451,48 @@ sink {
   }
 }
 
+```
+
+SQL请求示例:
+
+```sql
+/* config
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+*/
+
+CREATE TABLE fake_source (
+    id INT,
+    name STRING,
+    age INT
+) WITH (
+    'connector' = 'FakeSource',
+    'rows' = '[
+        { fields = [1, "Alice", 25], kind = INSERT },
+        { fields = [2, "Bob", 30], kind = INSERT }
+    ]',
+    'schema' = '{
+        fields {
+            id = "int",
+            name = "string",
+            age = "int"
+        }
+    }',
+    'type' = 'source'
+);
+
+CREATE TABLE console_sink (
+    id INT,
+    name STRING,
+    age INT
+) WITH (
+    'connector' = 'Console',
+    'type' = 'sink'
+);
+
+INSERT INTO console_sink SELECT * FROM fake_source;
 ```
 #### 响应
 
@@ -478,13 +520,19 @@ sink {
 > | isStartWithSavePoint | optional | string | if job is started with save 
 > point |
 
 #### 请求体
-上传文件key的名称是config_file,文件后缀json的按照json格式来解析,conf或config文件后缀按照hocon格式解析
+上传文件key的名称是config_file,支持以下格式:
+- `.json` 文件:按照 JSON 格式解析
+- `.conf` 或 `.config` 文件:按照 HOCON 格式解析
+- `.sql` 文件:按照 SQL 格式解析,支持 CREATE TABLE 和 INSERT INTO 语法
 
 curl Example
 
-```
+```bash
+# 上传 HOCON 配置文件
 curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 
'config_file=@"/temp/fake_to_console.conf"'
 
+# 上传 SQL 配置文件
+curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 
'config_file=@"/temp/job.sql"'
 ```
 #### 响应
 
diff --git 
a/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
 
b/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
index 2e5fc7777a..e526c39e3b 100644
--- 
a/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
+++ 
b/seatunnel-config/seatunnel-config-sql/src/main/java/org/apache/seatunnel/config/sql/SqlConfigBuilder.java
@@ -49,6 +49,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -81,6 +82,25 @@ public class SqlConfigBuilder {
     public static Config of(@NonNull Path sqlFilePath) {
         try {
             List<String> lines = Files.readAllLines(sqlFilePath);
+            return of(lines);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to parse job config file: " + 
sqlFilePath, e);
+        }
+    }
+
+    public static Config of(@NonNull String sqlContent) {
+        try {
+            List<String> lines = new ArrayList<>();
+            String[] lineArray = sqlContent.split("\\r?\\n");
+            Collections.addAll(lines, lineArray);
+            return of(lines);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to parse job config: ", e);
+        }
+    }
+
+    private static Config of(@NonNull List<String> lines) {
+        try {
             Map<String, BaseConfig> sqlTables = new LinkedHashMap<>();
             SeaTunnelConfig seaTunnelConfig = new SeaTunnelConfig();
 
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index b03163c769..8c4c606d48 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -1022,6 +1022,152 @@ public class RestApiIT {
                         });
     }
 
+    @Test
+    public void testSubmitJobWithSqlFormat() {
+        String sqlConfig =
+                "/* config\n"
+                        + "env {\n"
+                        + "  parallelism = 1\n"
+                        + "  job.mode = \"BATCH\"\n"
+                        + "}\n"
+                        + "*/\n"
+                        + "\n"
+                        + "CREATE TABLE test_source (\n"
+                        + "    id INT,\n"
+                        + "    name STRING,\n"
+                        + "    c_time TIMESTAMP\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'FakeSource',\n"
+                        + "    'schema' = '{ \n"
+                        + "      fields { \n"
+                        + "        id = \"int\", \n"
+                        + "        name = \"string\",\n"
+                        + "        c_time = \"timestamp\"\n"
+                        + "      } \n"
+                        + "    }',\n"
+                        + "    'rows' = '[ \n"
+                        + "      { fields = [1, \"test\", null], kind = INSERT 
}\n"
+                        + "    ]',\n"
+                        + "    'type' = 'source'\n"
+                        + ");\n"
+                        + "\n"
+                        + "CREATE TABLE test_sink (\n"
+                        + "    id INT,\n"
+                        + "    name STRING,\n"
+                        + "    c_time TIMESTAMP\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'Console',\n"
+                        + "    'type' = 'sink'\n"
+                        + ");\n"
+                        + "\n"
+                        + "INSERT INTO test_sink SELECT * FROM test_source;";
+
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            ports.forEach(
+                                    (key, value) -> {
+                                        given().body(sqlConfig)
+                                                .queryParam("format", "sql")
+                                                .queryParam("jobName", 
"test-sql-job")
+                                                .post(HOST + key + 
CONTEXT_PATH + "/submit-job")
+                                                .then()
+                                                .statusCode(200)
+                                                .body("jobId", notNullValue())
+                                                .body("jobName", 
equalTo("test-sql-job"));
+                                    });
+                        });
+    }
+
+    @Test
+    public void testSubmitJobWithJsonFormat() {
+        String jsonConfig =
+                "{\n"
+                        + "    \"env\": {\n"
+                        + "        \"parallelism\": 1,\n"
+                        + "        \"job.mode\": \"BATCH\"\n"
+                        + "    },\n"
+                        + "    \"source\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"FakeSource\",\n"
+                        + "            \"plugin_output\": \"fake\",\n"
+                        + "            \"row.num\": 2,\n"
+                        + "            \"schema\": {\n"
+                        + "                \"fields\": {\n"
+                        + "                    \"name\": \"string\",\n"
+                        + "                    \"age\": \"int\"\n"
+                        + "                }\n"
+                        + "            }\n"
+                        + "        }\n"
+                        + "    ],\n"
+                        + "    \"sink\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"Console\",\n"
+                        + "            \"plugin_input\": [\"fake\"]\n"
+                        + "        }\n"
+                        + "    ]\n"
+                        + "}";
+
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            ports.forEach(
+                                    (key, value) -> {
+                                        given().body(jsonConfig)
+                                                .queryParam("jobName", 
"test-json-job")
+                                                .post(HOST + key + 
CONTEXT_PATH + "/submit-job")
+                                                .then()
+                                                .statusCode(200)
+                                                .body("jobId", notNullValue())
+                                                .body("jobName", 
equalTo("test-json-job"));
+                                    });
+                        });
+    }
+
+    @Test
+    public void testSubmitJobWithHoconFormat() {
+        String hoconConfig =
+                "env {\n"
+                        + "  parallelism = 1\n"
+                        + "  job.mode = \"BATCH\"\n"
+                        + "}\n"
+                        + "\n"
+                        + "source {\n"
+                        + "  FakeSource {\n"
+                        + "    plugin_output = \"fake\"\n"
+                        + "    row.num = 2\n"
+                        + "    schema = {\n"
+                        + "      fields {\n"
+                        + "        name = \"string\"\n"
+                        + "        age = \"int\"\n"
+                        + "      }\n"
+                        + "    }\n"
+                        + "  }\n"
+                        + "}\n"
+                        + "\n"
+                        + "sink {\n"
+                        + "  Console {\n"
+                        + "    plugin_input = \"fake\"\n"
+                        + "  }\n"
+                        + "}";
+
+        Arrays.asList(node2, node1)
+                .forEach(
+                        instance -> {
+                            ports.forEach(
+                                    (key, value) -> {
+                                        given().body(hoconConfig)
+                                                .queryParam("format", "hocon")
+                                                .queryParam("jobName", 
"test-hocon-job")
+                                                .post(HOST + key + 
CONTEXT_PATH + "/submit-job")
+                                                .then()
+                                                .statusCode(200)
+                                                .body("jobId", notNullValue())
+                                                .body("jobName", 
equalTo("test-hocon-job"));
+                                    });
+                        });
+    }
+
     @AfterEach
     void afterClass() {
         if (engineClient != null) {
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml 
b/seatunnel-engine/seatunnel-engine-server/pom.xml
index 7e36f2c98e..d9e1c860c5 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -66,6 +66,11 @@
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-config-sql</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/ConfigFormat.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/ConfigFormat.java
new file mode 100644
index 0000000000..e830defea4
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/ConfigFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest;
+
+public enum ConfigFormat {
+    JSON("json"),
+    HOCON("hocon"),
+    SQL("sql");
+
+    private final String value;
+
+    ConfigFormat(String value) {
+        this.value = value;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public static ConfigFormat fromString(String value) {
+        if (value == null) {
+            return JSON;
+        }
+
+        for (ConfigFormat format : ConfigFormat.values()) {
+            if (format.value.equalsIgnoreCase(value)) {
+                return format;
+            }
+        }
+
+        return JSON;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 2e756308e1..f3e28e1329 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -49,8 +49,6 @@ public class RestConstant {
 
     public static final String METRICS = "metrics";
 
-    public static final String HOCON = "hocon";
-
     public static final String TABLE_SOURCE_RECEIVED_COUNT = 
"TableSourceReceivedCount";
     public static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
     public static final String TABLE_SOURCE_RECEIVED_QPS = 
"TableSourceReceivedQPS";
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
index 22d3138aee..977aa14e19 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/JobInfoService.java
@@ -22,12 +22,14 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.config.sql.SqlConfigBuilder;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobInfo;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
 import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
+import org.apache.seatunnel.engine.server.rest.ConfigFormat;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 import org.apache.seatunnel.engine.server.utils.RestUtil;
@@ -36,6 +38,7 @@ import com.hazelcast.internal.json.JsonArray;
 import com.hazelcast.internal.json.JsonObject;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
 import scala.Tuple2;
 
 import java.nio.charset.StandardCharsets;
@@ -45,8 +48,8 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.CONFIG_FORMAT;
-import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON;
 
+@Slf4j
 public class JobInfoService extends BaseService {
 
     public JobInfoService(NodeEngineImpl nodeEngine) {
@@ -163,11 +166,18 @@ public class JobInfoService extends BaseService {
             throw new IllegalArgumentException("Please provide jobId when 
start with save point.");
         }
         Config config;
-        if (HOCON.equalsIgnoreCase(requestParams.get(CONFIG_FORMAT))) {
-            String requestBodyStr = new String(requestBody, 
StandardCharsets.UTF_8);
-            config = ConfigFactory.parseString(requestBodyStr);
-        } else {
-            config = RestUtil.buildConfig(requestHandle(requestBody), false);
+        ConfigFormat configFormat = 
ConfigFormat.fromString(requestParams.get(CONFIG_FORMAT));
+        switch (configFormat) {
+            case HOCON:
+                config = ConfigFactory.parseString(new String(requestBody, 
StandardCharsets.UTF_8));
+                break;
+            case SQL:
+                config = SqlConfigBuilder.of(new String(requestBody, 
StandardCharsets.UTF_8));
+                break;
+            case JSON:
+            default:
+                config = RestUtil.buildConfig(requestHandle(requestBody), 
false);
+                break;
         }
         SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false);
         return submitJobInternal(config, requestParams, seaTunnelServer, 
nodeEngine.getNode());
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
index b373211f28..b8fdcb12a4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.rest.servlet;
 
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.rest.ConfigFormat;
 
 import com.google.gson.Gson;
 import com.hazelcast.internal.json.JsonArray;
@@ -110,13 +111,17 @@ public class BaseServlet extends HttpServlet {
         return seaTunnelServer;
     }
 
-    protected byte[] requestBody(HttpServletRequest req) throws IOException {
+    protected byte[] requestBody(HttpServletRequest req, ConfigFormat 
configFormat)
+            throws IOException {
         StringBuilder stringBuilder = new StringBuilder();
         String line;
 
         try (BufferedReader reader = req.getReader()) {
             while ((line = reader.readLine()) != null) {
                 stringBuilder.append(line);
+                if (ConfigFormat.JSON != configFormat) {
+                    stringBuilder.append("\n");
+                }
             }
         }
 
@@ -124,18 +129,8 @@ public class BaseServlet extends HttpServlet {
         return requestBody.getBytes(StandardCharsets.UTF_8);
     }
 
-    public byte[] requestHoconBody(HttpServletRequest req) throws IOException {
-        StringBuilder stringBuilder = new StringBuilder();
-        String line;
-
-        try (BufferedReader reader = req.getReader()) {
-            while ((line = reader.readLine()) != null) {
-                stringBuilder.append(line).append("\n");
-            }
-        }
-
-        String requestBody = stringBuilder.toString();
-        return requestBody.getBytes(StandardCharsets.UTF_8);
+    protected byte[] requestBody(HttpServletRequest req) throws IOException {
+        return requestBody(req, ConfigFormat.JSON);
     }
 
     protected Map<String, String> getParameterMap(HttpServletRequest req) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
index 2db376da06..3818ae94fc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobByUploadFileServlet.java
@@ -22,11 +22,14 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;
 
+import org.apache.seatunnel.config.sql.SqlConfigBuilder;
+import org.apache.seatunnel.engine.server.rest.ConfigFormat;
 import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
 
 import org.apache.commons.io.IOUtils;
 
 import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -36,6 +39,7 @@ import javax.servlet.http.Part;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
+@Slf4j
 public class SubmitJobByUploadFileServlet extends BaseServlet {
     private final JobInfoService jobInfoService;
 
@@ -52,13 +56,38 @@ public class SubmitJobByUploadFileServlet extends 
BaseServlet {
         String submittedFileName = filePart.getSubmittedFileName();
         String content = IOUtils.toString(filePart.getInputStream(), 
StandardCharsets.UTF_8);
         Config config;
-        if (submittedFileName.endsWith(".json")) {
-            config =
-                    ConfigFactory.parseString(
-                            content, 
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
-        } else {
-            config = ConfigFactory.parseString(content);
+
+        log.info("Processing uploaded config file: {}", submittedFileName);
+        ConfigFormat configFormat = detectConfigFormat(submittedFileName);
+        switch (configFormat) {
+            case JSON:
+                config =
+                        ConfigFactory.parseString(
+                                content,
+                                
ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
+                break;
+            case SQL:
+                config = SqlConfigBuilder.of(content);
+                break;
+            case HOCON:
+            default:
+                config = ConfigFactory.parseString(content);
+                break;
         }
         writeJson(resp, jobInfoService.submitJob(getParameterMap(req), 
config));
     }
+
+    private ConfigFormat detectConfigFormat(String fileName) {
+        if (fileName == null) {
+            return ConfigFormat.JSON;
+        }
+
+        if (fileName.endsWith(".json")) {
+            return ConfigFormat.JSON;
+        } else if (fileName.endsWith(".sql")) {
+            return ConfigFormat.SQL;
+        } else {
+            return ConfigFormat.HOCON;
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
index 31b9d60721..fa00693cfb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/SubmitJobServlet.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.rest.servlet;
 
+import org.apache.seatunnel.engine.server.rest.ConfigFormat;
 import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
 
 import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -28,7 +29,6 @@ import java.io.IOException;
 import java.util.Map;
 
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.CONFIG_FORMAT;
-import static org.apache.seatunnel.engine.server.rest.RestConstant.HOCON;
 
 public class SubmitJobServlet extends BaseServlet {
     private final JobInfoService jobInfoService;
@@ -42,10 +42,7 @@ public class SubmitJobServlet extends BaseServlet {
     public void doPost(HttpServletRequest req, HttpServletResponse resp) 
throws IOException {
 
         Map<String, String> requestParams = getParameterMap(req);
-        if (HOCON.equalsIgnoreCase(requestParams.get(CONFIG_FORMAT))) {
-            writeJson(resp, jobInfoService.submitJob(requestParams, 
requestHoconBody(req)));
-        } else {
-            writeJson(resp, jobInfoService.submitJob(requestParams, 
requestBody(req)));
-        }
+        ConfigFormat configFormat = 
ConfigFormat.fromString(requestParams.get(CONFIG_FORMAT));
+        writeJson(resp, jobInfoService.submitJob(requestParams, 
requestBody(req, configFormat)));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/BaseServletTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/BaseServletTest.java
index 3fec682dba..8cbf7bc7f3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/BaseServletTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/BaseServletTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.rest;
 
+import org.apache.seatunnel.config.sql.SqlConfigBuilder;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.config.server.HttpConfig;
 import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
@@ -29,7 +30,7 @@ import 
org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
 import org.apache.seatunnel.engine.server.TestUtils;
 
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import com.hazelcast.config.Config;
@@ -45,7 +46,7 @@ class BaseServletTest extends AbstractSeaTunnelServerTest {
 
     private static final Long JOB_1 = System.currentTimeMillis() + 1L;
 
-    @BeforeEach
+    @BeforeAll
     void setUp() {
         String name = this.getClass().getName();
         Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
@@ -71,6 +72,71 @@ class BaseServletTest extends AbstractSeaTunnelServerTest {
         testLogRestApiResponse("JSON");
     }
 
+    @Test
+    void testSqlConfigParsing() throws Exception {
+        String sqlContent =
+                "/* config\n"
+                        + "env {\n"
+                        + "  parallelism = 1\n"
+                        + "  job.mode = \"BATCH\"\n"
+                        + "}\n"
+                        + "*/\n"
+                        + "\n"
+                        + "CREATE TABLE test_source (\n"
+                        + "    id INT,\n"
+                        + "    name STRING\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'FakeSource',\n"
+                        + "    'rows' = '[{ fields = [1, \"test\"], kind = 
INSERT }]',\n"
+                        + "    'schema' = '{ fields { id = \"int\", name = 
\"string\" } }',\n"
+                        + "    'type' = 'source'\n"
+                        + ");\n"
+                        + "\n"
+                        + "CREATE TABLE test_sink (\n"
+                        + "    id INT,\n"
+                        + "    name STRING\n"
+                        + ") WITH (\n"
+                        + "    'connector' = 'Console',\n"
+                        + "    'type' = 'sink'\n"
+                        + ");\n"
+                        + "\n"
+                        + "INSERT INTO test_sink SELECT * FROM test_source;";
+
+        org.apache.seatunnel.shade.com.typesafe.config.Config config =
+                SqlConfigBuilder.of(sqlContent);
+
+        Assertions.assertNotNull(config);
+        Assertions.assertTrue(config.hasPath("source"));
+        Assertions.assertTrue(config.hasPath("transform"));
+        Assertions.assertTrue(config.hasPath("sink"));
+
+        // SQL with INSERT INTO ... SELECT FROM ... will create a transform 
step
+        Assertions.assertTrue(
+                config.hasPath("transform"),
+                "Transform should be created for INSERT INTO ... SELECT FROM 
... statement");
+
+        // Verify source configuration
+        org.apache.seatunnel.shade.com.typesafe.config.Config sourceConfig =
+                config.getConfigList("source").get(0);
+        Assertions.assertEquals("FakeSource", 
sourceConfig.getString("plugin_name"));
+        Assertions.assertEquals("test_source", 
sourceConfig.getString("plugin_output"));
+
+        // Verify transform configuration (created by INSERT statement)
+        org.apache.seatunnel.shade.com.typesafe.config.Config transformConfig =
+                config.getConfigList("transform").get(0);
+        Assertions.assertEquals("test_source", 
transformConfig.getString("plugin_input"));
+        Assertions.assertTrue(
+                
transformConfig.getString("plugin_output").startsWith("test_source__temp"));
+        Assertions.assertEquals("SELECT * FROM test_source", 
transformConfig.getString("query"));
+
+        // Verify sink configuration
+        org.apache.seatunnel.shade.com.typesafe.config.Config sinkConfig =
+                config.getConfigList("sink").get(0);
+        Assertions.assertEquals("Console", 
sinkConfig.getString("plugin_name"));
+        Assertions.assertEquals(
+                transformConfig.getString("plugin_output"), 
sinkConfig.getString("plugin_input"));
+    }
+
     public void testLogRestApiResponse(String format) throws IOException {
         HttpURLConnection conn = null;
         try {


Reply via email to