This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new db57e9365d [KYUUBI #6587] Periodically expire temp files and operation 
logs on server to avoid memory leak by Files.deleteOnExit
db57e9365d is described below

commit db57e9365d7933942197936ac7ff711d58f7ea91
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Wed Aug 28 17:13:27 2024 +0800

    [KYUUBI #6587] Periodically expire temp files and operation logs on server 
to avoid memory leak by Files.deleteOnExit
    
    # :mag: Description
    ## Issue References 🔗
    
    -
    
    ## Describe Your Solution 🔧
    Fix the memory leak on the server caused by `File.deleteOnExit`.
    For long-running Kyuubi server instances, some operation log files and 
batch job upload files are marked for deletion at exit using 
`File.deleteOnExit`. However, the `files` collection held by the JDK's 
`DeleteOnExitHook` only grows: paths registered through `File.deleteOnExit` 
are never removed before the JVM exits, which leads to a memory leak.
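
    The pattern that leaks can be sketched as follows. This is a minimal, 
    hypothetical demo (the `LeakDemo` object and the `op-log-` prefix are 
    assumptions, not part of the patch): each call to `File.deleteOnExit` 
    registers a path with the JDK, and since there is no public way to cancel 
    that registration, the path stays in memory even after the file is deleted.

    ```scala
    import java.io.File
    import java.nio.file.Files

    // Hypothetical demo, not part of the patch.
    object LeakDemo {

      /**
       * Creates `n` temp files, marks each for deletion at JVM exit and then
       * deletes it right away. Returns how many deletes succeeded.
       */
      def createAndDelete(n: Int): Int =
        (1 to n).count { _ =>
          val f: File = Files.createTempFile("op-log-", ".log").toFile
          // Registers the path string with the JDK's delete-on-exit hook.
          f.deleteOnExit()
          // Deleting the file early does not unregister the path.
          f.delete()
        }

      def main(args: Array[String]): Unit = {
        val deleted = createAndDelete(1000)
        println(s"deleted $deleted files; their paths stay registered until JVM exit")
      }
    }
    ```

    On a server that creates such files for every session or batch upload, 
    the registered paths add up over the lifetime of the process.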
    
    This PR fixes the issue by:
    1. introducing a new util `TempFileCleanupUtils` as a replacement for 
`File.deleteOnExit`, which also exposes a method to evict a file path from its 
list so that the path list does not grow without bound
    2. adding a service `TempFileService` to the server, which periodically 
cleans up expired files such as operation logs and uploaded resources, and 
evicts their paths from `TempFileCleanupUtils` after cleanup
    3. adding the new config `kyuubi.server.tempFile.expireTime` with a default 
value of 14 days (`P14D`), to control how long server-side temporary files are 
kept before they are cleaned up
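
    The idea behind these steps can be sketched with the JDK alone. This is a 
    simplified, hypothetical stand-in (`ExpiringPaths`, `register`, `cancel` 
    and `sweep` are illustrative names, not the patch's API): the patch itself 
    uses a Guava cache with a removal listener plus a shutdown hook, whereas 
    this sketch keeps registration timestamps in a map and omits the shutdown 
    hook and recursive directory deletion.

    ```scala
    import java.nio.file.{Files, Path, Paths}
    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical, simplified stand-in; not the patch's implementation.
    object ExpiringPaths {
      // Path string -> registration time in milliseconds.
      private val targets = new ConcurrentHashMap[String, java.lang.Long]()

      /** Track a path so that a later sweep can delete it. */
      def register(path: Path, nowMs: Long = System.currentTimeMillis()): Unit =
        targets.put(path.toString, java.lang.Long.valueOf(nowMs))

      /** Stop tracking a path, so the tracked set cannot grow without bound. */
      def cancel(path: Path): Unit = targets.remove(path.toString)

      def size: Int = targets.size()

      /**
       * Delete and evict every path registered at least `expireMs` ago.
       * Only plain files and empty directories are handled in this sketch.
       * Returns the number of evicted paths.
       */
      def sweep(expireMs: Long, nowMs: Long = System.currentTimeMillis()): Int = {
        var evicted = 0
        val it = targets.entrySet().iterator()
        while (it.hasNext) {
          val entry = it.next()
          if (nowMs - entry.getValue.longValue() >= expireMs) {
            Files.deleteIfExists(Paths.get(entry.getKey))
            it.remove()
            evicted += 1
          }
        }
        evicted
      }
    }

    object ExpiringPathsDemo {
      def main(args: Array[String]): Unit = {
        val f = Files.createTempFile("upload-", ".tmp")
        ExpiringPaths.register(f, nowMs = 0L)
        val evicted = ExpiringPaths.sweep(expireMs = 1000L, nowMs = 2000L)
        println(s"evicted=$evicted exists=${Files.exists(f)} tracked=${ExpiringPaths.size}")
      }
    }
    ```

    A periodic task would call `sweep` with the configured expiration time. 
    Passing the clock in as a parameter keeps the sketch easy to test.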
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [ ] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6587 from bowenliang123/file-expiration.
    
    Closes #6587
    
    e23b72e08 [liangbowen] change to P14D
    acaf370e7 [liangbowen] change config name to 
kyuubi.server.tempFile.expireTime
    6c7ddd527 [liangbowen] import
    ed1e4d76f [liangbowen] comment: ConcurrentHashMap.newKeySet
    fbf73ccb4 [liangbowen] update
    34d3fc71c [liangbowen] add guava to common module's dep
    49c10e5ef [Bowen Liang] file expiration
    
    Lead-authored-by: Bowen Liang  <liangbo...@gf.com.cn>
    Co-authored-by: liangbowen <liangbo...@gf.com.cn>
    Co-authored-by: Bowen Liang <liangbo...@gf.com.cn>
    Signed-off-by: liangbowen <liangbo...@gf.com.cn>
---
 .gitignore                                         |  1 +
 docs/configuration/settings.md                     | 37 ++++-----
 .../engine/spark/operation/ExecutePython.scala     |  3 +-
 kyuubi-common/pom.xml                              |  5 ++
 .../src/main/scala/org/apache/kyuubi/Utils.scala   | 12 ++-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  8 ++
 .../apache/kyuubi/operation/log/OperationLog.scala | 16 ++--
 .../apache/kyuubi/service/TempFileService.scala    | 91 ++++++++++++++++++++++
 .../apache/kyuubi/session/AbstractSession.scala    |  5 +-
 .../apache/kyuubi/util/TempFileCleanupUtils.scala  | 65 ++++++++++++++++
 .../org/apache/kyuubi/server/KyuubiServer.scala    |  7 +-
 .../kyuubi/server/api/v1/BatchesResource.scala     |  8 +-
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |  2 +
 .../kyuubi/session/KyuubiSessionManager.scala      |  4 +
 .../kyuubi/server/TempFileServiceSuite.scala       | 53 +++++++++++++
 15 files changed, 284 insertions(+), 33 deletions(-)

diff --git a/.gitignore b/.gitignore
index 29bb61545c..9f9ba71548 100644
--- a/.gitignore
+++ b/.gitignore
@@ -67,6 +67,7 @@ embedded_zookeeper/
 /externals/kyuubi-spark-sql-engine/engine_operation_logs/
 /externals/kyuubi-spark-sql-engine/spark-warehouse/
 /work/
+/upload/
 /docs/_build/
 /kyuubi-common/metrics/
 /kyuubi-server/metrics/
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 68014a5cf3..525316b8e0 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -433,24 +433,25 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 
 ### Server
 
-|                           Key                            |      Default      
|                                                                               
                                                  Meaning                       
                                                                                
                          |   Type   | Since |
-|----------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
-| kyuubi.server.administrators                                                
|| Comma-separated list of Kyuubi service administrators. We use this config to 
grant admin permission to any service accounts when security mechanism is 
enabled. Note, when kyuubi.authentication is configured to NOSASL or NONE, 
everyone is treated as administrator. | set      | 1.8.0 |
-| kyuubi.server.info.provider                              | ENGINE            
| The server information provider name, some clients may rely on this 
information to check the server compatibilities and functionalities. 
<li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi 
engine information.</li>                          | string   | 1.6.1 |
-| kyuubi.server.limit.batch.connections.per.ipaddress      | &lt;undefined&gt; 
| Maximum kyuubi server batch connections per ipaddress. Any user exceeding 
this limit will not be allowed to connect.                                      
                                                                                
                              | int      | 1.7.0 |
-| kyuubi.server.limit.batch.connections.per.user           | &lt;undefined&gt; 
| Maximum kyuubi server batch connections per user. Any user exceeding this 
limit will not be allowed to connect.                                           
                                                                                
                              | int      | 1.7.0 |
-| kyuubi.server.limit.batch.connections.per.user.ipaddress | &lt;undefined&gt; 
| Maximum kyuubi server batch connections per user:ipaddress combination. Any 
user-ipaddress exceeding this limit will not be allowed to connect.             
                                                                                
                            | int      | 1.7.0 |
-| kyuubi.server.limit.client.fetch.max.rows                | &lt;undefined&gt; 
| Max rows limit for getting result row set operation. If the max rows 
specified by client-side is larger than the limit, request will fail directly.  
                                                                                
                                   | int      | 1.8.0 |
-| kyuubi.server.limit.connections.ip.deny.list                                
|| The client ip in the deny list will be denied to connect to kyuubi server.   
                                                                                
                                                                                
                           | set      | 1.9.1 |
-| kyuubi.server.limit.connections.per.ipaddress            | &lt;undefined&gt; 
| Maximum kyuubi server connections per ipaddress. Any user exceeding this 
limit will not be allowed to connect.                                           
                                                                                
                               | int      | 1.6.0 |
-| kyuubi.server.limit.connections.per.user                 | &lt;undefined&gt; 
| Maximum kyuubi server connections per user. Any user exceeding this limit 
will not be allowed to connect.                                                 
                                                                                
                              | int      | 1.6.0 |
-| kyuubi.server.limit.connections.per.user.ipaddress       | &lt;undefined&gt; 
| Maximum kyuubi server connections per user:ipaddress combination. Any 
user-ipaddress exceeding this limit will not be allowed to connect.             
                                                                                
                                  | int      | 1.6.0 |
-| kyuubi.server.limit.connections.user.deny.list                              
|| The user in the deny list will be denied to connect to kyuubi server, if the 
user has configured both user.unlimited.list and user.deny.list, the priority 
of the latter is higher.                                                        
                             | set      | 1.8.0 |
-| kyuubi.server.limit.connections.user.unlimited.list                         
|| The maximum connections of the user in the white list will not be limited.   
                                                                                
                                                                                
                           | set      | 1.7.0 |
-| kyuubi.server.name                                       | &lt;undefined&gt; 
| The name of Kyuubi Server.                                                    
                                                                                
                                                                                
                          | string   | 1.5.0 |
-| kyuubi.server.periodicGC.interval                        | PT30M             
| How often to trigger a garbage collection.                                    
                                                                                
                                                                                
                          | duration | 1.7.0 |
-| kyuubi.server.redaction.regex                            | &lt;undefined&gt; 
| Regex to decide which Kyuubi contain sensitive information. When this regex 
matches a property key or value, the value is redacted from the various logs.   
                                                                                
                                      || 1.6.0 |
-| kyuubi.server.thrift.resultset.default.fetch.size        | 1000              
| The number of rows sent in one Fetch RPC call by the server to the client, if 
not specified by the client. Respect 
`hive.server2.thrift.resultset.default.fetch.size` hive conf.                   
                                                                     | int      
| 1.9.1 |
+|                           Key                            |      Default      
|                                                                               
                                                  Meaning                       
                                                                                
                          |   Type   | Since  |
+|----------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------|
+| kyuubi.server.administrators                                                
|| Comma-separated list of Kyuubi service administrators. We use this config to 
grant admin permission to any service accounts when security mechanism is 
enabled. Note, when kyuubi.authentication is configured to NOSASL or NONE, 
everyone is treated as administrator. | set      | 1.8.0  |
+| kyuubi.server.info.provider                              | ENGINE            
| The server information provider name, some clients may rely on this 
information to check the server compatibilities and functionalities. 
<li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi 
engine information.</li>                          | string   | 1.6.1  |
+| kyuubi.server.limit.batch.connections.per.ipaddress      | &lt;undefined&gt; 
| Maximum kyuubi server batch connections per ipaddress. Any user exceeding 
this limit will not be allowed to connect.                                      
                                                                                
                              | int      | 1.7.0  |
+| kyuubi.server.limit.batch.connections.per.user           | &lt;undefined&gt; 
| Maximum kyuubi server batch connections per user. Any user exceeding this 
limit will not be allowed to connect.                                           
                                                                                
                              | int      | 1.7.0  |
+| kyuubi.server.limit.batch.connections.per.user.ipaddress | &lt;undefined&gt; 
| Maximum kyuubi server batch connections per user:ipaddress combination. Any 
user-ipaddress exceeding this limit will not be allowed to connect.             
                                                                                
                            | int      | 1.7.0  |
+| kyuubi.server.limit.client.fetch.max.rows                | &lt;undefined&gt; 
| Max rows limit for getting result row set operation. If the max rows 
specified by client-side is larger than the limit, request will fail directly.  
                                                                                
                                   | int      | 1.8.0  |
+| kyuubi.server.limit.connections.ip.deny.list                                
|| The client ip in the deny list will be denied to connect to kyuubi server.   
                                                                                
                                                                                
                           | set      | 1.9.1  |
+| kyuubi.server.limit.connections.per.ipaddress            | &lt;undefined&gt; 
| Maximum kyuubi server connections per ipaddress. Any user exceeding this 
limit will not be allowed to connect.                                           
                                                                                
                               | int      | 1.6.0  |
+| kyuubi.server.limit.connections.per.user                 | &lt;undefined&gt; 
| Maximum kyuubi server connections per user. Any user exceeding this limit 
will not be allowed to connect.                                                 
                                                                                
                              | int      | 1.6.0  |
+| kyuubi.server.limit.connections.per.user.ipaddress       | &lt;undefined&gt; 
| Maximum kyuubi server connections per user:ipaddress combination. Any 
user-ipaddress exceeding this limit will not be allowed to connect.             
                                                                                
                                  | int      | 1.6.0  |
+| kyuubi.server.limit.connections.user.deny.list                              
|| The user in the deny list will be denied to connect to kyuubi server, if the 
user has configured both user.unlimited.list and user.deny.list, the priority 
of the latter is higher.                                                        
                             | set      | 1.8.0  |
+| kyuubi.server.limit.connections.user.unlimited.list                         
|| The maximum connections of the user in the white list will not be limited.   
                                                                                
                                                                                
                           | set      | 1.7.0  |
+| kyuubi.server.name                                       | &lt;undefined&gt; 
| The name of Kyuubi Server.                                                    
                                                                                
                                                                                
                          | string   | 1.5.0  |
+| kyuubi.server.periodicGC.interval                        | PT30M             
| How often to trigger a garbage collection.                                    
                                                                                
                                                                                
                          | duration | 1.7.0  |
+| kyuubi.server.redaction.regex                            | &lt;undefined&gt; 
| Regex to decide which Kyuubi contain sensitive information. When this regex 
matches a property key or value, the value is redacted from the various logs.   
                                                                                
                                      || 1.6.0  |
+| kyuubi.server.tempFile.expireTime                        | P14D              
| Expiration timout for cleanup server-side temporary files, e.g. operation 
logs.                                                                           
                                                                                
                              | duration | 1.10.0 |
+| kyuubi.server.thrift.resultset.default.fetch.size        | 1000              
| The number of rows sent in one Fetch RPC call by the server to the client, if 
not specified by the client. Respect 
`hive.server2.thrift.resultset.default.fetch.size` hive conf.                   
                                                                     | int      
| 1.9.1  |
 
 ### Session
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index d58a22e45a..771bb65ee2 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -43,6 +43,7 @@ import org.apache.kyuubi.operation.{ArrayFetchIterator, 
OperationHandle, Operati
 import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.TempFileCleanupUtils
 import org.apache.kyuubi.util.reflect.DynFields
 
 class ExecutePython(
@@ -398,7 +399,7 @@ object ExecutePython extends Logging {
     val source = getClass.getClassLoader.getResourceAsStream(s"python/$pyfile")
 
     val file = new File(pythonPath.toFile, pyfile)
-    file.deleteOnExit()
+    TempFileCleanupUtils.deleteOnExit(file)
 
     val sink = new FileOutputStream(file)
     val buf = new Array[Byte](1024)
diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml
index 747351e41b..57c5a27fdf 100644
--- a/kyuubi-common/pom.xml
+++ b/kyuubi-common/pom.xml
@@ -123,6 +123,11 @@
             <artifactId>HikariCP</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kyuubi</groupId>
             <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 326b1601ff..f58bff3a37 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -38,6 +38,7 @@ import org.apache.hadoop.util.ShutdownHookManager
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.internal.Tests.IS_TESTING
+import org.apache.kyuubi.util.TempFileCleanupUtils
 import org.apache.kyuubi.util.command.CommandLineUtils._
 
 object Utils extends Logging {
@@ -138,6 +139,10 @@ object Utils extends Logging {
    * Delete a directory recursively.
    */
   def deleteDirectoryRecursively(f: File, ignoreException: Boolean = true): 
Unit = {
+    if (f == null || !f.exists()) {
+      return
+    }
+
     if (f.isDirectory) {
       val files = f.listFiles
       if (files != null && files.nonEmpty) {
@@ -164,7 +169,7 @@ object Utils extends Logging {
       prefix: String = "kyuubi",
       root: String = System.getProperty("java.io.tmpdir")): Path = {
     val dir = createDirectory(root, prefix)
-    dir.toFile.deleteOnExit()
+    TempFileCleanupUtils.deleteOnExit(dir)
     dir
   }
 
@@ -211,9 +216,8 @@ object Utils extends Logging {
       } finally {
         source.close()
       }
-      val file = filePath.toFile
-      file.deleteOnExit()
-      file
+      TempFileCleanupUtils.deleteOnExit(filePath)
+      filePath.toFile
     } catch {
       case e: Exception =>
         error(
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a88b5f615e..ebb28d4150 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -3113,6 +3113,14 @@ object KyuubiConf {
       .timeConf
       .createWithDefaultString("PT30M")
 
+  val SERVER_TEMP_FILE_EXPIRE_TIME: ConfigEntry[Long] =
+    buildConf("kyuubi.server.tempFile.expireTime")
+      .doc("Expiration timout for cleanup server-side temporary files, e.g. 
operation logs.")
+      .version("1.10.0")
+      .serverOnly
+      .timeConf
+      .createWithDefaultString("P14D")
+
   val SERVER_ADMINISTRATORS: ConfigEntry[Set[String]] =
     buildConf("kyuubi.server.administrators")
       .doc("Comma-separated list of Kyuubi service administrators. " +
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index b3bd46d35a..e77a726d8c 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -31,7 +31,7 @@ import 
org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, Fe
 import org.apache.kyuubi.operation.OperationHandle
 import org.apache.kyuubi.session.Session
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TRow, 
TRowSet, TStringColumn}
-import org.apache.kyuubi.util.ThriftUtils
+import org.apache.kyuubi.util.{TempFileCleanupUtils, ThriftUtils}
 
 object OperationLog extends Logging {
   final private val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
@@ -49,19 +49,23 @@ object OperationLog extends Logging {
   def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
 
   /**
-   * The operation log root directory, this directory will delete when JVM 
exit.
+   * The operation log root directory, this directory will be deleted
+   * either after the duration of `kyuubi.server.tempFile.expireTime`
+   * or when JVM exit.
    */
-  def createOperationLogRootDirectory(session: Session): Unit = {
-    session.sessionManager.operationLogRoot.foreach { operationLogRoot =>
+  def createOperationLogRootDirectory(session: Session): Path = {
+    session.sessionManager.operationLogRoot.map { operationLogRoot =>
       val path = Paths.get(operationLogRoot, 
session.handle.identifier.toString)
       try {
         Files.createDirectories(path)
-        path.toFile.deleteOnExit()
+        TempFileCleanupUtils.deleteOnExit(path)
+        path
       } catch {
         case e: IOException =>
           error(s"Failed to create operation log root directory: $path", e)
+          null
       }
-    }
+    }.orNull
   }
 
   /**
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala
new file mode 100644
index 0000000000..a53259e7a1
--- /dev/null
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.service
+
+import java.nio.file.{Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.service.TempFileService.tempFileCounter
+import org.apache.kyuubi.util.{TempFileCleanupUtils, ThreadUtils}
+
+class TempFileService(name: String) extends AbstractService(name) {
+  def this() = this(classOf[TempFileService].getSimpleName)
+
+  final private var expiringFiles: Cache[String, String] = _
+  private lazy val cleanupScheduler =
+    
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-cleanup-scheduler")
+
+  override def initialize(conf: KyuubiConf): Unit = {
+    super.initialize(conf)
+    val expireTimeInMs = conf.get(KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME)
+    expiringFiles = CacheBuilder.newBuilder()
+      .expireAfterWrite(expireTimeInMs, TimeUnit.MILLISECONDS)
+      .removalListener((notification: RemovalNotification[String, String]) => {
+        val pathStr = notification.getValue
+        debug(s"Remove expired temp file: $pathStr")
+        cleanupFilePath(pathStr)
+      })
+      .build[String, String]()
+
+    cleanupScheduler.scheduleAtFixedRate(
+      () => expiringFiles.cleanUp(),
+      0,
+      Math.max(expireTimeInMs / 10, 100),
+      TimeUnit.MILLISECONDS)
+  }
+
+  override def stop(): Unit = {
+    expiringFiles.asMap().values().forEach(cleanupFilePath)
+    super.stop()
+  }
+
+  private def cleanupFilePath(pathStr: String): Unit = {
+    try {
+      val path = Paths.get(pathStr)
+      TempFileCleanupUtils.cancelDeleteOnExit(path)
+      Utils.deleteDirectoryRecursively(path.toFile)
+    } catch {
+      case e: Throwable => error(s"Failed to delete file $pathStr", e)
+    }
+  }
+
+  /**
+   * add the file path to the expiration list
+   * ensuring the path will be deleted
+   * either after duration
+   * or on the JVM exit
+   *
+   * @param path the path of file or directory
+   */
+  def addPathToExpiration(path: Path): Unit = {
+    require(path != null)
+    expiringFiles.put(
+      s"${tempFileCounter.incrementAndGet()}-${System.currentTimeMillis()}",
+      path.toString)
+    TempFileCleanupUtils.deleteOnExit(path)
+  }
+}
+
+object TempFileService {
+  private lazy val tempFileCounter = new AtomicLong(0)
+}
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 14e59078fb..1fe5188ad6 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.session
 
+import java.nio.file.Path
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
@@ -259,8 +260,10 @@ abstract class AbstractSession(
     }
   }
 
+  protected var operationalLogRootDir: Option[Path] = None
+
   override def open(): Unit = {
-    OperationLog.createOperationLogRootDirectory(this)
+    operationalLogRootDir = 
Option(OperationLog.createOperationLogRootDirectory(this))
   }
 
   val isForAliveProbe: Boolean =
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala
new file mode 100644
index 0000000000..9e27fe4269
--- /dev/null
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.util
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.kyuubi.Utils
+
+object TempFileCleanupUtils {
+  private lazy val deleteTargets = ConcurrentHashMap.newKeySet[String]()
+
+  private lazy val isCleanupShutdownHookInstalled = {
+    installFilesCleanupOnExitShutdownHook()
+    new AtomicBoolean(true)
+  }
+
+  private def installFilesCleanupOnExitShutdownHook(): Unit = {
+    Utils.addShutdownHook(() => {
+      deleteTargets.forEach { pathStr =>
+        try {
+          Utils.deleteDirectoryRecursively(Paths.get(pathStr).toFile)
+        } catch {
+          case _: Exception =>
+        }
+      }
+      deleteTargets.clear()
+    })
+  }
+
+  def deleteOnExit(file: File): Unit = {
+    require(file != null)
+    deleteOnExit(file.toPath)
+  }
+
+  def deleteOnExit(path: Path): Unit = {
+    require(path != null)
+    isCleanupShutdownHookInstalled.get()
+    deleteTargets.add(path.toString)
+  }
+
+  def cancelDeleteOnExit(path: Path): Unit = {
+    require(path != null)
+    isCleanupShutdownHookInstalled.get()
+    deleteTargets.remove(path.toString)
+  }
+
+}
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index ace7ba9d46..f8c9ffa1b4 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -32,7 +32,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery}
 import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
 import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
-import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
+import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState, TempFileService}
 import org.apache.kyuubi.session.KyuubiSessionManager
 import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
 import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
@@ -202,6 +202,8 @@ class KyuubiServer(name: String) extends Serverable(name) {
         throw new UnsupportedOperationException(s"Frontend protocol $other is not supported yet.")
     }
 
+  final var tempFileService: TempFileService = _
+
   override def initialize(conf: KyuubiConf): Unit = synchronized {
     val kinit = new KinitAuxiliaryService()
     addService(kinit)
@@ -209,6 +211,9 @@ class KyuubiServer(name: String) extends Serverable(name) {
     val periodicGCService = new PeriodicGCService
     addService(periodicGCService)
 
+    tempFileService = new TempFileService
+    addService(tempFileService)
+
     if (conf.get(MetricsConf.METRICS_ENABLED)) {
       addService(new MetricsSystem)
     }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index b5e98845e6..de69cf3771 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -44,6 +44,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys._
 import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationState, KillResponse, KyuubiApplicationManager}
 import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
 import org.apache.kyuubi.server.KyuubiServer
+import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource._
 import org.apache.kyuubi.server.metadata.MetadataManager
@@ -568,6 +569,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
       uploadFileFolderPath: JPath): Unit = {
     try {
       val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName)
+      kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
       request.setResource(tempFile.getPath)
     } catch {
       case e: Exception =>
@@ -599,10 +601,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
             val tempFilePaths = fileParts.map { filePart =>
               val fileName = filePart.getContentDisposition.getFileName
               try {
-                Utils.writeToTempFile(
+                val tempFile = Utils.writeToTempFile(
                   filePart.getValueAs(classOf[InputStream]),
                   uploadFileFolderPath,
-                  fileName).getPath
+                  fileName)
+                kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
+                tempFile.getPath
               } catch {
                 case e: Exception =>
                   throw new RuntimeException(
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index d0e8e042f7..6b4e2a2e7d 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -118,6 +118,8 @@ class KyuubiSessionImpl(
     // we should call super.open before running launch engine operation
     super.open()
 
+    sessionManager.tempFileService.addPathToExpiration(operationalLogRootDir.get)
+
     runOperation(launchEngineOp)
   }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9edc8218eb..a20b4bc97c 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -36,8 +36,10 @@ import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
 import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor}
+import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
 import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef}
 import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
+import org.apache.kyuubi.service.TempFileService
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
 import org.apache.kyuubi.sql.parser.server.KyuubiParser
 import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
@@ -71,6 +73,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
   private val engineConnectionAliveChecker =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
 
+  def tempFileService: TempFileService = kyuubiServer.tempFileService
+
   override def initialize(conf: KyuubiConf): Unit = {
     this.conf = conf
     addService(applicationManager)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
new file mode 100644
index 0000000000..4b7568c1c7
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.server
+
+import java.io.ByteArrayInputStream
+import java.time.Duration
+
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
+
+import org.apache.kyuubi.{Utils, WithKyuubiServer}
+import org.apache.kyuubi.Utils.writeToTempFile
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME
+
+class TempFileServiceSuite extends WithKyuubiServer {
+  private val expirationInMs = 100
+
+  override protected val conf: KyuubiConf = KyuubiConf()
+    .set(SERVER_TEMP_FILE_EXPIRE_TIME, Duration.ofMillis(expirationInMs).toMillis)
+
+  test("file cleaned up after expiration") {
+    val tempFileService = KyuubiServer.kyuubiServer.tempFileService
+    (0 until 3).map { i =>
+      val dir = Utils.createTempDir()
+      writeToTempFile(new ByteArrayInputStream(s"$i".getBytes()), dir, s"$i.txt")
+      dir.toFile
+    }.map { dirFile =>
+      assert(dirFile.exists())
+      tempFileService.addPathToExpiration(dirFile.toPath)
+      dirFile
+    }.foreach { f =>
+      eventually(Timeout((expirationInMs * 2).millis)) {
+        assert(!f.exists())
+      }
+    }
+  }
+}
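
For readers skimming the diff, the register, expire and evict idea behind the change can be sketched in isolation. This is a minimal illustration and not the `TempFileService` implementation from this commit: the object name `ExpiringPathRegistry` and its methods are hypothetical. It also assumes plain files or empty directories, since `Files.deleteIfExists` does not delete recursively, and it takes the clock as a parameter instead of using a scheduler.

```scala
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a path registry with time-based expiration.
object ExpiringPathRegistry {
  // path string -> registration time in milliseconds
  private val registered = new ConcurrentHashMap[String, java.lang.Long]()

  def register(path: Path, nowMs: Long): Unit = {
    registered.put(path.toString, java.lang.Long.valueOf(nowMs))
  }

  // Forget a path without deleting it.
  def cancel(path: Path): Unit = {
    registered.remove(path.toString)
  }

  def size: Int = registered.size()

  // Delete every path registered at least ttlMs ago and evict it from the
  // registry, so the registry cannot grow without bound.
  def expire(nowMs: Long, ttlMs: Long): Int = {
    var removed = 0
    val it = registered.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (nowMs - entry.getValue.longValue() >= ttlMs) {
        // Assumption: a regular file or an empty directory.
        Files.deleteIfExists(Paths.get(entry.getKey))
        it.remove()
        removed += 1
      }
    }
    removed
  }
}
```

Evicting the entry in the same pass as the delete is the part that differs from JDK delete-on-exit registration, where registered paths are kept until the JVM exits.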

