This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a023c479c6 [SPARK-45014][CONNECT] Clean up fileserver when cleaning up files, jars and archives in SparkContext
9a023c479c6 is described below

commit 9a023c479c6a91a602f96ccabba398223c04b3d1
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu Aug 31 15:21:19 2023 +0900

    [SPARK-45014][CONNECT] Clean up fileserver when cleaning up files, jars and archives in SparkContext
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to also remove the files, jars and archives added via Spark Connect sessions from the RpcEnv file server when the session's artifacts are cleaned up.
    
    ### Why are the changes needed?
    
    In [SPARK-44348](https://issues.apache.org/jira/browse/SPARK-44348), we clean up the SparkContext's added files, jars and archives, but we do not remove the corresponding entries from the file server.
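
    To make the leak concrete, here is a simplified model of the two registries involved (an illustrative sketch only, not the actual Spark internals; `TrieMap` stands in for the real internal maps and the method names are hypothetical):

    ```scala
    import scala.collection.concurrent.TrieMap

    // An added file is tracked in two places: SparkContext's per-session
    // map and the RpcEnv file server's registry.
    val addedFiles = TrieMap.empty[String, Map[String, Long]] // SparkContext side
    val fileServerFiles = TrieMap.empty[String, java.io.File] // file server side

    def addFile(sessionUuid: String, file: java.io.File): Unit = {
      val tracked = addedFiles.getOrElse(sessionUuid, Map.empty[String, Long])
      addedFiles(sessionUuid) = tracked + (file.getName -> System.currentTimeMillis())
      fileServerFiles(file.getName) = file
    }

    // SPARK-44348 cleared the SparkContext side only, leaving the
    // file-server entries behind:
    def cleanupBefore(sessionUuid: String): Unit =
      addedFiles.remove(sessionUuid)

    // This PR also drops each tracked key from the file server:
    def cleanupAfter(sessionUuid: String): Unit =
      addedFiles.remove(sessionUuid).foreach(_.keys.foreach(fileServerFiles.remove))
    ```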
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it prevents memory in the file server from slowly growing as sessions are created and cleaned up.
    
    ### How was this patch tested?
    
    Manually tested. Existing tests should also not be broken.
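
    For reference, a manual check along these lines is possible from Spark-internal code (a hypothetical snippet, not part of this patch; `rpcEnv` and `removeFile` are `private[spark]`, so it only compiles inside an `org.apache.spark` package):

    ```scala
    package org.apache.spark

    import java.io.File

    object FileServerCleanupCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[1]").setAppName("cleanup-check"))
        try {
          val tmp = File.createTempFile("artifact", ".txt")
          sc.addFile(tmp.getAbsolutePath)
          // NettyStreamManager keys its registry by file name, so the same
          // key drops the entry again.
          SparkEnv.get.rpcEnv.fileServer.removeFile(tmp.getName)
        } finally {
          sc.stop()
        }
      }
    }
    ```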
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42731 from HyukjinKwon/SPARK-45014.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/connect/artifact/SparkConnectArtifactManager.scala  | 10 ++++++----
 core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala       | 13 +++++++++++++
 .../org/apache/spark/rpc/netty/NettyStreamManager.scala     |  4 ++++
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
index a2df11eeb58..fee99532bd5 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
@@ -208,12 +208,14 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
         s"sessionId: ${sessionHolder.sessionId}")
 
     // Clean up added files
-    sessionHolder.session.sparkContext.addedFiles.remove(state.uuid)
-    sessionHolder.session.sparkContext.addedArchives.remove(state.uuid)
-    sessionHolder.session.sparkContext.addedJars.remove(state.uuid)
+    val fileserver = SparkEnv.get.rpcEnv.fileServer
+    val sparkContext = sessionHolder.session.sparkContext
+    sparkContext.addedFiles.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+    sparkContext.addedArchives.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+    sparkContext.addedJars.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeJar))
 
     // Clean up cached relations
-    val blockManager = sessionHolder.session.sparkContext.env.blockManager
+    val blockManager = sparkContext.env.blockManager
     blockManager.removeCache(sessionHolder.userId, sessionHolder.sessionId)
 
     // Clean up artifacts folder
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 2fce2889c09..2575cffdeb3 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -206,6 +206,19 @@ private[spark] trait RpcEnvFileServer {
     fixedBaseUri
   }
 
+  /**
+   * Removes a file from this RpcEnv.
+   *
+   * @param key Local file to remove.
+   */
+  def removeFile(key: String): Unit
+
+  /**
+   * Removes a jar from this RpcEnv.
+   *
+   * @param key Local jar to remove.
+   */
+  def removeJar(key: String): Unit
 }
 
 private[spark] case class RpcEnvConfig(
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 57243133aba..9ac14f34836 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -43,6 +43,10 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   private val jars = new ConcurrentHashMap[String, File]()
   private val dirs = new ConcurrentHashMap[String, File]()
 
+  override def removeFile(key: String): Unit = files.remove(key)
+
+  override def removeJar(key: String): Unit = jars.remove(key)
+
   override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
     throw new UnsupportedOperationException()
   }

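For readers implementing a custom RpcEnvFileServer, here is a minimal sketch of the new contract (a hypothetical in-memory implementation, not Spark's NettyStreamManager; only the two remove methods correspond to what this commit adds to the trait):

    import java.io.File
    import java.util.concurrent.ConcurrentHashMap

    // Standalone illustration of the add/remove lifecycle.
    class InMemoryFileServer {
      private val files = new ConcurrentHashMap[String, File]()
      private val jars = new ConcurrentHashMap[String, File]()

      def addFile(file: File): String = {
        files.put(file.getName, file)
        s"spark://in-memory/files/${file.getName}"
      }

      def addJar(file: File): String = {
        jars.put(file.getName, file)
        s"spark://in-memory/jars/${file.getName}"
      }

      // The removal hooks this commit introduces: without them, entries
      // outlive the session that registered them.
      def removeFile(key: String): Unit = files.remove(key)
      def removeJar(key: String): Unit = jars.remove(key)
    }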
