This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git

commit 192effa147a734169cf7d0fd17d11fb552184ee8
Author: s7monk <34889415+s7m...@users.noreply.github.com>
AuthorDate: Thu Jul 11 16:43:13 2024 +0800

    [Improvement] Support query limit (#502)
---
 .../web/engine/flink/common/executor/Executor.java |   3 +-
 .../flink/common/parser/CustomSqlParser.java       |  86 +++++++++++++++
 .../flink/common/parser/CustomSqlParserTest.java   |  54 ++++++++++
 .../flink/common/parser/StatementsConstant.java    | 117 +++++++++++++++++++++
 .../gateway/executor/FlinkSqlGatewayExecutor.java  |  24 +++--
 .../executor/FlinkSqlGatewayExecutorTest.java      |  21 ++--
 paimon-web-engine/paimon-web-engine-flink/pom.xml  |   1 -
 paimon-web-server/pom.xml                          |   7 --
 .../paimon/web/server/data/dto/JobSubmitDTO.java   |   2 +
 .../web/server/service/impl/JobServiceImpl.java    |   5 +-
 10 files changed, 291 insertions(+), 29 deletions(-)

diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java
index 60a754f6..f3e8bece 100644
--- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java
+++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java
@@ -31,10 +31,11 @@ public interface Executor {
      * Executes an SQL statement.
      *
      * @param statement The SQL statement to be executed.
+     * @param maxRows The maximum number of rows to return in the result set.
      * @return SubmitResult containing information about the execution result.
      * @throws Exception if there is an error executing the SQL statement.
      */
-    ExecutionResult executeSql(String statement) throws Exception;
+    ExecutionResult executeSql(String statement, int maxRows) throws Exception;
 
     /**
      * Fetches the results of a previously submitted SQL statement execution.
diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java
new file mode 100644
index 00000000..395a7efc
--- /dev/null
+++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.engine.flink.common.parser;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+
+/** CustomSqlParser to parse Sql list. */
+public class CustomSqlParser {
+
+    private static final SqlParser.Config config;
+    private final SqlParser parser;
+    private final int limit;
+
+    static {
+        config =
+                SqlParser.config()
+                        .withParserFactory(FlinkSqlParserImpl.FACTORY)
+                        .withConformance(FlinkSqlConformance.DEFAULT)
+                        .withLex(Lex.JAVA)
+                        .withIdentifierMaxLength(256);
+    }
+
+    public CustomSqlParser(String sql, int limit) {
+        this.parser = SqlParser.create(sql, config);
+        this.limit = limit;
+    }
+
+    public SqlNodeList parseStmtList() throws SqlParseException {
+        SqlNodeList nodeList = parser.parseStmtList();
+        for (SqlNode node : nodeList) {
+            if (node instanceof SqlSelect) {
+                SqlSelect select = (SqlSelect) node;
+                if (!hasAggregateOrGroupBy(select) && select.getFetch() == null) {
+                    SqlLiteral sqlLiteral =
+                            SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO);
+                    select.setFetch(sqlLiteral);
+                }
+            }
+        }
+        return nodeList;
+    }
+
+    private boolean hasAggregateOrGroupBy(SqlSelect select) {
+        if (select.getGroup() != null && !select.getGroup().isEmpty()) {
+            return true;
+        }
+        return containsComplexOperations(select.getSelectList());
+    }
+
+    private boolean containsComplexOperations(SqlNodeList nodes) {
+        if (nodes != null) {
+            for (SqlNode node : nodes) {
+                if (!(node instanceof SqlIdentifier)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+}
diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java
new file mode 100644
index 00000000..967c73a2
--- /dev/null
+++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.engine.flink.common.parser;
+
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement1;
+import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement2;
+import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement3;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests of {@link CustomSqlParser}. */
+public class CustomSqlParserTest {
+
+    @Test
+    public void testParse() throws SqlParseException {
+        CustomSqlParser customSqlParser = new CustomSqlParser(statement1, 0);
+        SqlNodeList sqlNodeList = customSqlParser.parseStmtList();
+        assertThat(sqlNodeList.size()).isEqualTo(5);
+    }
+
+    @Test
+    public void testSelectLimit() throws SqlParseException {
+        CustomSqlParser customSqlParser = new CustomSqlParser(statement2, 500);
+        String actual = customSqlParser.parseStmtList().get(2).toString();
+        assertThat(actual)
+                .isEqualToIgnoringWhitespace("SELECT * FROM `t_order` FETCH NEXT 500 ROWS ONLY");
+    }
+
+    @Test
+    public void testSelectWithoutLimit() throws SqlParseException {
+        CustomSqlParser customSqlParser = new CustomSqlParser(statement3, 200);
+        String actual = customSqlParser.parseStmtList().get(2).toString();
+        assertThat(actual).isEqualToIgnoringWhitespace("SELECT COUNT(*) FROM `t_order`");
+    }
+}
diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java
new file mode 100644
index 00000000..e029c4ba
--- /dev/null
+++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.engine.flink.common.parser;
+
+/** Statements constant. */
+public class StatementsConstant {
+
+    public static String statement1 =
+            "DROP TABLE IF EXISTS t_order;\n"
+                    + "CREATE TABLE IF NOT EXISTS t_order(\n"
+                    + "    --order id\n"
+                    + "    `order_id` BIGINT,\n"
+                    + "    --product\n"
+                    + "    `product` BIGINT,\n"
+                    + "    --amount\n"
+                    + "    `amount` BIGINT,\n"
+                    + "    --payment time\n"
+                    + "    `order_time` as CAST(CURRENT_TIMESTAMP AS 
TIMESTAMP(3)),\n"
+                    + "    --WATERMARK\n"
+                    + "    WATERMARK FOR order_time AS order_time-INTERVAL '2' 
SECOND\n"
+                    + ") WITH(\n"
+                    + "    'connector' = 'datagen',\n"
+                    + "    'rows-per-second' = '1',\n"
+                    + "    'fields.order_id.min' = '1',\n"
+                    + "    'fields.order_id.max' = '2',\n"
+                    + "    'fields.amount.min' = '1',\n"
+                    + "    'fields.amount.max' = '10',\n"
+                    + "    'fields.product.min' = '1',\n"
+                    + "    'fields.product.max' = '2'\n"
+                    + ");\n"
+                    + "-- SELECT * FROM t_order LIMIT 10;\n"
+                    + "DROP TABLE IF EXISTS sink_table;\n"
+                    + "CREATE TABLE IF NOT EXISTS sink_table(\n"
+                    + "    --product\n"
+                    + "    `product` BIGINT,\n"
+                    + "    --amount\n"
+                    + "    `amount` BIGINT,\n"
+                    + "    --payment time\n"
+                    + "    `order_time` TIMESTAMP(3),\n"
+                    + "    `one_minute_sum` BIGINT\n"
+                    + ") WITH('connector' = 'print');\n"
+                    + "\n"
+                    + "INSERT INTO\n"
+                    + "    sink_table\n"
+                    + "SELECT\n"
+                    + "    product,\n"
+                    + "    amount,\n"
+                    + "    order_time,\n"
+                    + "    0 as one_minute_sum\n"
+                    + "FROM\n"
+                    + "    t_order;";
+
+    public static String statement2 =
+            "DROP TABLE IF EXISTS t_order;\n"
+                    + "CREATE TABLE IF NOT EXISTS t_order(\n"
+                    + "    --order id\n"
+                    + "    `order_id` BIGINT,\n"
+                    + "    --product\n"
+                    + "    `product` BIGINT,\n"
+                    + "    --amount\n"
+                    + "    `amount` BIGINT,\n"
+                    + "    --payment time\n"
+                    + "    `order_time` as CAST(CURRENT_TIMESTAMP AS 
TIMESTAMP(3)),\n"
+                    + "    --WATERMARK\n"
+                    + "    WATERMARK FOR order_time AS order_time-INTERVAL '2' 
SECOND\n"
+                    + ") WITH(\n"
+                    + "    'connector' = 'datagen',\n"
+                    + "    'rows-per-second' = '1',\n"
+                    + "    'fields.order_id.min' = '1',\n"
+                    + "    'fields.order_id.max' = '2',\n"
+                    + "    'fields.amount.min' = '1',\n"
+                    + "    'fields.amount.max' = '10',\n"
+                    + "    'fields.product.min' = '1',\n"
+                    + "    'fields.product.max' = '2'\n"
+                    + ");\n"
+                    + "SELECT * FROM t_order;";
+    public static String statement3 =
+            "DROP TABLE IF EXISTS t_order;\n"
+                    + "CREATE TABLE IF NOT EXISTS t_order(\n"
+                    + "    --order id\n"
+                    + "    `order_id` BIGINT,\n"
+                    + "    --product\n"
+                    + "    `product` BIGINT,\n"
+                    + "    --amount\n"
+                    + "    `amount` BIGINT,\n"
+                    + "    --payment time\n"
+                    + "    `order_time` as CAST(CURRENT_TIMESTAMP AS 
TIMESTAMP(3)),\n"
+                    + "    --WATERMARK\n"
+                    + "    WATERMARK FOR order_time AS order_time-INTERVAL '2' 
SECOND\n"
+                    + ") WITH(\n"
+                    + "    'connector' = 'datagen',\n"
+                    + "    'rows-per-second' = '1',\n"
+                    + "    'fields.order_id.min' = '1',\n"
+                    + "    'fields.order_id.max' = '2',\n"
+                    + "    'fields.amount.min' = '1',\n"
+                    + "    'fields.amount.max' = '10',\n"
+                    + "    'fields.product.min' = '1',\n"
+                    + "    'fields.product.max' = '2'\n"
+                    + ");\n"
+                    + "SELECT count(*) FROM t_order;";
+}
diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java
index 12c5b4de..ab58e471 100644
--- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java
+++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java
@@ -20,7 +20,7 @@ package org.apache.paimon.web.engine.flink.sql.gateway.executor;
 
 import org.apache.paimon.web.engine.flink.common.executor.Executor;
 import org.apache.paimon.web.engine.flink.common.operation.FlinkSqlOperationType;
-import org.apache.paimon.web.engine.flink.common.parser.StatementParser;
+import org.apache.paimon.web.engine.flink.common.parser.CustomSqlParser;
 import org.apache.paimon.web.engine.flink.common.result.ExecutionResult;
 import org.apache.paimon.web.engine.flink.common.result.FetchResultParams;
 import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient;
@@ -28,6 +28,8 @@ import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;
 import org.apache.paimon.web.engine.flink.sql.gateway.utils.CollectResultUtil;
 import org.apache.paimon.web.engine.flink.sql.gateway.utils.FlinkSqlStatementSetBuilder;
 
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
 
@@ -53,16 +55,18 @@ public class FlinkSqlGatewayExecutor implements Executor {
     }
 
     @Override
-    public ExecutionResult executeSql(String multiStatement) throws Exception {
-        String[] statements = StatementParser.parse(multiStatement);
+    public ExecutionResult executeSql(String multiStatement, int maxRows) throws Exception {
+        CustomSqlParser customSqlParser = new CustomSqlParser(multiStatement, maxRows);
+        SqlNodeList sqlNodeList = customSqlParser.parseStmtList();
         List<String> insertStatements = new ArrayList<>();
         ExecutionResult executionResult = null;
 
-        for (String statement : statements) {
-            FlinkSqlOperationType operationType = FlinkSqlOperationType.getOperationType(statement);
+        for (SqlNode sqlNode : sqlNodeList) {
+            FlinkSqlOperationType operationType =
+                    FlinkSqlOperationType.getOperationType(sqlNode.toString());
 
             if (operationType == null) {
-                String operationTypeString = extractSqlOperationType(statement);
+                String operationTypeString = extractSqlOperationType(sqlNode.toString());
                 throw new UnsupportedOperationException(
                         "Unsupported operation type: " + operationTypeString);
             }
@@ -73,17 +77,17 @@ public class FlinkSqlGatewayExecutor implements Executor {
                         throw new UnsupportedOperationException(
                                 "Cannot execute DQL statement with pending 
INSERT statements.");
                     }
-                    executionResult = executeDqlStatement(statement, operationType);
+                    executionResult = executeDqlStatement(sqlNode.toString(), operationType);
                     break;
                 case DML:
                     if (operationType.getType().equals(FlinkSqlOperationType.INSERT.getType())) {
-                        insertStatements.add(statement);
+                        insertStatements.add(sqlNode.toString());
                     } else {
-                        executionResult = executeDmlStatement(statement);
+                        executionResult = executeDmlStatement(sqlNode.toString());
                     }
                     break;
                 default:
-                    client.executeStatement(session.getSessionId(), statement, null);
+                    client.executeStatement(session.getSessionId(), sqlNode.toString(), null);
                     break;
             }
 
diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java
index 42620af7..69a6d8a8 100644
--- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java
+++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java
@@ -25,6 +25,7 @@ import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient;
 import org.apache.paimon.web.engine.flink.sql.gateway.executor.FlinkSqlGatewayExecutor;
 import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity;
 
+import org.apache.calcite.sql.parser.SqlParseException;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -52,21 +53,23 @@ public class FlinkSqlGatewayExecutorTest extends TestBase {
 
     @Test
     public void testExecuteSql() throws Exception {
-        ExecutionResult executionResult = executor.executeSql(StatementsConstant.statement);
+        ExecutionResult executionResult = executor.executeSql(StatementsConstant.statement, 0);
         assertNotNull(executionResult);
         assertNotNull(executionResult.getJobId());
     }
 
     @Test
     public void testExecuteStatementSetSql() throws Exception {
-        ExecutionResult executionResult = executor.executeSql(StatementsConstant.statementSetSql);
+        ExecutionResult executionResult =
+                executor.executeSql(StatementsConstant.statementSetSql, 0);
         assertNotNull(executionResult);
         assertNotNull(executionResult.getJobId());
     }
 
     @Test
     public void testExecutorStatementWithoutResult() throws Exception {
-        ExecutionResult executionResult = executor.executeSql(StatementsConstant.createStatement);
+        ExecutionResult executionResult =
+                executor.executeSql(StatementsConstant.createStatement, 0);
         assertNull(executionResult);
     }
 
@@ -77,7 +80,8 @@ public class FlinkSqlGatewayExecutorTest extends TestBase {
                         UnsupportedOperationException.class,
                         () -> {
                             executor.executeSql(
-                                    StatementsConstant.selectStatementWithPendingInsertStatements);
+                                    StatementsConstant.selectStatementWithPendingInsertStatements,
+                                    0);
                         });
         String expectedMessage = "Cannot execute DQL statement with pending 
INSERT statements.";
         String actualMessage = exception.getMessage();
@@ -88,18 +92,19 @@ public class FlinkSqlGatewayExecutorTest extends TestBase {
     public void testExecuteBadSqlStatement() {
         Exception exception =
                 assertThrows(
-                        UnsupportedOperationException.class,
+                        SqlParseException.class,
                         () -> {
-                            executor.executeSql(StatementsConstant.badStatement);
+                            executor.executeSql(StatementsConstant.badStatement, 0);
                         });
-        String expectedMessage = "Unsupported operation type: CREAT";
+        String expectedMessage = "Non-query expression encountered in illegal 
context";
         String actualMessage = exception.getMessage();
         assertTrue(actualMessage.contains(expectedMessage));
     }
 
     @Test
     public void testFetchResults() throws Exception {
-        ExecutionResult executionResult = executor.executeSql(StatementsConstant.selectStatement);
+        ExecutionResult executionResult =
+                executor.executeSql(StatementsConstant.selectStatement, 10);
         assertNotNull(executionResult);
         assertNotNull(executionResult.getJobId());
         assertNotNull(executionResult.getSubmitId());
diff --git a/paimon-web-engine/paimon-web-engine-flink/pom.xml b/paimon-web-engine/paimon-web-engine-flink/pom.xml
index 476a6bc2..1db29878 100644
--- a/paimon-web-engine/paimon-web-engine-flink/pom.xml
+++ b/paimon-web-engine/paimon-web-engine-flink/pom.xml
@@ -88,7 +88,6 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.version}</artifactId>
             <version>${flink.version}</version>
-            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/paimon-web-server/pom.xml b/paimon-web-server/pom.xml
index 2d85244d..9bd9c433 100644
--- a/paimon-web-server/pom.xml
+++ b/paimon-web-server/pom.xml
@@ -248,13 +248,6 @@ under the License.
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-planner_${scala.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
index 83e4c384..b3c4d6f7 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java
@@ -39,4 +39,6 @@ public class JobSubmitDTO {
     private Map<String, String> config;
 
     private String statements;
+
+    private int maxRows;
 }
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
index ceb46c73..2df400e2 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java
@@ -142,7 +142,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
                     String.format(
                             "Starting to submit %s %s job...",
                             jobSubmitDTO.getTaskType(), executeMode));
-            ExecutionResult executionResult = executor.executeSql(jobSubmitDTO.getStatements());
+            ExecutionResult executionResult =
+                    executor.executeSql(jobSubmitDTO.getStatements(), jobSubmitDTO.getMaxRows());
             if (StringUtils.isNotBlank(executionResult.getJobId())) {
                 JobInfo jobInfo = buildJobInfo(executionResult, jobSubmitDTO);
                 this.save(jobInfo);
@@ -318,7 +319,7 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J
                         return;
                     }
 
-                    ExecutionResult executionResult = executor.executeSql(SHOW_JOBS_STATEMENT);
+                    ExecutionResult executionResult = executor.executeSql(SHOW_JOBS_STATEMENT, 0);
                     List<Map<String, Object>> jobsData = executionResult.getData();
                     for (Map<String, Object> jobData : jobsData) {
                         String jobId = (String) jobData.get("job id");
