Repository: spark
Updated Branches:
  refs/heads/master 3675af724 -> c2632edeb


[SPARK-23670][SQL] Fix memory leak on SparkPlanGraphWrapper

Clean up SparkPlanGraphWrapper objects from InMemoryStore together with 
cleaning up SQLExecutionUIData
existing unit test was extended to check also SparkPlanGraphWrapper object count

vanzin

Author: myroslavlisniak <acni...@gmail.com>

Closes #20813 from myroslavlisniak/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2632ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2632ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2632ede

Branch: refs/heads/master
Commit: c2632edebd978716dbfa7874a2fc0a8f5a4a9951
Parents: 3675af7
Author: myroslavlisniak <acni...@gmail.com>
Authored: Thu Mar 15 17:20:17 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Mar 15 17:20:59 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala    | 5 ++++-
 .../org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala   | 4 ++++
 .../spark/sql/execution/ui/SQLAppStatusListenerSuite.scala      | 1 +
 3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c2632ede/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 53fb9a0..71e9f93 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -334,7 +334,10 @@ class SQLAppStatusListener(
 
     val view = 
kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L)
     val toDelete = KVUtils.viewToSeq(view, 
countToDelete.toInt)(_.completionTime.isDefined)
-    toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
+    toDelete.foreach { e =>
+      kvstore.delete(e.getClass(), e.executionId)
+      kvstore.delete(classOf[SparkPlanGraphWrapper], e.executionId)
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c2632ede/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 9a76584..241001a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -54,6 +54,10 @@ class SQLAppStatusStore(
     store.count(classOf[SQLExecutionUIData])
   }
 
+  def planGraphCount(): Long = {
+    store.count(classOf[SparkPlanGraphWrapper])
+  }
+
   def executionMetrics(executionId: Long): Map[Long, String] = {
     def metricsFromStore(): Option[Map[Long, String]] = {
       val exec = store.read(classOf[SQLExecutionUIData], executionId)

http://git-wip-us.apache.org/repos/asf/spark/blob/c2632ede/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 85face3..f3f0883 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -611,6 +611,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends 
SparkFunSuite {
         sc.listenerBus.waitUntilEmpty(10000)
         val statusStore = spark.sharedState.statusStore
         assert(statusStore.executionsCount() <= 50)
+        assert(statusStore.planGraphCount() <= 50)
         // No live data should be left behind after all executions end.
         assert(statusStore.listener.get.noLiveData())
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to