bobo495 opened a new issue, #4083:
URL: https://github.com/apache/kyuubi/issues/4083
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

### Search before asking

- [X] I have searched in the [issues](https://github.com/apache/incubator-kyuubi/issues?q=is%3Aissue) and found no similar issues.

### Describe the bug

The [runQueryOperation](https://github.com/apache/kyuubi/blob/master/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala) method polls `snapshotResult` every 50 ms and appends each snapshot to `rows`. As a result, the query does not return until `rows` has accumulated more rows than the `kyuubi.session.engine.flink.max.rows` value, or until the Flink SQL job ends. It is not reasonable to repeatedly append `snapshotResult` to `rows` in `runQueryOperation`. Should the result be returned directly after `snapshotResult` is fetched?

These are the two changes I suggest. Are they appropriate?

1. When `snapshotResult` returns a result, break out of the loop:

   ```scala
   if (rows.size != 0) {
     loop = false
   }
   ```

2. Set a maximum timeout on the loop, to avoid being unable to exit when the result stays empty.
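The two suggestions above can be sketched together as one minimal polling loop. This is an illustration only, assuming a hypothetical `fetchSnapshot` callback in place of the real snapshot fetch inside `ExecuteStatement`; the names and signature are not the actual Kyuubi internals:

```scala
import java.time.{Duration, Instant}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the suggested loop; `fetchSnapshot` stands in for
// the snapshot fetch inside runQueryOperation and is not the real Kyuubi API.
def pollOnce(
    fetchSnapshot: () => Seq[Long],
    interval: Duration,
    timeout: Duration): List[Long] = {
  val deadline = Instant.now.plus(timeout)
  val rows = ArrayBuffer.empty[Long]
  var loop = true
  while (loop) {
    // Replace the buffered rows with the latest snapshot instead of
    // appending snapshot after snapshot, so rows are never duplicated.
    rows.clear()
    rows ++= fetchSnapshot()
    if (rows.nonEmpty) {
      // Suggestion 1: return as soon as a snapshot yields rows.
      loop = false
    } else if (!Instant.now.isBefore(deadline)) {
      // Suggestion 2: bound the loop so an empty result cannot spin forever.
      loop = false
    } else {
      Thread.sleep(interval.toMillis)
    }
  }
  rows.toList
}
```

With a bounded deadline, an empty source returns an empty result instead of hanging, and a source that produces rows returns them exactly once.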
### Environment

- Flink 1.14.0
- Kyuubi 1.7.0-SNAPSHOT (compiled from master)

### Reference UT

```scala
test("data is repeatedly added to the resultSet") {
  withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "20"))(Map.empty) {
    withJdbcStatement() { statement =>
      statement.execute(
        """
          |create table tbl_src (
          |  a bigint
          |) with (
          |  'connector' = 'datagen',
          |  'rows-per-second' = '1',
          |  'fields.a.kind' = 'sequence',
          |  'fields.a.start' = '1',
          |  'fields.a.end' = '5'
          |)
          |""".stripMargin)
      val resultSet = statement.executeQuery("select a from tbl_src")
      var rows = List[Long]()
      while (resultSet.next()) {
        rows :+= resultSet.getLong("a")
      }
      // rows size should not exceed the input data
      assert(rows.size <= 5)
    }
  }
}
```

### Affects Version(s)

master

### Kyuubi Server Log Output

_No response_

### Kyuubi Engine Log Output

_No response_

### Kyuubi Server Configurations

```yaml
kyuubi.engine.type FLINK_SQL
```

### Kyuubi Engine Configurations

_No response_

### Additional context

### Test Case

#### Case 1

`kyuubi-defaults.conf` uses the default configuration, and an unbounded source is generated through `datagen`. In this case, Kyuubi produces no result output for a long time, and the Flink SQL job does not finish.

```sql
Connected to: Apache Flink (version 1.14.0)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >   a bigint
. . . . . . . . . . . . . . . . . > ) with (
. . . . . . . . . . . . . . . . . >   'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >   'rows-per-second'='1'
. . . . . . . . . . . . . . . . . > );
...
query[7ca29418-03ff-40d9-bdba-b46eedb560b3]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.001 seconds
+---------+
| result  |
+---------+
+---------+
No rows selected (0.066 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
```

#### Case 2

`kyuubi-defaults.conf` uses the default configuration, and a bounded source is generated through `datagen`. In this case, duplicate results are returned after the Flink SQL job status is `FINISHED`.

```sql
Connected to: Apache Flink (version 1.14.0)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >   a bigint
. . . . . . . . . . . . . . . . . > ) with (
. . . . . . . . . . . . . . . . . >   'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >   'rows-per-second'='1',
. . . . . . . . . . . . . . . . . >   'fields.a.kind'='sequence',
. . . . . . . . . . . . . . . . . >   'fields.a.start'='1',
. . . . . . . . . . . . . . . . . >   'fields.a.end'='5'
. . . . . . . . . . . . . . . . . > );
...
query[fb4fbfe3-7e7c-45d0-97a2-f4b5438714ee]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
+---------+
| result  |
+---------+
+---------+
No rows selected (0.058 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
...
2023-01-04 10:06:20.336 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: An exception occurred when fetching query results
java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (ca16d67ca30cfe4ea6e3fc0f93c65d21)
	at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:917)
	at org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:931)
	at org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:719)
	at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
End of exception on server side>]
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_211]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_211]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) [flink-table_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
...
query[e1581803-04c4-4fb0-a09b-f2e8ffd0b157]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.02 seconds
+----+
| a  |
+----+
| 1  |
| 2  |
| 3  |
| 4  |
| 1  |
| 2  |
| 3  |
| 4  |
...
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
| 1  |
| 2  |
| 3  |
| 4  |
| 5  |
+----+
270 rows selected (6.675 seconds)
```

#### Case 3

`kyuubi-defaults.conf` sets `kyuubi.session.engine.flink.max.rows 100`, and a bounded source is generated through `datagen`. In this case, 100 duplicate results are returned and the Flink SQL job state becomes `CANCELED`.

```sql
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://localhost:10009/> create table tbl_src (
. . . . . . . . . . . . . . . . . >   a bigint
. . . . . . . . . . . . . . . . . > ) with (
. . . . . . . . . . . . . . . . . >   'connector' = 'datagen',
. . . . . . . . . . . . . . . . . >   'rows-per-second'='1',
. . . . . . . . . . . . . . . . . >   'fields.a.kind'='sequence',
. . . . . . . . . . . . . . . . . >   'fields.a.start'='1',
. . . . . . . . . . . . . . . . . >   'fields.a.end'='5'
. . . . . . . . . . . . . . . . . > );
...
query[31a19428-0f34-4b2e-9f21-1acf8e5e1885]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.031 seconds
+---------+
| result  |
+---------+
| OK      |
+---------+
1 row selected (0.315 seconds)
0: jdbc:hive2://localhost:10009/> select * from tbl_src;
...
2023-01-04 10:14:14.304 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: Interrupted when sleeping before a retry
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_211]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-dist_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) [flink-table_2.12-1.14.0.jar:1.14.0]
	at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74) [flink-sql-client_2.12-1.14.0.jar:1.14.0]
...
2023-01-04 10:14:14.315 INFO org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing anonymous's query[063b61fe-8e20-4325-ac25-762f65b3ccf2]: RUNNING_STATE -> FINISHED_STATE, time taken: 4.648 seconds
2023-01-04 10:14:14.321 INFO org.apache.kyuubi.operation.ExecuteStatement: Query[4e003511-c4c9-4661-bd48-d77822f7352c] in FINISHED_STATE
2023-01-04 10:14:14.322 INFO org.apache.kyuubi.operation.ExecuteStatement: Processing anonymous's query[4e003511-c4c9-4661-bd48-d77822f7352c]: RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
+----+
| a  |
+----+
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
| 2  |
| 1  |
| 2  |
...
| 1  |
| 2  |
| 3  |
| 1  |
| 2  |
+----+
100 rows selected (4.702 seconds)
```

### Are you willing to submit PR?
- [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
- [ ] No. I cannot submit a PR at this time.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
