This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 9763d9313 [KYUUBI #5392] Add query timeout monitor on server-side in 
ExecuteStatement
9763d9313 is described below

commit 9763d9313115e52a598a957aec3c459f1657ddca
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Wed Oct 18 21:46:06 2023 +0800

    [KYUUBI #5392] Add query timeout monitor on server-side in ExecuteStatement
    
    ### _Why are the changes needed?_
    
    As reported in #5392, the server currently cannot guarantee that a 
statement times out when the engine fails to respond properly to the 
server's request, so the query timeout does not take effect.
    
    Introduce a server-side query timeout monitor for statements, to ensure 
that timed-out statements are set to the TIMEOUT state and to help the JDBC 
client get out of the blocked state.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    
    No.
    
    Closes #5398 from bowenliang123/stmt-timeout.
    
    Closes #5392
    
    f5733b3f9 [Bowen Liang] use addTimeoutMonitor for server-side query timeout 
checks
    
    Authored-by: Bowen Liang <liangbo...@gf.com.cn>
    Signed-off-by: liangbowen <liangbo...@gf.com.cn>
    (cherry picked from commit bdc28acf411e2cb48c1e5e7699f4aa2db286045a)
    Signed-off-by: liangbowen <liangbo...@gf.com.cn>
---
 .../main/scala/org/apache/kyuubi/config/KyuubiConf.scala    |  9 +++++++++
 .../org/apache/kyuubi/operation/ExecuteStatement.scala      | 13 ++++++++++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 9cef84d6b..87f8fcad1 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1868,6 +1868,15 @@ object KyuubiConf {
       .checkValue(_ >= 1000, "must >= 1s if set")
       .createOptional
 
+  val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] =
+    buildConf("kyuubi.operation.query.timeout.monitor.enabled")
+      .doc("Whether to monitor timeout query timeout check on server side.")
+      .version("1.8.0")
+      .serverOnly
+      .internal
+      .booleanConf
+      .createWithDefault(true)
+
   val OPERATION_RESULT_MAX_ROWS: ConfigEntry[Int] =
     buildConf("kyuubi.operation.result.max.rows")
       .doc("Max rows of Spark query results. Rows exceeding the limit would be 
ignored. " +
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 4767cbf12..86bd3f8c8 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -25,6 +25,7 @@ import org.apache.hive.service.rpc.thrift.TOperationState._
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf
+import 
org.apache.kyuubi.config.KyuubiConf.OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
 import org.apache.kyuubi.operation.log.OperationLog
@@ -58,6 +59,10 @@ class ExecuteStatement(
     OperationLog.removeCurrentOperationLog()
   }
 
+  private val isTimeoutMonitorEnabled: Boolean = confOverlay.getOrElse[String](
+    OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.key,
+    OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED.defaultValStr).toBoolean
+
   private def executeStatement(): Unit = {
     try {
       // We need to avoid executing query in sync mode, because there is no 
heartbeat mechanism
@@ -84,7 +89,7 @@ class ExecuteStatement(
       var lastStateUpdateTime: Long = 0L
       val stateUpdateInterval =
         
session.sessionManager.getConf.get(KyuubiConf.OPERATION_STATUS_UPDATE_INTERVAL)
-      while (!isComplete) {
+      while (!isComplete && !isTerminalState(state)) {
         fetchQueryLog()
         verifyTStatus(statusResp.getStatus)
         if (statusResp.getProgressUpdateResponse != null) {
@@ -143,6 +148,9 @@ class ExecuteStatement(
       // see if anymore log could be fetched
       fetchQueryLog()
     } catch onError()
+    finally {
+      shutdownTimeoutMonitor()
+    }
 
   private def fetchQueryLog(): Unit = {
     getOperationLog.foreach { logger =>
@@ -157,6 +165,9 @@ class ExecuteStatement(
   }
 
   override protected def runInternal(): Unit = {
+    if (isTimeoutMonitorEnabled) {
+      addTimeoutMonitor(queryTimeout)
+    }
     executeStatement()
     val sessionManager = session.sessionManager
     val asyncOperation: Runnable = () => waitStatementComplete()

Reply via email to