This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d90c5d  [SPARK-34087][SQL] Fix memory leak of ExecutionListenerBus
4d90c5d is described below

commit 4d90c5dc0efcf77ef6735000ee7016428c57077b
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Mar 18 13:27:03 2021 +0900

    [SPARK-34087][SQL] Fix memory leak of ExecutionListenerBus
    
    ### What changes were proposed in this pull request?
    
    This PR proposes an alternative way to fix the memory leak of 
`ExecutionListenerBus`, which would automatically clean them up.
    
    Basically, the idea is to add `registerSparkListenerForCleanup` to 
`ContextCleaner`, so we can remove the `ExecutionListenerBus` from 
`LiveListenerBus` when the `SparkSession` is GC'ed.
    
    On the other hand, to make the `SparkSession` GC-able, we need to get rid 
of the reference of `SparkSession` in `ExecutionListenerBus`. Therefore, we 
introduced the `sessionUUID`, which is a unique identifier for SparkSession, to 
replace the  `SparkSession` object.
    
    Note that, the proposal wouldn't take effect when 
`spark.cleaner.referenceTracking=false` since it depends on `ContextCleaner`.
    
    ### Why are the changes needed?
    
    Fix the memory leak caused by `ExecutionListenerBus` mentioned in 
SPARK-34087.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, save memory for users.
    
    ### How was this patch tested?
    
    Added unit test.
    
    Closes #31839 from Ngone51/fix-mem-leak-of-ExecutionListenerBus.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/ContextCleaner.scala    | 21 +++++++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |  3 ++
 .../spark/sql/util/QueryExecutionListener.scala    | 12 ++++----
 .../spark/sql/SparkSessionBuilderSuite.scala       | 35 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index cfa1139..34b3089 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -27,6 +27,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
+import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.shuffle.api.ShuffleDriverComponents
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, 
Utils}
 
@@ -39,6 +40,7 @@ private case class CleanShuffle(shuffleId: Int) extends 
CleanupTask
 private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
 private case class CleanAccum(accId: Long) extends CleanupTask
 private case class CleanCheckpoint(rddId: Int) extends CleanupTask
+private case class CleanSparkListener(listener: SparkListener) extends 
CleanupTask
 
 /**
  * A WeakReference associated with a CleanupTask.
@@ -175,6 +177,13 @@ private[spark] class ContextCleaner(
     referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, 
referenceQueue))
   }
 
+  /** Register a SparkListener to be cleaned up when its owner is garbage 
collected. */
+  def registerSparkListenerForCleanup(
+      listenerOwner: AnyRef,
+      listener: SparkListener): Unit = {
+    registerForCleanup(listenerOwner, CleanSparkListener(listener))
+  }
+
   /** Keep cleaning RDD, shuffle, and broadcast state. */
   private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {
@@ -197,6 +206,8 @@ private[spark] class ContextCleaner(
                 doCleanupAccum(accId, blocking = blockOnCleanupTasks)
               case CleanCheckpoint(rddId) =>
                 doCleanCheckpoint(rddId)
+              case CleanSparkListener(listener) =>
+                doCleanSparkListener(listener)
             }
           }
         }
@@ -276,6 +287,16 @@ private[spark] class ContextCleaner(
     }
   }
 
+  def doCleanSparkListener(listener: SparkListener): Unit = {
+    try {
+      logDebug(s"Cleaning Spark listener $listener")
+      sc.listenerBus.removeListener(listener)
+      logDebug(s"Cleaned Spark listener $listener")
+    } catch {
+      case e: Exception => logError(s"Error cleaning Spark listener 
$listener", e)
+    }
+  }
+
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = 
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 678233d..5b33564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.io.Closeable
+import java.util.UUID
 import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
@@ -102,6 +103,8 @@ class SparkSession private(
         new SparkSessionExtensions), Map.empty)
   }
 
+  private[sql] val sessionUUID: String = UUID.randomUUID.toString
+
   sparkContext.assertNotStopped()
 
   // If there is no active SparkSession, uses the default SQL conf. Otherwise, 
use the session's.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 0b5951e..d8b630d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -76,7 +76,11 @@ trait QueryExecutionListener {
 class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
   extends Logging {
 
-  private val listenerBus = new ExecutionListenerBus(session)
+  private val listenerBus = new ExecutionListenerBus(session.sessionUUID)
+  session.sparkContext.listenerBus.addToSharedQueue(listenerBus)
+  session.sparkContext.cleaner.foreach { cleaner =>
+    cleaner.registerSparkListenerForCleanup(this, listenerBus)
+  }
 
   if (loadExtensions) {
     val conf = session.sparkContext.conf
@@ -124,11 +128,9 @@ class ExecutionListenerManager private[sql](session: 
SparkSession, loadExtension
   }
 }
 
-private[sql] class ExecutionListenerBus(session: SparkSession)
+private[sql] class ExecutionListenerBus(sessionUUID: String)
   extends SparkListener with ListenerBus[QueryExecutionListener, 
SparkListenerSQLExecutionEnd] {
 
-  session.sparkContext.listenerBus.addToSharedQueue(this)
-
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case e: SparkListenerSQLExecutionEnd => postToAll(e)
     case _ =>
@@ -158,6 +160,6 @@ private[sql] class ExecutionListenerBus(session: 
SparkSession)
   private def shouldReport(e: SparkListenerSQLExecutionEnd): Boolean = {
     // Only catch SQL execution with a name, and triggered by the same spark 
session that this
     // listener manager belongs.
-    e.executionName.isDefined && e.qe.sparkSession.eq(this.session)
+    e.executionName.isDefined && e.qe.sparkSession.sessionUUID == sessionUUID
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index e8e2f68..f12f866 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -17,20 +17,25 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkFunSuite}
 import org.apache.spark.internal.config.EXECUTOR_ALLOW_SPARK_CONTEXT
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.util.ExecutionListenerBus
 import org.apache.spark.util.ThreadUtils
 
 /**
  * Test cases for the builder pattern of [[SparkSession]].
  */
-class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
+class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach 
with Eventually {
 
   override def afterEach(): Unit = {
     // This suite should not interfere with the other test suites.
@@ -40,6 +45,34 @@ class SparkSessionBuilderSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     SparkSession.clearDefaultSession()
   }
 
+  test("SPARK-34087: Fix memory leak of ExecutionListenerBus") {
+    val spark = SparkSession.builder()
+      .master("local")
+      .getOrCreate()
+
+    @inline def listenersNum(): Int = {
+      spark.sparkContext
+        .listenerBus
+        .listeners
+        .asScala
+        .count(_.isInstanceOf[ExecutionListenerBus])
+    }
+
+    (1 to 10).foreach { _ =>
+      spark.cloneSession()
+      SparkSession.clearActiveSession()
+    }
+
+    eventually(timeout(10.seconds), interval(1.seconds)) {
+      System.gc()
+      // After GC, the number of ExecutionListenerBus should be less than 11 
(we created 11
+      // SparkSessions in total).
+      // Since GC can't 100% guarantee all out-of-referenced objects be 
cleaned at one time,
+      // here, we check at least one listener is cleaned up to prove the 
mechanism works.
+      assert(listenersNum() < 11)
+    }
+  }
+
   test("create with config options and propagate them to SparkContext and 
SparkSession") {
     val session = SparkSession.builder()
       .master("local")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to