This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.1 in repository https://gitbox.apache.org/repos/asf/paimon-webui.git
commit 192effa147a734169cf7d0fd17d11fb552184ee8 Author: s7monk <34889415+s7m...@users.noreply.github.com> AuthorDate: Thu Jul 11 16:43:13 2024 +0800 [Improvement] Support query limit (#502) --- .../web/engine/flink/common/executor/Executor.java | 3 +- .../flink/common/parser/CustomSqlParser.java | 86 +++++++++++++++ .../flink/common/parser/CustomSqlParserTest.java | 54 ++++++++++ .../flink/common/parser/StatementsConstant.java | 117 +++++++++++++++++++++ .../gateway/executor/FlinkSqlGatewayExecutor.java | 24 +++-- .../executor/FlinkSqlGatewayExecutorTest.java | 21 ++-- paimon-web-engine/paimon-web-engine-flink/pom.xml | 1 - paimon-web-server/pom.xml | 7 -- .../paimon/web/server/data/dto/JobSubmitDTO.java | 2 + .../web/server/service/impl/JobServiceImpl.java | 5 +- 10 files changed, 291 insertions(+), 29 deletions(-) diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java index 60a754f6..f3e8bece 100644 --- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/executor/Executor.java @@ -31,10 +31,11 @@ public interface Executor { * Executes an SQL statement. * * @param statement The SQL statement to be executed. + * @param maxRows The maximum number of rows to return in the result set. * @return SubmitResult containing information about the execution result. * @throws Exception if there is an error executing the SQL statement. */ - ExecutionResult executeSql(String statement) throws Exception; + ExecutionResult executeSql(String statement, int maxRows) throws Exception; /** * Fetches the results of a previously submitted SQL statement execution. diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java new file mode 100644 index 00000000..395a7efc --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/main/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParser.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.common.parser; + +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; +import org.apache.flink.sql.parser.validate.FlinkSqlConformance; + +/** CustomSqlParser to parse Sql list. */ +public class CustomSqlParser { + + private static final SqlParser.Config config; + private final SqlParser parser; + private final int limit; + + static { + config = + SqlParser.config() + .withParserFactory(FlinkSqlParserImpl.FACTORY) + .withConformance(FlinkSqlConformance.DEFAULT) + .withLex(Lex.JAVA) + .withIdentifierMaxLength(256); + } + + public CustomSqlParser(String sql, int limit) { + this.parser = SqlParser.create(sql, config); + this.limit = limit; + } + + public SqlNodeList parseStmtList() throws SqlParseException { + SqlNodeList nodeList = parser.parseStmtList(); + for (SqlNode node : nodeList) { + if (node instanceof SqlSelect) { + SqlSelect select = (SqlSelect) node; + if (!hasAggregateOrGroupBy(select) && select.getFetch() == null) { + SqlLiteral sqlLiteral = + SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO); + select.setFetch(sqlLiteral); + } + } + } + return nodeList; + } + + private boolean hasAggregateOrGroupBy(SqlSelect select) { + if (select.getGroup() != null && !select.getGroup().isEmpty()) { + return true; + } + return containsComplexOperations(select.getSelectList()); + } + + private boolean containsComplexOperations(SqlNodeList nodes) { + if (nodes != null) { + for (SqlNode node : nodes) { + if (!(node instanceof SqlIdentifier)) { + return true; + } + } + } + return false; + } +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java new file mode 100644 index 00000000..967c73a2 --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/CustomSqlParserTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.common.parser; + +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.parser.SqlParseException; +import org.junit.jupiter.api.Test; + +import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement1; +import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement2; +import static org.apache.paimon.web.engine.flink.common.parser.StatementsConstant.statement3; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests of {@link CustomSqlParser}. */ +public class CustomSqlParserTest { + + @Test + public void testParse() throws SqlParseException { + CustomSqlParser customSqlParser = new CustomSqlParser(statement1, 0); + SqlNodeList sqlNodeList = customSqlParser.parseStmtList(); + assertThat(sqlNodeList.size()).isEqualTo(5); + } + + @Test + public void testSelectLimit() throws SqlParseException { + CustomSqlParser customSqlParser = new CustomSqlParser(statement2, 500); + String actual = customSqlParser.parseStmtList().get(2).toString(); + assertThat(actual) + .isEqualToIgnoringWhitespace("SELECT * FROM `t_order` FETCH NEXT 500 ROWS ONLY"); + } + + @Test + public void testSelectWithoutLimit() throws SqlParseException { + CustomSqlParser customSqlParser = new CustomSqlParser(statement3, 200); + String actual = customSqlParser.parseStmtList().get(2).toString(); + assertThat(actual).isEqualToIgnoringWhitespace("SELECT COUNT(*) FROM `t_order`"); + } +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java new file mode 100644 index 00000000..e029c4ba --- /dev/null +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-common/src/test/java/org/apache/paimon/web/engine/flink/common/parser/StatementsConstant.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.web.engine.flink.common.parser; + +/** Statements constant. */ +public class StatementsConstant { + + public static String statement1 = + "DROP TABLE IF EXISTS t_order;\n" + + "CREATE TABLE IF NOT EXISTS t_order(\n" + + " --order id\n" + + " `order_id` BIGINT,\n" + + " --product\n" + + " `product` BIGINT,\n" + + " --amount\n" + + " `amount` BIGINT,\n" + + " --payment time\n" + + " `order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),\n" + + " --WATERMARK\n" + + " WATERMARK FOR order_time AS order_time-INTERVAL '2' SECOND\n" + + ") WITH(\n" + + " 'connector' = 'datagen',\n" + + " 'rows-per-second' = '1',\n" + + " 'fields.order_id.min' = '1',\n" + + " 'fields.order_id.max' = '2',\n" + + " 'fields.amount.min' = '1',\n" + + " 'fields.amount.max' = '10',\n" + + " 'fields.product.min' = '1',\n" + + " 'fields.product.max' = '2'\n" + + ");\n" + + "-- SELECT * FROM t_order LIMIT 10;\n" + + "DROP TABLE IF EXISTS sink_table;\n" + + "CREATE TABLE IF NOT EXISTS sink_table(\n" + + " --product\n" + + " `product` BIGINT,\n" + + " --amount\n" + + " `amount` BIGINT,\n" + + " --payment time\n" + + " `order_time` TIMESTAMP(3),\n" + + " `one_minute_sum` BIGINT\n" + + ") WITH('connector' = 'print');\n" + + "\n" + + "INSERT INTO\n" + + " sink_table\n" + + "SELECT\n" + + " product,\n" + + " amount,\n" + + " order_time,\n" + + " 0 as one_minute_sum\n" + + "FROM\n" + + " t_order;"; + + public static String statement2 = + "DROP TABLE IF EXISTS t_order;\n" + + "CREATE TABLE IF NOT EXISTS t_order(\n" + + " --order id\n" + + " `order_id` BIGINT,\n" + + " --product\n" + + " `product` BIGINT,\n" + + " --amount\n" + + " `amount` BIGINT,\n" + + " --payment time\n" + + " `order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),\n" + + " --WATERMARK\n" + + " WATERMARK FOR order_time AS order_time-INTERVAL '2' SECOND\n" + + ") WITH(\n" + + " 'connector' = 'datagen',\n" + + " 'rows-per-second' = '1',\n" + + " 'fields.order_id.min' = '1',\n" + + " 'fields.order_id.max' = '2',\n" + + " 'fields.amount.min' = '1',\n" + + " 'fields.amount.max' = '10',\n" + + " 'fields.product.min' = '1',\n" + + " 'fields.product.max' = '2'\n" + + ");\n" + + "SELECT * FROM t_order;"; + public static String statement3 = + "DROP TABLE IF EXISTS t_order;\n" + + "CREATE TABLE IF NOT EXISTS t_order(\n" + + " --order id\n" + + " `order_id` BIGINT,\n" + + " --product\n" + + " `product` BIGINT,\n" + + " --amount\n" + + " `amount` BIGINT,\n" + + " --payment time\n" + + " `order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),\n" + + " --WATERMARK\n" + + " WATERMARK FOR order_time AS order_time-INTERVAL '2' SECOND\n" + + ") WITH(\n" + + " 'connector' = 'datagen',\n" + + " 'rows-per-second' = '1',\n" + + " 'fields.order_id.min' = '1',\n" + + " 'fields.order_id.max' = '2',\n" + + " 'fields.amount.min' = '1',\n" + + " 'fields.amount.max' = '10',\n" + + " 'fields.product.min' = '1',\n" + + " 'fields.product.max' = '2'\n" + + ");\n" + + "SELECT count(*) FROM t_order;"; +} diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java index 12c5b4de..ab58e471 100644 --- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/main/java/org/apache/paimon/web/engine/flink/sql/gateway/executor/FlinkSqlGatewayExecutor.java @@ -20,7 +20,7 @@ package org.apache.paimon.web.engine.flink.sql.gateway.executor; import org.apache.paimon.web.engine.flink.common.executor.Executor; import org.apache.paimon.web.engine.flink.common.operation.FlinkSqlOperationType; -import org.apache.paimon.web.engine.flink.common.parser.StatementParser; +import org.apache.paimon.web.engine.flink.common.parser.CustomSqlParser; import org.apache.paimon.web.engine.flink.common.result.ExecutionResult; import org.apache.paimon.web.engine.flink.common.result.FetchResultParams; import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient; @@ -28,6 +28,8 @@ import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity; import org.apache.paimon.web.engine.flink.sql.gateway.utils.CollectResultUtil; import org.apache.paimon.web.engine.flink.sql.gateway.utils.FlinkSqlStatementSetBuilder; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; @@ -53,16 +55,18 @@ public class FlinkSqlGatewayExecutor implements Executor { } @Override - public ExecutionResult executeSql(String multiStatement) throws Exception { - String[] statements = StatementParser.parse(multiStatement); + public ExecutionResult executeSql(String multiStatement, int maxRows) throws Exception { + CustomSqlParser customSqlParser = new CustomSqlParser(multiStatement, maxRows); + SqlNodeList sqlNodeList = customSqlParser.parseStmtList(); List<String> insertStatements = new ArrayList<>(); ExecutionResult executionResult = null; - for (String statement : statements) { - FlinkSqlOperationType operationType = FlinkSqlOperationType.getOperationType(statement); + for (SqlNode sqlNode : sqlNodeList) { + FlinkSqlOperationType operationType = + FlinkSqlOperationType.getOperationType(sqlNode.toString()); if (operationType == null) { - String operationTypeString = extractSqlOperationType(statement); + String operationTypeString = extractSqlOperationType(sqlNode.toString()); throw new UnsupportedOperationException( "Unsupported operation type: " + operationTypeString); } @@ -73,17 +77,17 @@ public class FlinkSqlGatewayExecutor implements Executor { throw new UnsupportedOperationException( "Cannot execute DQL statement with pending INSERT statements."); } - executionResult = executeDqlStatement(statement, operationType); + executionResult = executeDqlStatement(sqlNode.toString(), operationType); break; case DML: if (operationType.getType().equals(FlinkSqlOperationType.INSERT.getType())) { - insertStatements.add(statement); + insertStatements.add(sqlNode.toString()); } else { - executionResult = executeDmlStatement(statement); + executionResult = executeDmlStatement(sqlNode.toString()); } break; default: - client.executeStatement(session.getSessionId(), statement, null); + client.executeStatement(session.getSessionId(), sqlNode.toString(), null); break; } diff --git a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java index 42620af7..69a6d8a8 100644 --- a/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java +++ b/paimon-web-engine/paimon-web-engine-flink/paimon-web-engine-flink-sql-gateway/src/test/java/org/apache/paimon/web/engine/flink/sql/gataway/executor/FlinkSqlGatewayExecutorTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.web.engine.flink.sql.gateway.client.SqlGatewayClient; import org.apache.paimon.web.engine.flink.sql.gateway.executor.FlinkSqlGatewayExecutor; import org.apache.paimon.web.engine.flink.sql.gateway.model.SessionEntity; +import org.apache.calcite.sql.parser.SqlParseException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -52,21 +53,23 @@ public class FlinkSqlGatewayExecutorTest extends TestBase { @Test public void testExecuteSql() throws Exception { - ExecutionResult executionResult = executor.executeSql(StatementsConstant.statement); + ExecutionResult executionResult = executor.executeSql(StatementsConstant.statement, 0); assertNotNull(executionResult); assertNotNull(executionResult.getJobId()); } @Test public void testExecuteStatementSetSql() throws Exception { - ExecutionResult executionResult = executor.executeSql(StatementsConstant.statementSetSql); + ExecutionResult executionResult = + executor.executeSql(StatementsConstant.statementSetSql, 0); assertNotNull(executionResult); assertNotNull(executionResult.getJobId()); } @Test public void testExecutorStatementWithoutResult() throws Exception { - ExecutionResult executionResult = executor.executeSql(StatementsConstant.createStatement); + ExecutionResult executionResult = + executor.executeSql(StatementsConstant.createStatement, 0); assertNull(executionResult); } @@ -77,7 +80,8 @@ public class FlinkSqlGatewayExecutorTest extends TestBase { UnsupportedOperationException.class, () -> { executor.executeSql( - StatementsConstant.selectStatementWithPendingInsertStatements); + StatementsConstant.selectStatementWithPendingInsertStatements, + 0); }); String expectedMessage = "Cannot execute DQL statement with pending INSERT statements."; String actualMessage = exception.getMessage(); @@ -88,18 +92,19 @@ public class FlinkSqlGatewayExecutorTest extends TestBase { public void testExecuteBadSqlStatement() { Exception exception = assertThrows( - UnsupportedOperationException.class, + SqlParseException.class, () -> { - executor.executeSql(StatementsConstant.badStatement); + executor.executeSql(StatementsConstant.badStatement, 0); }); - String expectedMessage = "Unsupported operation type: CREAT"; + String expectedMessage = "Non-query expression encountered in illegal context"; String actualMessage = exception.getMessage(); assertTrue(actualMessage.contains(expectedMessage)); } @Test public void testFetchResults() throws Exception { - ExecutionResult executionResult = executor.executeSql(StatementsConstant.selectStatement); + ExecutionResult executionResult = + executor.executeSql(StatementsConstant.selectStatement, 10); assertNotNull(executionResult); assertNotNull(executionResult.getJobId()); assertNotNull(executionResult.getSubmitId()); diff --git a/paimon-web-engine/paimon-web-engine-flink/pom.xml b/paimon-web-engine/paimon-web-engine-flink/pom.xml index 476a6bc2..1db29878 100644 --- a/paimon-web-engine/paimon-web-engine-flink/pom.xml +++ b/paimon-web-engine/paimon-web-engine-flink/pom.xml @@ -88,7 +88,6 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.version}</artifactId> <version>${flink.version}</version> - <scope>provided</scope> </dependency> <dependency> diff --git a/paimon-web-server/pom.xml b/paimon-web-server/pom.xml index 2d85244d..9bd9c433 100644 --- a/paimon-web-server/pom.xml +++ b/paimon-web-server/pom.xml @@ -248,13 +248,6 @@ under the License. </exclusions> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.version}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java index 83e4c384..b3c4d6f7 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/JobSubmitDTO.java @@ -39,4 +39,6 @@ public class JobSubmitDTO { private Map<String, String> config; private String statements; + + private int maxRows; } diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java index ceb46c73..2df400e2 100644 --- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java +++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/JobServiceImpl.java @@ -142,7 +142,8 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J String.format( "Starting to submit %s %s job...", jobSubmitDTO.getTaskType(), executeMode)); - ExecutionResult executionResult = executor.executeSql(jobSubmitDTO.getStatements()); + ExecutionResult executionResult = + executor.executeSql(jobSubmitDTO.getStatements(), jobSubmitDTO.getMaxRows()); if (StringUtils.isNotBlank(executionResult.getJobId())) { JobInfo jobInfo = buildJobInfo(executionResult, jobSubmitDTO); this.save(jobInfo); @@ -318,7 +319,7 @@ public class JobServiceImpl extends ServiceImpl<JobMapper, JobInfo> implements J return; } - ExecutionResult executionResult = executor.executeSql(SHOW_JOBS_STATEMENT); + ExecutionResult executionResult = executor.executeSql(SHOW_JOBS_STATEMENT, 0); List<Map<String, Object>> jobsData = executionResult.getData(); for (Map<String, Object> jobData : jobsData) { String jobId = (String) jobData.get("job id");