This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 84928184fc [KYUUBI #7121] Improve operation timeout management with configurable executors
84928184fc is described below

commit 84928184fc78bb2dd0a8fa54ffbb9b2cd3987309
Author: wangzhigang <[email protected]>
AuthorDate: Wed Jul 9 10:51:30 2025 +0800

    [KYUUBI #7121] Improve operation timeout management with configurable executors
    
    ### Why are the changes needed?
    
    The current mechanism for handling operation timeouts in Kyuubi creates a new `ScheduledExecutorService`, with its own dedicated thread, for every operation that sets a positive query timeout. In scenarios with a large number of concurrent operations, this results in excessive thread creation, which consumes substantial system resources and may adversely affect server performance and stability.
    
    This PR introduces a scheduled thread pool (created via `ThreadUtils.newDaemonScheduledThreadPool`) owned by each `OperationManager` and shared by all of its operations to centrally schedule operation timeouts. This avoids creating a dedicated thread per operation, thereby reducing the system load. Both the pool size and the keep-alive time of idle pool threads are configurable: `kyuubi.operation.timeout.pool.size` (`OPERATION_TIMEOUT_POOL_SIZE`, default 8) and `kyuubi.operation.timeout.pool.keepalive.time` (`OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME`, default PT1M).
    
    ### How was this patch tested?
    
    A new unit test for `newDaemonScheduledThreadPool` was added to 
`ThreadUtilsSuite.scala`. Furthermore, a dedicated `TimeoutSchedulerSuite` was 
introduced to verify operation timeout behavior.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7121 from wangzhigang1999/master.
    
    Closes #7121
    
    df7688dbf [wangzhigang] Refactor timeout management configuration and improve documentation
    2b03b1e68 [wangzhigang] Remove deprecated `ThreadPoolTimeoutExecutor` class following refactor of operation timeout management.
    52a8a516a [wangzhigang] Refactor operation timeout management to use per-OperationManager scheduler
    7e46d47f8 [wangzhigang] Refactor timeout management by introducing ThreadPoolTimeoutExecutor
    f7f10881a [wangzhigang] Add operation timeout management with ThreadPoolTimeoutExecutor
    d8cd6c7d4 [wangzhigang] Update .gitignore to exclude .bloop and .metals directories
    
    Lead-authored-by: wangzhigang <[email protected]>
    Co-authored-by: wangzhigang <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .gitignore                                         |  2 +
 docs/configuration/settings.md                     | 46 +++++++++---------
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 14 ++++++
 .../kyuubi/operation/AbstractOperation.scala       | 17 +++----
 .../apache/kyuubi/operation/OperationManager.scala | 32 +++++++++++++
 .../scala/org/apache/kyuubi/util/ThreadUtils.scala | 18 +++++++
 .../operation/timeout/TimeoutSchedulerSuite.scala  | 55 ++++++++++++++++++++++
 .../org/apache/kyuubi/util/ThreadUtilsSuite.scala  | 15 ++++++
 .../apache/kyuubi/operation/KyuubiOperation.scala  |  4 ++
 9 files changed, 173 insertions(+), 30 deletions(-)

diff --git a/.gitignore b/.gitignore
index 9f9ba71548..9a36db2d86 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,8 @@
 *.swp
 *~
 .DS_Store
+.bloop/
+.metals/
 .cache
 .classpath
 .ensime
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 493661bddb..8bbbcfed62 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -416,28 +416,30 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 
 ### Operation
 
-|                       Key                        |                           
          Default                                     |                         
                                                                                
                                                                                
                                                        Meaning                 
                                                                                
              [...]
-|--------------------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| kyuubi.operation.getTables.ignoreTableProperties | false                     
                                                      | Speed up the 
`GetTables` operation by ignoring `tableTypes` query criteria, and returning 
table identities only.                                                          
                                                                                
                                                                                
                            [...]
-| kyuubi.operation.idle.timeout                    | PT3H                      
                                                      | Operation will be 
closed when it's not accessed for this duration of time                         
                                                                                
                                                                                
                                                                                
                    [...]
-| kyuubi.operation.interrupt.on.cancel             | true                      
                                                      | When true, all running 
tasks will be interrupted if one cancels a query. When false, all running tasks 
will remain until finished.                                                     
                                                                                
                                                                                
               [...]
-| kyuubi.operation.language                        | SQL                       
                                                      | Choose a programing 
language for the following inputs<ul><li>SQL: (Default) Run all following 
statements as SQL queries.</li><li>SCALA: Run all following input as scala 
codes</li><li>PYTHON: (Experimental) Run all following input as Python codes 
with Spark engine</li></ul>                                                     
                                [...]
-| kyuubi.operation.log.dir.root                    | server_operation_logs     
                                                      | Root directory for 
query operation log at server-side.                                             
                                                                                
                                                                                
                                                                                
                   [...]
-| kyuubi.operation.plan.only.excludes              | 
SetCatalogAndNamespace,UseStatement,SetNamespaceCommand,SetCommand,ResetCommand 
| Comma-separated list of query plan names, in the form of simple class names, 
i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary 
plans, such as `switch databases`, `set properties`, or `create temporary view` 
etc., which are used for setup evaluating environments for analyzing actual 
queries, we can use this config to exclude them  [...]
-| kyuubi.operation.plan.only.mode                  | none                      
                                                      | Configures the 
statement performed mode, The value can be 'parse', 'analyze', 'optimize', 
'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none', when it is 
'none', indicate to the statement will be fully executed, otherwise only way 
without executing the query. different engines currently support different 
modes, the Spark engine supports al [...]
-| kyuubi.operation.plan.only.output.style          | plain                     
                                                      | Configures the planOnly 
output style. The value can be 'plain' or 'json', and the default value is 
'plain'. This configuration supports only the output styles of the Spark engine 
                                                                                
                                                                                
                   [...]
-| kyuubi.operation.progress.enabled                | false                     
                                                      | Whether to enable the 
operation progress. When true, the operation progress will be returned in 
`GetOperationStatus`.                                                           
                                                                                
                                                                                
                      [...]
-| kyuubi.operation.query.timeout                   | &lt;undefined&gt;         
                                                      | Timeout for query 
executions at server-side, take effect with client-side 
timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be 
cancelled automatically if timeout. It's off by default, which means only 
client-side take full control of whether the query should timeout or not. If 
set, client-side timeout is capped at this point. To [...]
-| kyuubi.operation.result.arrow.timestampAsString  | false                     
                                                      | When true, arrow-based 
rowsets will convert columns of type timestamp to strings for transmission.     
                                                                                
                                                                                
                                                                                
               [...]
-| kyuubi.operation.result.format                   | thrift                    
                                                      | Specify the result 
format, available configs are: <ul> <li>THRIFT: the result will convert to TRow 
at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow 
at the executor side before collecting by the driver, and deserialized at the 
client side. note that it only takes effect for kyuubi-hive-jdbc clients 
now.</li></ul>              [...]
-| kyuubi.operation.result.max.rows                 | 0                         
                                                      | Max rows of Spark query 
results. Rows exceeding the limit would be ignored. By setting this value to 0 
to disable the max rows limit.                                                  
                                                                                
                                                                                
               [...]
-| kyuubi.operation.result.saveToFile.dir           | 
/tmp/kyuubi/tmp_kyuubi_result                                                   
| The Spark query result save dir, it should be a public accessible to every 
engine. Results are saved in ORC format, and the directory structure is 
`/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query 
result will delete when query finished.                                         
                                                   [...]
-| kyuubi.operation.result.saveToFile.enabled       | false                     
                                                      | The switch for Spark 
query result save to file.                                                      
                                                                                
                                                                                
                                                                                
                 [...]
-| kyuubi.operation.result.saveToFile.minRows       | 10000                     
                                                      | The minRows of Spark 
result save to file, default value is 10000.                                    
                                                                                
                                                                                
                                                                                
                 [...]
-| kyuubi.operation.result.saveToFile.minSize       | 209715200                 
                                                      | The minSize of Spark 
result save to file, default value is 200 MB.we use spark's 
`EstimationUtils#getSizePerRowestimate` to estimate the output size of the 
execution plan.                                                                 
                                                                                
                                          [...]
-| kyuubi.operation.scheduler.pool                  | &lt;undefined&gt;         
                                                      | The scheduler pool of 
job. Note that, this config should be used after changing Spark config 
spark.scheduler.mode=FAIR.                                                      
                                                                                
                                                                                
                         [...]
-| kyuubi.operation.spark.listener.enabled          | true                      
                                                      | When set to true, Spark 
engine registers an SQLOperationListener before executing the statement, 
logging a few summary statistics when each stage completes.                     
                                                                                
                                                                                
                     [...]
-| kyuubi.operation.status.polling.timeout          | PT5S                      
                                                      | Timeout(ms) for long 
polling asynchronous running sql query's status                                 
                                                                                
                                                                                
                                                                                
                 [...]
+|                       Key                        |                           
          Default                                     |                         
                                                                                
                                                                                
                                                        Meaning                 
                                                                                
              [...]
+|--------------------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| kyuubi.operation.getTables.ignoreTableProperties | false                     
                                                      | Speed up the 
`GetTables` operation by ignoring `tableTypes` query criteria, and returning 
table identities only.                                                          
                                                                                
                                                                                
                            [...]
+| kyuubi.operation.idle.timeout                    | PT3H                      
                                                      | Operation will be 
closed when it's not accessed for this duration of time                         
                                                                                
                                                                                
                                                                                
                    [...]
+| kyuubi.operation.interrupt.on.cancel             | true                      
                                                      | When true, all running 
tasks will be interrupted if one cancels a query. When false, all running tasks 
will remain until finished.                                                     
                                                                                
                                                                                
               [...]
+| kyuubi.operation.language                        | SQL                       
                                                      | Choose a programing 
language for the following inputs<ul><li>SQL: (Default) Run all following 
statements as SQL queries.</li><li>SCALA: Run all following input as scala 
codes</li><li>PYTHON: (Experimental) Run all following input as Python codes 
with Spark engine</li></ul>                                                     
                                [...]
+| kyuubi.operation.log.dir.root                    | server_operation_logs     
                                                      | Root directory for 
query operation log at server-side.                                             
                                                                                
                                                                                
                                                                                
                   [...]
+| kyuubi.operation.plan.only.excludes              | 
SetCatalogAndNamespace,UseStatement,SetNamespaceCommand,SetCommand,ResetCommand 
| Comma-separated list of query plan names, in the form of simple class names, 
i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary 
plans, such as `switch databases`, `set properties`, or `create temporary view` 
etc., which are used for setup evaluating environments for analyzing actual 
queries, we can use this config to exclude them  [...]
+| kyuubi.operation.plan.only.mode                  | none                      
                                                      | Configures the 
statement performed mode, The value can be 'parse', 'analyze', 'optimize', 
'optimize_with_stats', 'physical', 'execution', 'lineage' or 'none', when it is 
'none', indicate to the statement will be fully executed, otherwise only way 
without executing the query. different engines currently support different 
modes, the Spark engine supports al [...]
+| kyuubi.operation.plan.only.output.style          | plain                     
                                                      | Configures the planOnly 
output style. The value can be 'plain' or 'json', and the default value is 
'plain'. This configuration supports only the output styles of the Spark engine 
                                                                                
                                                                                
                   [...]
+| kyuubi.operation.progress.enabled                | false                     
                                                      | Whether to enable the 
operation progress. When true, the operation progress will be returned in 
`GetOperationStatus`.                                                           
                                                                                
                                                                                
                      [...]
+| kyuubi.operation.query.timeout                   | &lt;undefined&gt;         
                                                      | Timeout for query 
executions at server-side, take effect with client-side 
timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be 
cancelled automatically if timeout. It's off by default, which means only 
client-side take full control of whether the query should timeout or not. If 
set, client-side timeout is capped at this point. To [...]
+| kyuubi.operation.result.arrow.timestampAsString  | false                     
                                                      | When true, arrow-based 
rowsets will convert columns of type timestamp to strings for transmission.     
                                                                                
                                                                                
                                                                                
               [...]
+| kyuubi.operation.result.format                   | thrift                    
                                                      | Specify the result 
format, available configs are: <ul> <li>THRIFT: the result will convert to TRow 
at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow 
at the executor side before collecting by the driver, and deserialized at the 
client side. note that it only takes effect for kyuubi-hive-jdbc clients 
now.</li></ul>              [...]
+| kyuubi.operation.result.max.rows                 | 0                         
                                                      | Max rows of Spark query 
results. Rows exceeding the limit would be ignored. By setting this value to 0 
to disable the max rows limit.                                                  
                                                                                
                                                                                
               [...]
+| kyuubi.operation.result.saveToFile.dir           | 
/tmp/kyuubi/tmp_kyuubi_result                                                   
| The Spark query result save dir, it should be a public accessible to every 
engine. Results are saved in ORC format, and the directory structure is 
`/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query 
result will delete when query finished.                                         
                                                   [...]
+| kyuubi.operation.result.saveToFile.enabled       | false                     
                                                      | The switch for Spark 
query result save to file.                                                      
                                                                                
                                                                                
                                                                                
                 [...]
+| kyuubi.operation.result.saveToFile.minRows       | 10000                     
                                                      | The minRows of Spark 
result save to file, default value is 10000.                                    
                                                                                
                                                                                
                                                                                
                 [...]
+| kyuubi.operation.result.saveToFile.minSize       | 209715200                 
                                                      | The minSize of Spark 
result save to file, default value is 200 MB.we use spark's 
`EstimationUtils#getSizePerRowestimate` to estimate the output size of the 
execution plan.                                                                 
                                                                                
                                          [...]
+| kyuubi.operation.scheduler.pool                  | &lt;undefined&gt;         
                                                      | The scheduler pool of 
job. Note that, this config should be used after changing Spark config 
spark.scheduler.mode=FAIR.                                                      
                                                                                
                                                                                
                         [...]
+| kyuubi.operation.spark.listener.enabled          | true                      
                                                      | When set to true, Spark 
engine registers an SQLOperationListener before executing the statement, 
logging a few summary statistics when each stage completes.                     
                                                                                
                                                                                
                     [...]
+| kyuubi.operation.status.polling.timeout          | PT5S                      
                                                      | Timeout(ms) for long 
polling asynchronous running sql query's status                                 
                                                                                
                                                                                
                                                                                
                 [...]
+| kyuubi.operation.timeout.pool.keepalive.time     | PT1M                      
                                                      | Keep-alive time for 
idle threads in the timeout scheduler pool.                                     
                                                                                
                                                                                
                                                                                
                  [...]
+| kyuubi.operation.timeout.pool.size               | 8                         
                                                      | Number of threads in 
the timeout scheduler pool used for operation timeout monitoring.               
                                                                                
                                                                                
                                                                                
                 [...]
 
 ### Server
 
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 5ca5c78a8d..b1589811c9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2212,6 +2212,20 @@ object KyuubiConf {
       .checkValue(_ >= 1000, "must >= 1s if set")
       .createOptional
 
+  val OPERATION_TIMEOUT_POOL_SIZE: ConfigEntry[Int] =
+    buildConf("kyuubi.operation.timeout.pool.size")
+      .doc("Number of threads in the timeout scheduler pool used for operation 
timeout monitoring.")
+      .version("1.11.0")
+      .intConf
+      .createWithDefault(8)
+
+  val OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME: ConfigEntry[Long] =
+    buildConf("kyuubi.operation.timeout.pool.keepalive.time")
+      .doc("Keep-alive time for idle threads in the timeout scheduler pool.")
+      .version("1.11.0")
+      .timeConf
+      .createWithDefault(Duration.ofSeconds(60).toMillis)
+
   val OPERATION_QUERY_TIMEOUT_MONITOR_ENABLED: ConfigEntry[Boolean] =
     buildConf("kyuubi.operation.query.timeout.monitor.enabled")
       .doc("Whether to monitor timeout query timeout check on server side.")
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 01948a71fb..d0243621ea 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.operation
 
 import java.io.IOException
-import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{Future, ScheduledFuture}
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
@@ -32,7 +32,6 @@ import org.apache.kyuubi.operation.OperationState._
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, 
TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, 
TStatusCode}
-import org.apache.kyuubi.util.ThreadUtils
 
 abstract class AbstractOperation(session: Session) extends Operation with 
Logging {
 
@@ -45,7 +44,7 @@ abstract class AbstractOperation(session: Session) extends 
Operation with Loggin
 
   final private[kyuubi] val statementId = handle.identifier.toString
 
-  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+  private var timeoutFuture: Option[ScheduledFuture[_]] = None
 
   private val lock: ReentrantLock = new ReentrantLock()
 
@@ -60,8 +59,6 @@ abstract class AbstractOperation(session: Session) extends 
Operation with Loggin
 
   protected def addTimeoutMonitor(queryTimeout: Long): Unit = {
     if (queryTimeout > 0) {
-      val timeoutExecutor =
-        
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", 
false)
       val action: Runnable = () =>
         // Clients less than version 2.1 have no HIVE-4924 Patch,
         // no queryTimeout parameter and no TIMEOUT status.
@@ -74,13 +71,17 @@ abstract class AbstractOperation(session: Session) extends 
Operation with Loggin
         } else {
           cleanup(OperationState.TIMEOUT)
         }
-      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
-      statementTimeoutCleaner = Some(timeoutExecutor)
+
+      val future = 
session.sessionManager.operationManager.scheduleTimeout(action, queryTimeout)
+      timeoutFuture = Some(future)
     }
   }
 
   protected def shutdownTimeoutMonitor(): Unit = {
-    statementTimeoutCleaner.foreach(_.shutdown())
+    timeoutFuture.foreach { future =>
+      session.sessionManager.operationManager.cancelTimeout(future)
+    }
+    timeoutFuture = None
   }
 
   override def getOperationLog: Option[OperationLog] = None
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index 0b19e68ffb..bebaf269bd 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.operation
 
+import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, 
TimeUnit}
+
 import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.KyuubiSQLException
@@ -28,6 +30,7 @@ import org.apache.kyuubi.operation.log.LogDivertAppender
 import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.session.Session
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
+import org.apache.kyuubi.util.ThreadUtils
 
 /**
  * The [[OperationManager]] manages all the operations during their lifecycle.
@@ -40,6 +43,9 @@ abstract class OperationManager(name: String) extends 
AbstractService(name) {
 
   protected def skipOperationLog: Boolean = false
 
+  /**
+   * Shared scheduler for query-timeout tasks. It is created in `initialize`, and `stop` shuts
+   * it down and resets it to null. It is marked @volatile so that the value written by
+   * `initialize`/`stop` is visible to threads that call `scheduleTimeout`.
+   */
+  @volatile private var timeoutScheduler: ScheduledExecutorService = _
+
   def getOperationCount: Int = handleToOperation.size()
 
   def allOperations(): Iterable[Operation] = handleToOperation.values().asScala
@@ -47,6 +53,32 @@ abstract class OperationManager(name: String) extends 
AbstractService(name) {
   override def initialize(conf: KyuubiConf): Unit = {
     LogDivertAppender.initialize(skipOperationLog)
     super.initialize(conf)
+
+    // Build the shared scheduler that runs every operation's query-timeout task.
+    val timeoutPoolSize = conf.get(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE)
+    // NOTE(review): the keep-alive entry is assumed to hold milliseconds (TimeoutSchedulerSuite
+    // sets it to 30000L), while newDaemonScheduledThreadPool expects seconds. Convert it here.
+    // Positive values are floored at 1s so a sub-second setting does not silently disable
+    // idle-thread timeout. Confirm the unit against the KyuubiConf definition.
+    val keepAliveMs = conf.get(KyuubiConf.OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME)
+    val keepAliveSec =
+      if (keepAliveMs > 0) math.max(1L, TimeUnit.MILLISECONDS.toSeconds(keepAliveMs)) else 0L
+    timeoutScheduler = ThreadUtils.newDaemonScheduledThreadPool(
+      timeoutPoolSize,
+      keepAliveSec,
+      "operation-timeout")
+  }
+
+  /** Shuts down the shared timeout scheduler (if it was created), then stops the service. */
+  override def stop(): Unit = synchronized {
+    Option(timeoutScheduler).foreach { scheduler =>
+      ThreadUtils.shutdown(scheduler)
+      timeoutScheduler = null
+    }
+    super.stop()
+  }
+
+  /**
+   * Schedules `action` to run once on the shared timeout scheduler after `timeoutSeconds`.
+   *
+   * @param action         the task to run when the timeout elapses
+   * @param timeoutSeconds delay, in seconds, before `action` runs
+   * @return a future that can be passed to `cancelTimeout` to cancel the pending task
+   * @throws IllegalStateException if the scheduler has not been created by `initialize`
+   *                               or has already been released by `stop`
+   */
+  def scheduleTimeout(action: Runnable, timeoutSeconds: Long): ScheduledFuture[_] = {
+    // Read the volatile field once, so a concurrent stop() cannot null it between check and use.
+    val scheduler = timeoutScheduler
+    if (scheduler == null) {
+      throw new IllegalStateException(
+        "Operation timeout scheduler is not available: the OperationManager has not been " +
+          "initialized or has already been stopped")
+    }
+    scheduler.schedule(action, timeoutSeconds, TimeUnit.SECONDS)
+  }
+
+  /**
+   * Cancels a task previously returned by `scheduleTimeout` without interrupting it if it is
+   * already running. A null or already-cancelled future is ignored.
+   */
+  def cancelTimeout(future: ScheduledFuture[_]): Unit = {
+    Option(future).filterNot(_.isCancelled).foreach(_.cancel(false))
   }
 
   def newExecuteStatementOperation(
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
index aeab37b6f1..dabb25dc96 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala
@@ -67,6 +67,24 @@ object ThreadUtils extends Logging {
     
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
+  /**
+   * Creates a [[ScheduledThreadPoolExecutor]] backed by daemon threads named after `prefix`.
+   *
+   * @param poolSize                         core pool size of the executor
+   * @param keepAliveSec                     idle time in seconds after which core threads may
+   *                                         exit; a value <= 0 leaves core-thread timeout off
+   * @param prefix                           thread-name prefix given to the thread factory
+   * @param removeOnCancel                   whether cancelled tasks are removed from the work
+   *                                         queue at cancellation time
+   * @param executeDelayedTasksAfterShutdown whether already-scheduled delayed tasks still run
+   *                                         after the executor is shut down
+   * @return the configured executor
+   */
+  def newDaemonScheduledThreadPool(
+      poolSize: Int,
+      keepAliveSec: Long,
+      prefix: String,
+      removeOnCancel: Boolean = true,
+      executeDelayedTasksAfterShutdown: Boolean = false): ScheduledThreadPoolExecutor = {
+    val scheduler =
+      new ScheduledThreadPoolExecutor(poolSize, new NamedThreadFactory(prefix, daemon = true))
+    scheduler.setRemoveOnCancelPolicy(removeOnCancel)
+    scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(
+      executeDelayedTasksAfterShutdown)
+    val idleTimeoutEnabled = keepAliveSec > 0
+    if (idleTimeoutEnabled) {
+      // Set the keep-alive first: allowCoreThreadTimeOut(true) throws if the current
+      // keep-alive time is not positive.
+      scheduler.setKeepAliveTime(keepAliveSec, TimeUnit.SECONDS)
+      scheduler.allowCoreThreadTimeOut(true)
+    }
+    info(s"$prefix: pool size: $poolSize, keepalive time: $keepAliveSec s")
+    scheduler
+  }
+
   def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
     try {
       // `awaitPermission` is not actually used anywhere so it's safe to pass 
in null here.
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/timeout/TimeoutSchedulerSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/timeout/TimeoutSchedulerSuite.scala
new file mode 100644
index 0000000000..fb2e2188fc
--- /dev/null
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/timeout/TimeoutSchedulerSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.operation.timeout
+
+import java.util.concurrent.CountDownLatch
+
+import scala.concurrent.duration._
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.operation.NoopOperationManager
+
+/** Exercises the shared query-timeout scheduler exposed by [[NoopOperationManager]]. */
+class TimeoutSchedulerSuite extends KyuubiFunSuite {
+
+  test("scheduler lifecycle and functionality via OperationManager") {
+    val conf = new KyuubiConf()
+    conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_SIZE, 2)
+    conf.set(KyuubiConf.OPERATION_TIMEOUT_POOL_KEEPALIVE_TIME, 30000L)
+
+    val manager = new NoopOperationManager()
+    manager.initialize(conf)
+    manager.start()
+    // Always stop the manager, so its scheduler is shut down even when an assertion fails.
+    try {
+      // A scheduled timeout action runs once its 1-second delay elapses.
+      val latch = new CountDownLatch(1)
+      val future = manager.scheduleTimeout(
+        new Runnable { override def run(): Unit = latch.countDown() },
+        1)
+      eventually(timeout(5.seconds), interval(100.millis)) {
+        assert(latch.getCount == 0)
+      }
+      assert(!future.isCancelled)
+
+      // A timeout cancelled before it fires is reported as cancelled.
+      val longFuture = manager.scheduleTimeout(new Runnable { override def run(): Unit = {} }, 10)
+      manager.cancelTimeout(longFuture)
+      assert(longFuture.isCancelled)
+
+      // Cancelling again, or cancelling null, is a harmless no-op.
+      manager.cancelTimeout(longFuture)
+      manager.cancelTimeout(null)
+      assert(longFuture.isCancelled)
+    } finally {
+      manager.stop()
+    }
+  }
+}
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
index ccedd6d916..a0b8456722 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/ThreadUtilsSuite.scala
@@ -61,4 +61,19 @@ class ThreadUtilsSuite extends KyuubiFunSuite {
     service.awaitTermination(10, TimeUnit.SECONDS)
     assert(threadName startsWith "")
   }
+
+  test("New daemon scheduled thread pool") {
+    val pool = ThreadUtils.newDaemonScheduledThreadPool(2, 10, "ThreadUtilsSchedTest")
+    try {
+      // Record, from inside the task, that it ran and whether its thread is a daemon.
+      @volatile var ranOnDaemon: Option[Boolean] = None
+      val fut = pool.schedule(
+        new Runnable {
+          override def run(): Unit = ranOnDaemon = Some(Thread.currentThread().isDaemon)
+        },
+        100,
+        TimeUnit.MILLISECONDS)
+      fut.get(5, TimeUnit.SECONDS)
+      assert(ranOnDaemon.contains(true))
+      assert(pool.getCorePoolSize == 2)
+      // A positive keep-alive enables idle core-thread timeout.
+      assert(pool.getKeepAliveTime(TimeUnit.SECONDS) == 10)
+      assert(pool.allowsCoreThreadTimeOut)
+      // removeOnCancel defaults to true.
+      assert(pool.getRemoveOnCancelPolicy)
+    } finally {
+      ThreadUtils.shutdown(pool)
+    }
+    assert(pool.isShutdown)
+  }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 9de1dfb2b9..897c90c239 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -147,6 +147,8 @@ abstract class KyuubiOperation(session: Session) extends 
AbstractOperation(sessi
         }
       }
     }
+    // Clean up timeout monitor when operation is cancelled
+    shutdownTimeoutMonitor()
   }
 
   override def close(): Unit = withLockRequired {
@@ -162,6 +164,8 @@ abstract class KyuubiOperation(session: Session) extends 
AbstractOperation(sessi
         }
       }
     }
+    // Clean up timeout monitor to prevent memory leaks
+    shutdownTimeoutMonitor()
     try {
       // For launch engine operation, we use OperationLog to pass engine 
submit log but
       // at that time we do not have remoteOpHandle


Reply via email to