Repository: spark
Updated Branches:
  refs/heads/master 9d06a9e0c -> ee10ca7ec


[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus

## What changes were proposed in this pull request?

Use a separate Spark event queue for StreamingQueryListenerBus so that if there 
are many non-streaming events, streaming query listeners don't need to wait for 
other Spark listeners and can catch up.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #19838 from zsxwing/SPARK-22638.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee10ca7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee10ca7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee10ca7e

Branch: refs/heads/master
Commit: ee10ca7ec6cf7fbaab3f95a097b46936d97d0835
Parents: 9d06a9e
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Fri Dec 1 13:02:03 2017 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Dec 1 13:02:03 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/LiveListenerBus.scala     | 4 +++-
 .../sql/execution/streaming/StreamingQueryListenerBus.scala    | 6 +++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee10ca7e/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 2f93c49..2312140 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -87,7 +87,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
    * of each other (each one uses a separate thread for delivering events), allowing slower
    * listeners to be somewhat isolated from others.
    */
-  private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized {
+  private[spark] def addToQueue(
+      listener: SparkListenerInterface,
+      queue: String): Unit = synchronized {
     if (stopped.get()) {
       throw new IllegalStateException("LiveListenerBus is stopped.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee10ca7e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 07e3902..7dd491e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
 
   import StreamingQueryListener._
 
-  sparkListenerBus.addToSharedQueue(this)
+  sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY)
 
   /**
    * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus
@@ -130,3 +130,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
     }
   }
 }
+
+object StreamingQueryListenerBus {
+  val STREAM_EVENT_QUERY = "streams"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to