Repository: spark Updated Branches: refs/heads/master 9d06a9e0c -> ee10ca7ec
[SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus ## What changes were proposed in this pull request? Use a separate Spark event queue for StreamingQueryListenerBus so that if there are many non-streaming events, streaming query listeners don't need to wait for other Spark listeners and can catch up. ## How was this patch tested? Jenkins Author: Shixiong Zhu <zsxw...@gmail.com> Closes #19838 from zsxwing/SPARK-22638. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee10ca7e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee10ca7e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee10ca7e Branch: refs/heads/master Commit: ee10ca7ec6cf7fbaab3f95a097b46936d97d0835 Parents: 9d06a9e Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Fri Dec 1 13:02:03 2017 -0800 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Fri Dec 1 13:02:03 2017 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/scheduler/LiveListenerBus.scala | 4 +++- .../sql/execution/streaming/StreamingQueryListenerBus.scala | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ee10ca7e/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 2f93c49..2312140 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -87,7 +87,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) { * of each other (each one uses a separate thread for delivering events), allowing slower * listeners to be somewhat isolated from others. */ - private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { + private[spark] def addToQueue( + listener: SparkListenerInterface, + queue: String): Unit = synchronized { if (stopped.get()) { throw new IllegalStateException("LiveListenerBus is stopped.") } http://git-wip-us.apache.org/repos/asf/spark/blob/ee10ca7e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 07e3902..7dd491e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) import StreamingQueryListener._ - sparkListenerBus.addToSharedQueue(this) + sparkListenerBus.addToQueue(this, StreamingQueryListenerBus.STREAM_EVENT_QUERY) /** * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus @@ -130,3 +130,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) } } } + +object StreamingQueryListenerBus { + val STREAM_EVENT_QUERY = "streams" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org