Repository: spark Updated Branches: refs/heads/branch-2.3 99f5c0bc7 -> d9e1f7040
[SPARK-23670][SQL] Fix memory leak on SparkPlanGraphWrapper Clean up SparkPlanGraphWrapper objects from InMemoryStore together with cleaning up SQLExecutionUIData existing unit test was extended to check also SparkPlanGraphWrapper object count vanzin Author: myroslavlisniak <acni...@gmail.com> Closes #20813 from myroslavlisniak/master. (cherry picked from commit c2632edebd978716dbfa7874a2fc0a8f5a4a9951) Signed-off-by: Marcelo Vanzin <van...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9e1f704 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9e1f704 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9e1f704 Branch: refs/heads/branch-2.3 Commit: d9e1f7040092aa4aeab9bdb82f0c28a292c90609 Parents: 99f5c0b Author: myroslavlisniak <acni...@gmail.com> Authored: Thu Mar 15 17:20:17 2018 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Thu Mar 15 17:21:21 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 5 ++++- .../org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala | 4 ++++ .../spark/sql/execution/ui/SQLAppStatusListenerSuite.scala | 1 + 3 files changed, 9 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d9e1f704/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 53fb9a0..71e9f93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -334,7 +334,10 @@ class SQLAppStatusListener( val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L) val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined) - toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) } + toDelete.foreach { e => + kvstore.delete(e.getClass(), e.executionId) + kvstore.delete(classOf[SparkPlanGraphWrapper], e.executionId) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/d9e1f704/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 9a76584..241001a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -54,6 +54,10 @@ class SQLAppStatusStore( store.count(classOf[SQLExecutionUIData]) } + def planGraphCount(): Long = { + store.count(classOf[SparkPlanGraphWrapper]) + } + def executionMetrics(executionId: Long): Map[Long, String] = { def metricsFromStore(): Option[Map[Long, String]] = { val exec = store.read(classOf[SQLExecutionUIData], executionId) http://git-wip-us.apache.org/repos/asf/spark/blob/d9e1f704/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 85face3..f3f0883 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -611,6 +611,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite { sc.listenerBus.waitUntilEmpty(10000) val statusStore = spark.sharedState.statusStore assert(statusStore.executionsCount() <= 50) + assert(statusStore.planGraphCount() <= 50) // No live data should be left behind after all executions end. assert(statusStore.listener.get.noLiveData()) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org