bobo495 opened a new issue, #4083:
URL: https://github.com/apache/kyuubi/issues/4083

   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   
   
   ### Search before asking
   
   - [X] I have searched in the 
[issues](https://github.com/apache/incubator-kyuubi/issues?q=is%3Aissue) and 
found no similar issues.
   
   
   ### Describe the bug
   
   The 
[runQueryOperation](https://github.com/apache/kyuubi/blob/master/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala)
 method repeats the `snapshotResult` every 50ms and adds it to the `rows`. At 
this point, the Query will not have to return the result until the accumulation 
of `rows` rows over `kyuubi.session.engine.flink.max.rows` set value or 
FlinkSQL end.
   
   It is not reasonable to add `snapshotResult` repeatedly to `rows` in 
`runQueryOperation`. Should the result be returned directly after 
snapshotResult is fetched?
   
   These are the two changes I suggested. Are they appropriate?
   1. When `snapshotResult` gets the result, jumps out of the loop.
   ```scala
   if (rows.size != 0) {
       loop = false
   }
   ```
   2. Loop sets the maximum timeout to avoid being unable to exit the loop if 
the result is null.
   
   ### Environment
   
   - Flink 1.14.0
   - Kyuubi 1.7.0-SNAPSHOT(Compile from master)
   
   ### Reference UT
   ```scala
     test("data is repeatedly added to the resultSet") {
       withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "20"))(Map.empty) {
         withJdbcStatement() { statement =>
           statement.execute(
             """
               |create table tbl_src (
               |           a bigint
               |           ) with (
               |           'connector' = 'datagen',
               |          'rows-per-second'='1',
               |          'fields.a.kind'='sequence',
               |          'fields.a.start'='1',
               |          'fields.a.end'='5'
               |          )
               |""".stripMargin)
           val resultSet = statement.executeQuery(s"select a from tbl_src")
           var rows = List[Long]()
           while (resultSet.next()) {
             rows :+= resultSet.getLong("a")
           }
           // rows size more than the input data
           assert(rows.size <= 5)
         }
       }
     }
   ```
   ![shot_2023-01-04_18 05 
38](https://user-images.githubusercontent.com/32534808/210531365-a75e562f-cc5c-42b2-a955-e0cbb4c314ef.png)
   
   
   ### Affects Version(s)
   
   master
   
   ### Kyuubi Server Log Output
   
   _No response_
   
   ### Kyuubi Engine Log Output
   
   _No response_
   
   ### Kyuubi Server Configurations
   
   ```yaml
   kyuubi.engine.type FLINK_SQL
   ```
   
   
   ### Kyuubi Engine Configurations
   
   _No response_
   
   ### Additional context
   
   ### Test Case
   #### Case1 
   `kyuubi-defaults.conf` uses the default configuration and generates an 
unbound source through `datagen`. At this point, Kyuubi does not have a result 
output for a long time, and the FlinkSQL job does not finish.
   ```sql
   Connected to: Apache Flink (version 1.14.0)
   Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
   Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
   0: jdbc:hive2://localhost:10009/> create table tbl_src (
   . . . . . . . . . . . . . . . . . >            a bigint
   . . . . . . . . . . . . . . . . . >            ) with (
   . . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
   . . . . . . . . . . . . . . . . . >           'rows-per-second'='1'
   . . . . . . . . . . . . . . . . . >           );
   ...
   query[7ca29418-03ff-40d9-bdba-b46eedb560b3]: RUNNING_STATE -> 
FINISHED_STATE, time taken: 0.001 seconds
   +---------+
   | result  |
   +---------+
   +---------+
   No rows selected (0.066 seconds)
   0: jdbc:hive2://localhost:10009/> select * from tbl_src;
   
   ```
   #### Case2
   `kyuubi-defaults.conf` uses the default configuration and generates an bound 
source through `datagen`. At this point, duplicate results are returned after 
the FlinkSQL job status is`FINISHED`.
   
   ```sql
   Connected to: Apache Flink (version 1.14.0)
   Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
   Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
   0: jdbc:hive2://localhost:10009/> create table tbl_src (
   . . . . . . . . . . . . . . . . . >            a bigint
   . . . . . . . . . . . . . . . . . >            ) with (
   . . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
   . . . . . . . . . . . . . . . . . >           'rows-per-second'='1',
   . . . . . . . . . . . . . . . . . >           'fields.a.kind'='sequence',
   . . . . . . . . . . . . . . . . . >           'fields.a.start'='1',
   . . . . . . . . . . . . . . . . . >           'fields.a.end'='5'
   . . . . . . . . . . . . . . . . . >           );
   ...
   query[fb4fbfe3-7e7c-45d0-97a2-f4b5438714ee]: RUNNING_STATE -> 
FINISHED_STATE, time taken: 0.004 seconds
   +---------+
   | result  |
   +---------+
   +---------+
   No rows selected (0.058 seconds)
   0: jdbc:hive2://localhost:10009/> select * from tbl_src;
   ...
   2023-01-04 10:06:20.336 WARN 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: An 
exception occurred when fetching query results
   java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
error., <Exception on server side:
   org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (ca16d67ca30cfe4ea6e3fc0f93c65d21)
        at 
org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:917)
        at 
org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:931)
        at 
org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:719)
        at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
        at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   
   End of exception on server side>]
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
~[?:1.8.0_211]
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
~[?:1.8.0_211]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:163)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:128)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
 [flink-table_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74)
 [flink-sql-client_2.12-1.14.0.jar:1.14.0]
   ...
   query[e1581803-04c4-4fb0-a09b-f2e8ffd0b157]: RUNNING_STATE -> 
FINISHED_STATE, time taken: 0.02 seconds
   +----+
   | a  |
   +----+
   | 1  |
   | 2  |
   | 3  |
   | 4  |
   | 1  |
   | 2  |
   | 3  |
   | 4  |
   ...
   | 1  |
   | 2  |
   | 3  |
   | 4  |
   | 5  |
   | 1  |
   | 2  |
   | 3  |
   | 4  |
   | 5  |
   +----+
   270 rows selected (6.675 seconds)
   ```
   
   #### Case3
   `kyuubi-defaults.conf` set `kyuubi.session.engine.flink.max.rows 100`, and 
generates an bound source through `datagen`. At this point, 100 duplicate 
results will be returned and the FlinkSQL job state will be 'CANCELED'.
   ```sql
   Driver: Kyuubi Project Hive JDBC Client (version 1.7.0-SNAPSHOT)
   Beeline version 1.7.0-SNAPSHOT by Apache Kyuubi (Incubating)
   0: jdbc:hive2://localhost:10009/> create table tbl_src (
   . . . . . . . . . . . . . . . . . >            a bigint
   . . . . . . . . . . . . . . . . . >            ) with (
   . . . . . . . . . . . . . . . . . >            'connector' = 'datagen',
   . . . . . . . . . . . . . . . . . >           'rows-per-second'='1',
   . . . . . . . . . . . . . . . . . >           'fields.a.kind'='sequence',
   . . . . . . . . . . . . . . . . . >           'fields.a.start'='1',
   . . . . . . . . . . . . . . . . . >           'fields.a.end'='5'
   . . . . . . . . . . . . . . . . . >           );
   ...
   query[31a19428-0f34-4b2e-9f21-1acf8e5e1885]: RUNNING_STATE -> 
FINISHED_STATE, time taken: 0.031 seconds
   +---------+
   | result  |
   +---------+
   | OK      |
   +---------+
   1 row selected (0.315 seconds)
   0: jdbc:hive2://localhost:10009/> select * from tbl_src;
   ...
   2023-01-04 10:14:14.304 WARN 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher: 
Interrupted when sleeping before a retry
   java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_211]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 [flink-dist_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
 [flink-table_2.12-1.14.0.jar:1.14.0]
        at 
org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:74)
 [flink-sql-client_2.12-1.14.0.jar:1.14.0]
   ...
   2023-01-04 10:14:14.315 INFO 
org.apache.kyuubi.engine.flink.operation.ExecuteStatement: Processing 
anonymous's query[063b61fe-8e20-4325-ac25-762f65b3ccf2]: RUNNING_STATE -> 
FINISHED_STATE, time taken: 4.648 seconds
   2023-01-04 10:14:14.321 INFO org.apache.kyuubi.operation.ExecuteStatement: 
Query[4e003511-c4c9-4661-bd48-d77822f7352c] in FINISHED_STATE
   2023-01-04 10:14:14.322 INFO org.apache.kyuubi.operation.ExecuteStatement: 
Processing anonymous's query[4e003511-c4c9-4661-bd48-d77822f7352c]: 
RUNNING_STATE -> FINISHED_STATE, time taken: 0.004 seconds
   +----+
   | a  |
   +----+
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 1  |
   | 2  |
   | 1  |
   | 2  |
   ...
   | 1  |
   | 2  |
   | 3  |
   | 1  |
   | 2  |
   +----+
   100 rows selected (4.702 seconds)
   ```
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes. I would be willing to submit a PR with guidance from the Kyuubi 
community to fix.
   - [ ] No. I cannot submit a PR at this time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to