This is an automated email from the ASF dual-hosted git repository. bowenliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new db57e9365d [KYUUBI #6587] Periodically expire temp files and operation logs on server to avoid memory leak by Files.deleteOnExit

db57e9365d is described below

commit db57e9365d7933942197936ac7ff711d58f7ea91
Author: Bowen Liang <liangbo...@gf.com.cn>
AuthorDate: Wed Aug 28 17:13:27 2024 +0800

[KYUUBI #6587] Periodically expire temp files and operation logs on server to avoid memory leak by Files.deleteOnExit

# :mag: Description

## Issue References

## Describe Your Solution

Fix the memory leak on the server caused by `File.deleteOnExit`.

For long-running Kyuubi server instances, some operation log files and files uploaded for batch jobs are marked for deletion at exit using `File.deleteOnExit`. However, every such call adds the path to the `files` set held by `java.io.DeleteOnExitHook`, and that set is never pruned while the JVM is running, so it keeps growing and leaks memory.

This PR fixes the issue by:

1. introducing a new util, `TempFileCleanupUtils`, as a replacement for `File.deleteOnExit`: it deletes registered paths from its own shutdown hook and exposes `cancelDeleteOnExit` to evict a path from its target set, so the set no longer accumulates paths indefinitely;
2. adding a service, `TempFileService`, registered by the Kyuubi server, that periodically cleans up expired files such as operation log directories and uploaded resources, and evicts their paths from `TempFileCleanupUtils` as part of the cleanup;
3. adding the new config `kyuubi.server.tempFile.expireTime`, with a default value of 14 days (`P14D`), which controls how long a server-side temporary file is kept before it expires and is cleaned up.

## Types of changes :bookmark:

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan

#### Behavior Without This Pull Request :coffin:

#### Behavior With This Pull Request :tada:

#### Related Unit Tests

---

# Checklist

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6587 from bowenliang123/file-expiration.

Closes #6587

e23b72e08 [liangbowen] change to P14D
acaf370e7 [liangbowen] change config name to kyuubi.server.tempFile.expireTime
6c7ddd527 [liangbowen] import
ed1e4d76f [liangbowen] comment: ConcurrentHashMap.newKeySet
fbf73ccb4 [liangbowen] update
34d3fc71c [liangbowen] add guava to common module's dep
49c10e5ef [Bowen Liang] file expiration

Lead-authored-by: Bowen Liang <liangbo...@gf.com.cn>
Co-authored-by: liangbowen <liangbo...@gf.com.cn>
Co-authored-by: Bowen Liang <liangbo...@gf.com.cn>
Signed-off-by: liangbowen <liangbo...@gf.com.cn>
--- .gitignore | 1 + docs/configuration/settings.md | 37 ++++----- .../engine/spark/operation/ExecutePython.scala | 3 +- kyuubi-common/pom.xml | 5 ++ .../src/main/scala/org/apache/kyuubi/Utils.scala | 12 ++- .../org/apache/kyuubi/config/KyuubiConf.scala | 8 ++ .../apache/kyuubi/operation/log/OperationLog.scala | 16 ++-- .../apache/kyuubi/service/TempFileService.scala | 91 ++++++++++++++++++++++ .../apache/kyuubi/session/AbstractSession.scala | 5 +- .../apache/kyuubi/util/TempFileCleanupUtils.scala | 65 ++++++++++++++++ .../org/apache/kyuubi/server/KyuubiServer.scala | 7 +- .../kyuubi/server/api/v1/BatchesResource.scala | 8 +- .../apache/kyuubi/session/KyuubiSessionImpl.scala | 2
+ .../kyuubi/session/KyuubiSessionManager.scala | 4 + .../kyuubi/server/TempFileServiceSuite.scala | 53 +++++++++++++ 15 files changed, 284 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 29bb61545c..9f9ba71548 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,7 @@ embedded_zookeeper/ /externals/kyuubi-spark-sql-engine/engine_operation_logs/ /externals/kyuubi-spark-sql-engine/spark-warehouse/ /work/ +/upload/ /docs/_build/ /kyuubi-common/metrics/ /kyuubi-server/metrics/ diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 68014a5cf3..525316b8e0 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -433,24 +433,25 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co ### Server -| Key | Default | Meaning | Type | Since | -|----------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| -| kyuubi.server.administrators || Comma-separated list of Kyuubi service administrators. We use this config to grant admin permission to any service accounts when security mechanism is enabled. Note, when kyuubi.authentication is configured to NOSASL or NONE, everyone is treated as administrator. | set | 1.8.0 | -| kyuubi.server.info.provider | ENGINE | The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities. <li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi engine information.</li> | string | 1.6.1 | -| kyuubi.server.limit.batch.connections.per.ipaddress | <undefined> | Maximum kyuubi server batch connections per ipaddress. 
Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 | -| kyuubi.server.limit.batch.connections.per.user | <undefined> | Maximum kyuubi server batch connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 | -| kyuubi.server.limit.batch.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server batch connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.7.0 | -| kyuubi.server.limit.client.fetch.max.rows | <undefined> | Max rows limit for getting result row set operation. If the max rows specified by client-side is larger than the limit, request will fail directly. | int | 1.8.0 | -| kyuubi.server.limit.connections.ip.deny.list || The client ip in the deny list will be denied to connect to kyuubi server. | set | 1.9.1 | -| kyuubi.server.limit.connections.per.ipaddress | <undefined> | Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 | -| kyuubi.server.limit.connections.per.user | <undefined> | Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 | -| kyuubi.server.limit.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.6.0 | -| kyuubi.server.limit.connections.user.deny.list || The user in the deny list will be denied to connect to kyuubi server, if the user has configured both user.unlimited.list and user.deny.list, the priority of the latter is higher. | set | 1.8.0 | -| kyuubi.server.limit.connections.user.unlimited.list || The maximum connections of the user in the white list will not be limited. | set | 1.7.0 | -| kyuubi.server.name | <undefined> | The name of Kyuubi Server. 
| string | 1.5.0 | -| kyuubi.server.periodicGC.interval | PT30M | How often to trigger a garbage collection. | duration | 1.7.0 | -| kyuubi.server.redaction.regex | <undefined> | Regex to decide which Kyuubi contain sensitive information. When this regex matches a property key or value, the value is redacted from the various logs. || 1.6.0 | -| kyuubi.server.thrift.resultset.default.fetch.size | 1000 | The number of rows sent in one Fetch RPC call by the server to the client, if not specified by the client. Respect `hive.server2.thrift.resultset.default.fetch.size` hive conf. | int | 1.9.1 | +| Key | Default | Meaning | Type | Since | +|----------------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------| +| kyuubi.server.administrators || Comma-separated list of Kyuubi service administrators. We use this config to grant admin permission to any service accounts when security mechanism is enabled. Note, when kyuubi.authentication is configured to NOSASL or NONE, everyone is treated as administrator. | set | 1.8.0 | +| kyuubi.server.info.provider | ENGINE | The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities. <li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi engine information.</li> | string | 1.6.1 | +| kyuubi.server.limit.batch.connections.per.ipaddress | <undefined> | Maximum kyuubi server batch connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 | +| kyuubi.server.limit.batch.connections.per.user | <undefined> | Maximum kyuubi server batch connections per user. 
Any user exceeding this limit will not be allowed to connect. | int | 1.7.0 | +| kyuubi.server.limit.batch.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server batch connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.7.0 | +| kyuubi.server.limit.client.fetch.max.rows | <undefined> | Max rows limit for getting result row set operation. If the max rows specified by client-side is larger than the limit, request will fail directly. | int | 1.8.0 | +| kyuubi.server.limit.connections.ip.deny.list || The client ip in the deny list will be denied to connect to kyuubi server. | set | 1.9.1 | +| kyuubi.server.limit.connections.per.ipaddress | <undefined> | Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 | +| kyuubi.server.limit.connections.per.user | <undefined> | Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect. | int | 1.6.0 | +| kyuubi.server.limit.connections.per.user.ipaddress | <undefined> | Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect. | int | 1.6.0 | +| kyuubi.server.limit.connections.user.deny.list || The user in the deny list will be denied to connect to kyuubi server, if the user has configured both user.unlimited.list and user.deny.list, the priority of the latter is higher. | set | 1.8.0 | +| kyuubi.server.limit.connections.user.unlimited.list || The maximum connections of the user in the white list will not be limited. | set | 1.7.0 | +| kyuubi.server.name | <undefined> | The name of Kyuubi Server. | string | 1.5.0 | +| kyuubi.server.periodicGC.interval | PT30M | How often to trigger a garbage collection. | duration | 1.7.0 | +| kyuubi.server.redaction.regex | <undefined> | Regex to decide which Kyuubi contain sensitive information. 
When this regex matches a property key or value, the value is redacted from the various logs. || 1.6.0 | +| kyuubi.server.tempFile.expireTime | P14D | Expiration timeout for cleaning up server-side temporary files, e.g. operation logs. | duration | 1.10.0 | +| kyuubi.server.thrift.resultset.default.fetch.size | 1000 | The number of rows sent in one Fetch RPC call by the server to the client, if not specified by the client. Respect `hive.server2.thrift.resultset.default.fetch.size` hive conf. | int | 1.9.1 | ### Session diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala index d58a22e45a..771bb65ee2 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala @@ -43,6 +43,7 @@ import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, Operati import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session +import org.apache.kyuubi.util.TempFileCleanupUtils import org.apache.kyuubi.util.reflect.DynFields class ExecutePython( @@ -398,7 +399,7 @@ object ExecutePython extends Logging { val source = getClass.getClassLoader.getResourceAsStream(s"python/$pyfile") val file = new File(pythonPath.toFile, pyfile) - file.deleteOnExit() + TempFileCleanupUtils.deleteOnExit(file) val sink = new FileOutputStream(file) val buf = new Array[Byte](1024) diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml index 747351e41b..57c5a27fdf 100644 --- a/kyuubi-common/pom.xml +++ b/kyuubi-common/pom.xml @@ -123,6 +123,11 @@ <artifactId>HikariCP</artifactId> </dependency> + <dependency> + 
<groupId>com.google.guava</groupId> +
<artifactId>guava</artifactId> + </dependency> + <dependency> <groupId>org.apache.kyuubi</groupId> <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId> diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index 326b1601ff..f58bff3a37 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -38,6 +38,7 @@ import org.apache.hadoop.util.ShutdownHookManager import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.internal.Tests.IS_TESTING +import org.apache.kyuubi.util.TempFileCleanupUtils import org.apache.kyuubi.util.command.CommandLineUtils._ object Utils extends Logging { @@ -138,6 +139,10 @@ object Utils extends Logging { * Delete a directory recursively. */ def deleteDirectoryRecursively(f: File, ignoreException: Boolean = true): Unit = { + if (f == null || !f.exists()) { + return + } + if (f.isDirectory) { val files = f.listFiles if (files != null && files.nonEmpty) { @@ -164,7 +169,7 @@ object Utils extends Logging { prefix: String = "kyuubi", root: String = System.getProperty("java.io.tmpdir")): Path = { val dir = createDirectory(root, prefix) - dir.toFile.deleteOnExit() + TempFileCleanupUtils.deleteOnExit(dir) dir } @@ -211,9 +216,8 @@ object Utils extends Logging { } finally { source.close() } - val file = filePath.toFile - file.deleteOnExit() - file + TempFileCleanupUtils.deleteOnExit(filePath) + filePath.toFile } catch { case e: Exception => error( diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index a88b5f615e..ebb28d4150 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -3113,6 +3113,14 @@ object KyuubiConf { .timeConf 
.createWithDefaultString("PT30M") + val SERVER_TEMP_FILE_EXPIRE_TIME: ConfigEntry[Long] = + buildConf("kyuubi.server.tempFile.expireTime") + .doc("Expiration timeout for cleaning up server-side temporary files, e.g. operation logs.") + .version("1.10.0") + .serverOnly + .timeConf + .createWithDefaultString("P14D") + val SERVER_ADMINISTRATORS: ConfigEntry[Set[String]] = buildConf("kyuubi.server.administrators") .doc("Comma-separated list of Kyuubi service administrators. " + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala index b3bd46d35a..e77a726d8c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala @@ -31,7 +31,7 @@ import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, Fe import org.apache.kyuubi.operation.OperationHandle import org.apache.kyuubi.session.Session import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn} -import org.apache.kyuubi.util.ThriftUtils +import org.apache.kyuubi.util.{TempFileCleanupUtils, ThriftUtils} object OperationLog extends Logging { final private val OPERATION_LOG: InheritableThreadLocal[OperationLog] = { @@ -49,19 +49,23 @@ object OperationLog extends Logging { def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove() /** - * The operation log root directory, this directory will delete when JVM exit. + * The operation log root directory, this directory will be deleted + * either after the duration of `kyuubi.server.tempFile.expireTime` + * or when the JVM exits.
*/ - def createOperationLogRootDirectory(session: Session): Unit = { - session.sessionManager.operationLogRoot.foreach { operationLogRoot => + def createOperationLogRootDirectory(session: Session): Path = { + session.sessionManager.operationLogRoot.map { operationLogRoot => val path = Paths.get(operationLogRoot, session.handle.identifier.toString) try { Files.createDirectories(path) - path.toFile.deleteOnExit() + TempFileCleanupUtils.deleteOnExit(path) + path } catch { case e: IOException => error(s"Failed to create operation log root directory: $path", e) + null } - } + }.orNull } /** diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala new file mode 100644 index 0000000000..a53259e7a1 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TempFileService.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.service + +import java.nio.file.{Path, Paths} +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong + +import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification} + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.service.TempFileService.tempFileCounter +import org.apache.kyuubi.util.{TempFileCleanupUtils, ThreadUtils} + +class TempFileService(name: String) extends AbstractService(name) { + def this() = this(classOf[TempFileService].getSimpleName) + + final private var expiringFiles: Cache[String, String] = _ + private lazy val cleanupScheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-cleanup-scheduler") + + override def initialize(conf: KyuubiConf): Unit = { + super.initialize(conf) + val expireTimeInMs = conf.get(KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME) + expiringFiles = CacheBuilder.newBuilder() + .expireAfterWrite(expireTimeInMs, TimeUnit.MILLISECONDS) + .removalListener((notification: RemovalNotification[String, String]) => { + val pathStr = notification.getValue + debug(s"Remove expired temp file: $pathStr") + cleanupFilePath(pathStr) + }) + .build[String, String]() + + cleanupScheduler.scheduleAtFixedRate( + () => expiringFiles.cleanUp(), + 0, + Math.max(expireTimeInMs / 10, 100), + TimeUnit.MILLISECONDS) + } + + override def stop(): Unit = { + expiringFiles.asMap().values().forEach(cleanupFilePath) + super.stop() + } + + private def cleanupFilePath(pathStr: String): Unit = { + try { + val path = Paths.get(pathStr) + TempFileCleanupUtils.cancelDeleteOnExit(path) + Utils.deleteDirectoryRecursively(path.toFile) + } catch { + case e: Throwable => error(s"Failed to delete file $pathStr", e) + } + } + + /** + * add the file path to the expiration list + * ensuring the path will be deleted + * either after duration + * or on the JVM exit + * + * @param path the path of file or directory + */ + def 
addPathToExpiration(path: Path): Unit = { + require(path != null) + expiringFiles.put( + s"${tempFileCounter.incrementAndGet()}-${System.currentTimeMillis()}", + path.toString) + TempFileCleanupUtils.deleteOnExit(path) + } +} + +object TempFileService { + private lazy val tempFileCounter = new AtomicLong(0) +} diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala index 14e59078fb..1fe5188ad6 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.session +import java.nio.file.Path import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -259,8 +260,10 @@ abstract class AbstractSession( } } + protected var operationalLogRootDir: Option[Path] = None + override def open(): Unit = { - OperationLog.createOperationLogRootDirectory(this) + operationalLogRootDir = Option(OperationLog.createOperationLogRootDirectory(this)) } val isForAliveProbe: Boolean = diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala new file mode 100644 index 0000000000..9e27fe4269 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/TempFileCleanupUtils.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.util + +import java.io.File +import java.nio.file.{Path, Paths} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.kyuubi.Utils + +object TempFileCleanupUtils { + private lazy val deleteTargets = ConcurrentHashMap.newKeySet[String]() + + private lazy val isCleanupShutdownHookInstalled = { + installFilesCleanupOnExitShutdownHook() + new AtomicBoolean(true) + } + + private def installFilesCleanupOnExitShutdownHook(): Unit = { + Utils.addShutdownHook(() => { + deleteTargets.forEach { pathStr => + try { + Utils.deleteDirectoryRecursively(Paths.get(pathStr).toFile) + } catch { + case _: Exception => + } + } + deleteTargets.clear() + }) + } + + def deleteOnExit(file: File): Unit = { + require(file != null) + deleteOnExit(file.toPath) + } + + def deleteOnExit(path: Path): Unit = { + require(path != null) + isCleanupShutdownHookInstalled.get() + deleteTargets.add(path.toString) + } + + def cancelDeleteOnExit(path: Path): Unit = { + require(path != null) + isCleanupShutdownHookInstalled.get() + deleteTargets.remove(path.toString) + } + +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index ace7ba9d46..f8c9ffa1b4 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -32,7 +32,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf._ import 
org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery} import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem} import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf -import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState} +import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState, TempFileService} import org.apache.kyuubi.session.KyuubiSessionManager import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister} import org.apache.kyuubi.zookeeper.EmbeddedZookeeper @@ -202,6 +202,8 @@ class KyuubiServer(name: String) extends Serverable(name) { throw new UnsupportedOperationException(s"Frontend protocol $other is not supported yet.") } + final var tempFileService: TempFileService = _ + override def initialize(conf: KyuubiConf): Unit = synchronized { val kinit = new KinitAuxiliaryService() addService(kinit) @@ -209,6 +211,9 @@ class KyuubiServer(name: String) extends Serverable(name) { val periodicGCService = new PeriodicGCService addService(periodicGCService) + tempFileService = new TempFileService + addService(tempFileService) + if (conf.get(MetricsConf.METRICS_ENABLED)) { addService(new MetricsSystem) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index b5e98845e6..de69cf3771 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -44,6 +44,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys._ import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationState, KillResponse, KyuubiApplicationManager} import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState} import org.apache.kyuubi.server.KyuubiServer +import 
org.apache.kyuubi.server.KyuubiServer.kyuubiServer import org.apache.kyuubi.server.api.ApiRequestContext import org.apache.kyuubi.server.api.v1.BatchesResource._ import org.apache.kyuubi.server.metadata.MetadataManager @@ -568,6 +569,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { uploadFileFolderPath: JPath): Unit = { try { val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName) + kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath) request.setResource(tempFile.getPath) } catch { case e: Exception => @@ -599,10 +601,12 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { val tempFilePaths = fileParts.map { filePart => val fileName = filePart.getContentDisposition.getFileName try { - Utils.writeToTempFile( + val tempFile = Utils.writeToTempFile( filePart.getValueAs(classOf[InputStream]), uploadFileFolderPath, - fileName).getPath + fileName) + kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath) + tempFile.getPath } catch { case e: Exception => throw new RuntimeException( diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index d0e8e042f7..6b4e2a2e7d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -118,6 +118,8 @@ class KyuubiSessionImpl( // we should call super.open before running launch engine operation super.open() + sessionManager.tempFileService.addPathToExpiration(operationalLogRootDir.get) + runOperation(launchEngineOp) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 9edc8218eb..a20b4bc97c 100644 --- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -36,8 +36,10 @@ import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState} import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor} +import org.apache.kyuubi.server.KyuubiServer.kyuubiServer import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef} import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.service.TempFileService import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.sql.parser.server.KyuubiParser import org.apache.kyuubi.util.{SignUtils, ThreadUtils} @@ -71,6 +73,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { private val engineConnectionAliveChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker") + def tempFileService: TempFileService = kyuubiServer.tempFileService + override def initialize(conf: KyuubiConf): Unit = { this.conf = conf addService(applicationManager) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala new file mode 100644 index 0000000000..4b7568c1c7 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.server + +import java.io.ByteArrayInputStream +import java.time.Duration + +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import org.apache.kyuubi.{Utils, WithKyuubiServer} +import org.apache.kyuubi.Utils.writeToTempFile +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME + +class TempFileServiceSuite extends WithKyuubiServer { + private val expirationInMs = 100 + + override protected val conf: KyuubiConf = KyuubiConf() + .set(SERVER_TEMP_FILE_EXPIRE_TIME, Duration.ofMillis(expirationInMs).toMillis) + + test("file cleaned up after expiration") { + val tempFileService = KyuubiServer.kyuubiServer.tempFileService + (0 until 3).map { i => + val dir = Utils.createTempDir() + writeToTempFile(new ByteArrayInputStream(s"$i".getBytes()), dir, s"$i.txt") + dir.toFile + }.map { dirFile => + assert(dirFile.exists()) + tempFileService.addPathToExpiration(dirFile.toPath) + dirFile + }.foreach { f => + eventually(Timeout((expirationInMs * 2).millis)) { + assert(!f.exists()) + } + } + } +}
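The fix pairs two pieces: `TempFileCleanupUtils` keeps a delete-on-exit set whose entries can be removed again, and `TempFileService` expires tracked paths after `kyuubi.server.tempFile.expireTime`, deletes them, and evicts them from that set. Below is a minimal JDK-only sketch of that interplay, not Kyuubi's implementation. It assumes a caller-supplied clock and an explicit `sweep` call in place of the Guava cache (`expireAfterWrite` plus a removal listener) and the scheduled `cleanUp()` used in the commit. `ExpiringPathTracker`, `sweep`, `exitTargets` and `trackedCount` are hypothetical names.

```scala
import java.io.File
import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap

/**
 * Hypothetical, JDK-only model of how TempFileService and TempFileCleanupUtils cooperate.
 * Not Kyuubi code: the commit uses a Guava Cache with expireAfterWrite and a scheduled
 * cleanUp() instead of an explicit sweep(nowMs).
 */
final class ExpiringPathTracker(expireAfterMs: Long) {
  // Stand-in for TempFileCleanupUtils' target set: paths a shutdown hook would delete.
  val exitTargets: java.util.Set[String] = ConcurrentHashMap.newKeySet[String]()

  // Stand-in for TempFileService's cache: path -> registration time in milliseconds.
  private val registeredAt = new ConcurrentHashMap[String, java.lang.Long]()

  /** Tracks a path for expiration and, as a fallback, for deletion at JVM exit. */
  def addPathToExpiration(path: Path, nowMs: Long): Unit = {
    require(path != null)
    registeredAt.put(path.toString, nowMs)
    exitTargets.add(path.toString)
  }

  /**
   * Deletes every path registered at least expireAfterMs ago and evicts it from
   * exitTargets, so neither collection grows without bound.
   * Returns the number of paths that expired in this sweep.
   */
  def sweep(nowMs: Long): Int = {
    var expired = 0
    val it = registeredAt.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      if (nowMs - entry.getValue.longValue() >= expireAfterMs) {
        it.remove()                      // forget the expiration entry
        exitTargets.remove(entry.getKey) // analogous to TempFileCleanupUtils.cancelDeleteOnExit
        deleteRecursively(new File(entry.getKey))
        expired += 1
      }
    }
    expired
  }

  def trackedCount: Int = registeredAt.size()

  // Deletes children before the directory itself; a missing path is a no-op.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}
```

Injecting `nowMs` keeps the sketch deterministic. In the commit, the scheduler calls `cleanUp()` every `max(expireTime / 10, 100)` ms, so an expired file may linger for up to about one scheduling period before the removal listener deletes it.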
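In the `KyuubiSessionImpl` hunk, `operationalLogRootDir.get` assumes the operation log root was created. Reading `createOperationLogRootDirectory`, it returns `null` when the session manager has no operation log root or when `Files.createDirectories` throws an `IOException`; `Option(null)` is then `None`, and `.get` would throw `NoSuchElementException` while opening the session. A defensive variant is sketched below with hypothetical stand-ins (`RecordingTempFileService`, `OperationLogRootRegistration.register`) rather than the real Kyuubi classes; it registers the path only when one exists.

```scala
import java.nio.file.Path

// Hypothetical stand-in for TempFileService: records paths instead of scheduling deletion.
final class RecordingTempFileService {
  var registered: List[Path] = Nil

  def addPathToExpiration(path: Path): Unit = {
    require(path != null)
    registered = path :: registered
  }
}

object OperationLogRootRegistration {
  /**
   * `created` plays the role of createOperationLogRootDirectory's result, which may be null.
   * Option.foreach registers only an existing directory, whereas calling .get on the
   * resulting None would throw NoSuchElementException.
   */
  def register(created: Path, service: RecordingTempFileService): Option[Path] = {
    val operationalLogRootDir = Option(created)
    operationalLogRootDir.foreach(service.addPathToExpiration)
    operationalLogRootDir
  }
}
```

With the real classes, the analogous change would be `operationalLogRootDir.foreach(sessionManager.tempFileService.addPathToExpiration)`, assuming it is acceptable to skip expiration tracking when no log directory was created.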