This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 869fc2198a4 [SPARK-39864][SQL] Lazily register ExecutionListenerBus
869fc2198a4 is described below

commit 869fc2198a4bb51bc03dce36fb2b61a57fe3006e
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Wed Jul 27 14:25:54 2022 +0900

    [SPARK-39864][SQL] Lazily register ExecutionListenerBus
    
    ### What changes were proposed in this pull request?
    
    This PR modifies `ExecutionListenerManager` so that its
    `ExecutionListenerBus` SparkListener is lazily registered during the first
    `.register(QueryExecutionListener)` call (instead of being eagerly registered
    in the constructor).
    
    ### Why are the changes needed?
    
    This addresses a ListenerBus performance problem in applications with large 
numbers of short-lived SparkSessions.
    
    The `ExecutionListenerBus` SparkListener is unregistered by the
    ContextCleaner only after its associated ExecutionListenerManager/SparkSession
    is garbage-collected (see #31839). If many sessions are rapidly created and
    destroyed but the driver GC doesn't run, a large number of unused
    ExecutionListenerBus listeners can remain registered on the shared
    ListenerBus queue. This can cause performance problems in the ListenerBus,
    because each listener invocation has some overhead.
    
    In one real-world application with a very large driver heap and a high rate
    of SparkSession creation (hundreds per minute), I saw 5000 idle
    ExecutionListenerBus listeners, resulting in ~50ms median event processing
    times on the shared listener queue.
    
    This patch avoids this problem by making the listener registration lazy: if
    a short-lived SparkSession never uses QueryExecutionListeners, then we won't
    register the ExecutionListenerBus and won't incur this overhead.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a new unit test, "SPARK-39864 ExecutionListenerBus is lazily
    registered", to `ExecutionListenerManagerSuite`.
    
    Closes #37282 from JoshRosen/SPARK-39864.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/util/QueryExecutionListener.scala      | 20 ++++++++++++++------
 .../sql/util/ExecutionListenerManagerSuite.scala     | 15 +++++++++++++++
 2 files changed, 29 insertions(+), 6 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 7ac06a5cd7e..45482f12f3c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -81,7 +81,10 @@ class ExecutionListenerManager private[sql](
     loadExtensions: Boolean)
   extends Logging {
 
-  private val listenerBus = new ExecutionListenerBus(this, session)
+  // SPARK-39864: lazily create the listener bus on the first register() call 
in order to
+  // avoid listener overheads when QueryExecutionListeners aren't used:
+  private val listenerBusInitializationLock = new Object()
+  @volatile private var listenerBus: Option[ExecutionListenerBus] = None
 
   if (loadExtensions) {
     val conf = session.sparkContext.conf
@@ -97,7 +100,12 @@ class ExecutionListenerManager private[sql](
    */
   @DeveloperApi
   def register(listener: QueryExecutionListener): Unit = {
-    listenerBus.addListener(listener)
+    listenerBusInitializationLock.synchronized {
+      if (listenerBus.isEmpty) {
+        listenerBus = Some(new ExecutionListenerBus(this, session))
+      }
+    }
+    listenerBus.get.addListener(listener)
   }
 
   /**
@@ -105,7 +113,7 @@ class ExecutionListenerManager private[sql](
    */
   @DeveloperApi
   def unregister(listener: QueryExecutionListener): Unit = {
-    listenerBus.removeListener(listener)
+    listenerBus.foreach(_.removeListener(listener))
   }
 
   /**
@@ -113,12 +121,12 @@ class ExecutionListenerManager private[sql](
    */
   @DeveloperApi
   def clear(): Unit = {
-    listenerBus.removeAllListeners()
+    listenerBus.foreach(_.removeAllListeners())
   }
 
   /** Only exposed for testing. */
   private[sql] def listListeners(): Array[QueryExecutionListener] = {
-    listenerBus.listeners.asScala.toArray
+    
listenerBus.map(_.listeners.asScala.toArray).getOrElse(Array.empty[QueryExecutionListener])
   }
 
   /**
@@ -127,7 +135,7 @@ class ExecutionListenerManager private[sql](
   private[sql] def clone(session: SparkSession, sqlConf: SQLConf): 
ExecutionListenerManager = {
     val newListenerManager =
       new ExecutionListenerManager(session, sqlConf, loadExtensions = false)
-    listenerBus.listeners.asScala.foreach(newListenerManager.register)
+    
listenerBus.foreach(_.listeners.asScala.foreach(newListenerManager.register))
     newListenerManager
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
index 2ab733eac0b..56219766f70 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
@@ -69,6 +69,21 @@ class ExecutionListenerManagerSuite extends SparkFunSuite 
with LocalSparkSession
     assert(INSTANCE_COUNT.get() === 1)
     assert(CALLBACK_COUNT.get() === 2)
   }
+
+  test("SPARK-39864 ExecutionListenerBus is lazily registered") {
+    spark = 
SparkSession.builder().master("local").appName("test").getOrCreate()
+    // Run a query to trigger the lazy initialization of the session state:
+    spark.sql("select 1").collect()
+    // The ExecutionListenerBus shouldn't be registered since no 
QueryExecutionListeners
+    // are registered:
+    
assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().isEmpty)
+    // Registering the first query execution listener registers a listener bus:
+    spark.listenerManager.register(new CountingQueryExecutionListener)
+    
assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().size
 == 1)
+    // Registering additional listeners reuses the same listener bus:
+    spark.listenerManager.register(new CountingQueryExecutionListener)
+    
assert(spark.sparkContext.listenerBus.findListenersByClass[ExecutionListenerBus]().size
 == 1)
+  }
 }
 
 private class CountingQueryExecutionListener extends QueryExecutionListener {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to