This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 9a023c479c6  [SPARK-45014][CONNECT] Clean up fileserver when cleaning up files, jars and archives in SparkContext
9a023c479c6 is described below

commit 9a023c479c6a91a602f96ccabba398223c04b3d1
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu Aug 31 15:21:19 2023 +0900

    [SPARK-45014][CONNECT] Clean up fileserver when cleaning up files, jars and archives in SparkContext

    ### What changes were proposed in this pull request?

    This PR proposes to clean up the files, jars and archives added via Spark Connect sessions.

    ### Why are the changes needed?

    In [SPARK-44348](https://issues.apache.org/jira/browse/SPARK-44348), we clean up SparkContext's added files, but we do not clean up the corresponding entries in the file server.

    ### Does this PR introduce _any_ user-facing change?

    Yes, it avoids slowly growing memory usage within the file server.

    ### How was this patch tested?

    Manually tested. Existing tests should also not be broken.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42731 from HyukjinKwon/SPARK-45014.

    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/connect/artifact/SparkConnectArtifactManager.scala | 10 ++++++----
 core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala      | 13 +++++++++++++
 .../org/apache/spark/rpc/netty/NettyStreamManager.scala    |  4 ++++
 3 files changed, 23 insertions(+), 4 deletions(-)
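For context before the diff: the leak this patch fixes follows a simple pattern. Artifacts were removed from the per-session bookkeeping maps in SparkContext, but never from the RPC file server's own registry, so those entries accumulated for the lifetime of the server. The sketch below is a minimal, self-contained illustration of that pattern and of the fix, not Spark code: SimpleFileServer, SessionCleanupSketch, the session map, and the key names are hypothetical stand-ins for NettyStreamManager, SparkContext.addedFiles, and real artifact paths.

import java.util.concurrent.ConcurrentHashMap
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for NettyStreamManager: a keyed registry whose
// entries survive until they are removed explicitly.
class SimpleFileServer {
  private val files = new ConcurrentHashMap[String, String]()
  def addFile(key: String, path: String): Unit = files.put(key, path)
  def removeFile(key: String): Unit = files.remove(key)
  def size: Int = files.size()
}

object SessionCleanupSketch {
  def main(args: Array[String]): Unit = {
    val fileServer = new SimpleFileServer
    // Per-session bookkeeping, analogous to SparkContext.addedFiles:
    // session UUID -> (file key -> timestamp).
    val addedFiles = TrieMap.empty[String, TrieMap[String, Long]]

    // A session registers an artifact in both places.
    val sessionId = "session-uuid-1"
    addedFiles.getOrElseUpdate(sessionId, TrieMap.empty)
      .put("/tmp/artifact.py", System.currentTimeMillis())
    fileServer.addFile("/tmp/artifact.py", "/tmp/artifact.py")

    // Before this patch, cleanup only dropped the session map entry,
    // leaving the file server entry behind. The fix removes both, in
    // the same shape as the .remove(...).foreach(_.keys.foreach(...))
    // calls in SparkConnectArtifactManager below.
    addedFiles.remove(sessionId).foreach(_.keys.foreach(fileServer.removeFile))
    assert(fileServer.size == 0)
  }
}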
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
index a2df11eeb58..fee99532bd5 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
@@ -208,12 +208,14 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
       s"sessionId: ${sessionHolder.sessionId}")
 
     // Clean up added files
-    sessionHolder.session.sparkContext.addedFiles.remove(state.uuid)
-    sessionHolder.session.sparkContext.addedArchives.remove(state.uuid)
-    sessionHolder.session.sparkContext.addedJars.remove(state.uuid)
+    val fileserver = SparkEnv.get.rpcEnv.fileServer
+    val sparkContext = sessionHolder.session.sparkContext
+    sparkContext.addedFiles.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+    sparkContext.addedArchives.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+    sparkContext.addedJars.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeJar))
 
     // Clean up cached relations
-    val blockManager = sessionHolder.session.sparkContext.env.blockManager
+    val blockManager = sparkContext.env.blockManager
     blockManager.removeCache(sessionHolder.userId, sessionHolder.sessionId)
 
     // Clean up artifacts folder
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 2fce2889c09..2575cffdeb3 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -206,6 +206,19 @@ private[spark] trait RpcEnvFileServer {
     fixedBaseUri
   }
 
+  /**
+   * Removes a file from this RpcEnv.
+   *
+   * @param key Local file to remove.
+   */
+  def removeFile(key: String): Unit
+
+  /**
+   * Removes a jar from this RpcEnv.
+   *
+   * @param key Local jar to remove.
+   */
+  def removeJar(key: String): Unit
 }
 
 private[spark] case class RpcEnvConfig(
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 57243133aba..9ac14f34836 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -43,6 +43,10 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   private val jars = new ConcurrentHashMap[String, File]()
   private val dirs = new ConcurrentHashMap[String, File]()
 
+  override def removeFile(key: String): Unit = files.remove(key)
+
+  override def removeJar(key: String): Unit = jars.remove(key)
+
   override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
     throw new UnsupportedOperationException()
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org