Repository: spark
Updated Branches:
  refs/heads/master e3727c409 -> 944fdadf7


[SPARK-12847][CORE][STREAMING] Remove StreamingListenerBus and post all 
Streaming events to the same thread as Spark events

Including the following changes:

1. Add StreamingListenerForwardingBus to wrap Streaming events as 
WrappedStreamingListenerEvent and process them in `onOtherEvent`, forwarding them to StreamingListeners
2. Remove StreamingListenerBus
3. Merge AsynchronousListenerBus and LiveListenerBus to the same class 
LiveListenerBus
4. Add `logEvent` method to SparkListenerEvent so that EventLoggingListener can 
use it to ignore WrappedStreamingListenerEvents

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #10779 from zsxwing/streaming-listener.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/944fdadf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/944fdadf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/944fdadf

Branch: refs/heads/master
Commit: 944fdadf77523570f6b33544ad0b388031498952
Parents: e3727c4
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Jan 20 11:57:53 2016 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Jan 20 11:57:53 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   6 +-
 .../spark/scheduler/EventLoggingListener.scala  |   4 +-
 .../spark/scheduler/LiveListenerBus.scala       | 167 +++++++++++++++-
 .../apache/spark/scheduler/SparkListener.scala  |   5 +-
 .../spark/scheduler/SparkListenerBus.scala      |   2 +-
 .../spark/util/AsynchronousListenerBus.scala    | 190 -------------------
 .../org/apache/spark/util/ListenerBus.scala     |  14 +-
 project/MimaExcludes.scala                      |   4 +
 .../spark/streaming/StreamingContext.scala      |   9 +-
 .../streaming/scheduler/JobScheduler.scala      |   4 +-
 .../scheduler/StreamingListenerBus.scala        |  69 +++++--
 .../spark/streaming/InputStreamsSuite.scala     |   2 +-
 .../streaming/StreamingListenerSuite.scala      |  22 +++
 .../scheduler/ReceiverTrackerSuite.scala        |   2 -
 14 files changed, 269 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 77acb70..d7c605a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1644,9 +1644,9 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 
   // Shut down the SparkContext.
   def stop() {
-    if (AsynchronousListenerBus.withinListenerThread.value) {
-      throw new SparkException("Cannot stop SparkContext within listener 
thread of" +
-        " AsynchronousListenerBus")
+    if (LiveListenerBus.withinListenerThread.value) {
+      throw new SparkException(
+        s"Cannot stop SparkContext within listener thread of 
${LiveListenerBus.name}")
     }
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index aa607c5..36f2b74 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -200,7 +200,9 @@ private[spark] class EventLoggingListener(
   override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
-    logEvent(event, flushLogger = true)
+    if (event.logEvent) {
+      logEvent(event, flushLogger = true)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index be23056..1c21313 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,24 +17,169 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 
-import org.apache.spark.util.AsynchronousListenerBus
+import scala.util.DynamicVariable
+
+import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
 
 /**
  * Asynchronously passes SparkListenerEvents to registered SparkListeners.
  *
- * Until start() is called, all posted events are only buffered. Only after 
this listener bus
+ * Until `start()` is called, all posted events are only buffered. Only after 
this listener bus
  * has started will events be actually propagated to all attached listeners. 
This listener bus
- * is stopped when it receives a SparkListenerShutdown event, which is posted 
using stop().
+ * is stopped when `stop()` is called, and it will drop further events after 
stopping.
  */
-private[spark] class LiveListenerBus
-  extends AsynchronousListenerBus[SparkListener, 
SparkListenerEvent]("SparkListenerBus")
-  with SparkListenerBus {
+private[spark] class LiveListenerBus extends SparkListenerBus {
+
+  self =>
+
+  import LiveListenerBus._
+
+  private var sparkContext: SparkContext = null
+
+  // Cap the capacity of the event queue so we get an explicit error (rather 
than
+  // an OOM exception) if it's perpetually being added to more quickly than 
it's being drained.
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new 
LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+  // Indicate if `start()` is called
+  private val started = new AtomicBoolean(false)
+  // Indicate if `stop()` is called
+  private val stopped = new AtomicBoolean(false)
+
+  // Indicate if we are processing some event
+  // Guarded by `self`
+  private var processingEvent = false
 
   private val logDroppedEvent = new AtomicBoolean(false)
 
-  override def onDropEvent(event: SparkListenerEvent): Unit = {
+  // A counter that represents the number of events produced and consumed in 
the queue
+  private val eventLock = new Semaphore(0)
+
+  private val listenerThread = new Thread(name) {
+    setDaemon(true)
+    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
+      LiveListenerBus.withinListenerThread.withValue(true) {
+        while (true) {
+          eventLock.acquire()
+          self.synchronized {
+            processingEvent = true
+          }
+          try {
+            val event = eventQueue.poll
+            if (event == null) {
+              // Get out of the while loop and shutdown the daemon thread
+              if (!stopped.get) {
+                throw new IllegalStateException("Polling `null` from 
eventQueue means" +
+                  " the listener bus has been stopped. So `stopped` must be 
true")
+              }
+              return
+            }
+            postToAll(event)
+          } finally {
+            self.synchronized {
+              processingEvent = false
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Start sending events to attached listeners.
+   *
+   * This first sends out all buffered events posted before this listener bus 
has started, then
+   * listens for any additional events asynchronously while the listener bus 
is still running.
+   * This should only be called once.
+   *
+   * @param sc Used to stop the SparkContext in case the listener thread dies.
+   */
+  def start(sc: SparkContext): Unit = {
+    if (started.compareAndSet(false, true)) {
+      sparkContext = sc
+      listenerThread.start()
+    } else {
+      throw new IllegalStateException(s"$name already started!")
+    }
+  }
+
+  def post(event: SparkListenerEvent): Unit = {
+    if (stopped.get) {
+      // Drop further events to make `listenerThread` exit ASAP
+      logError(s"$name has already stopped! Dropping event $event")
+      return
+    }
+    val eventAdded = eventQueue.offer(event)
+    if (eventAdded) {
+      eventLock.release()
+    } else {
+      onDropEvent(event)
+    }
+  }
+
+  /**
+   * For testing only. Wait until there are no more events in the queue, or 
until the specified
+   * time has elapsed. Throw `TimeoutException` if the specified time elapsed 
before the queue
+   * emptied.
+   * Exposed for testing.
+   */
+  @throws(classOf[TimeoutException])
+  def waitUntilEmpty(timeoutMillis: Long): Unit = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!queueIsEmpty) {
+      if (System.currentTimeMillis > finishTime) {
+        throw new TimeoutException(
+          s"The event queue is not empty after $timeoutMillis milliseconds")
+      }
+      /* Sleep rather than using wait/notify, because this is used only for 
testing and
+       * wait/notify add overhead in the general case. */
+      Thread.sleep(10)
+    }
+  }
+
+  /**
+   * For testing only. Return whether the listener daemon thread is still 
alive.
+   * Exposed for testing.
+   */
+  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
+
+  /**
+   * Return whether the event queue is empty.
+   *
+   * The use of synchronized here guarantees that all events that once 
belonged to this queue
+   * have already been processed by all attached listeners, if this returns 
true.
+   */
+  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && 
!processingEvent }
+
+  /**
+   * Stop the listener bus. It will wait until the queued events have been 
processed, but drop the
+   * new events after stopping.
+   */
+  def stop(): Unit = {
+    if (!started.get()) {
+      throw new IllegalStateException(s"Attempted to stop $name that has not 
yet started!")
+    }
+    if (stopped.compareAndSet(false, true)) {
+      // Call eventLock.release() so that listenerThread will poll `null` from 
`eventQueue` and know
+      // `stop` is called.
+      eventLock.release()
+      listenerThread.join()
+    } else {
+      // Keep quiet
+    }
+  }
+
+  /**
+   * If the event queue exceeds its capacity, the new events will be dropped. 
The subclasses will be
+   * notified with the dropped events.
+   *
+   * Note: `onDropEvent` can be called in any thread.
+   */
+  def onDropEvent(event: SparkListenerEvent): Unit = {
     if (logDroppedEvent.compareAndSet(false, true)) {
       // Only log the following message once to avoid duplicated annoying logs.
       logError("Dropping SparkListenerEvent because no remaining room in event 
queue. " +
@@ -42,5 +187,13 @@ private[spark] class LiveListenerBus
         "the rate at which tasks are being started by the scheduler.")
     }
   }
+}
+
+private[spark] object LiveListenerBus {
+  // Allows for Context to check whether stop() call is made within listener 
thread
+  val withinListenerThread: DynamicVariable[Boolean] = new 
DynamicVariable[Boolean](false)
 
+  /** The thread name of Spark listener bus */
+  val name = "SparkListenerBus"
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index f5267f5..6c6883d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -34,7 +34,10 @@ import org.apache.spark.util.{Distribution, Utils}
 
 @DeveloperApi
 @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, 
property = "Event")
-trait SparkListenerEvent
+trait SparkListenerEvent {
+  /* Whether to output this event to the event log */
+  protected[spark] def logEvent: Boolean = true
+}
 
 @DeveloperApi
 case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: 
Properties = null)

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 95722a0..94f0574 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -24,7 +24,7 @@ import org.apache.spark.util.ListenerBus
  */
 private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, 
SparkListenerEvent] {
 
-  override def onPostEvent(listener: SparkListener, event: 
SparkListenerEvent): Unit = {
+  protected override def doPostEvent(listener: SparkListener, event: 
SparkListenerEvent): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
         listener.onStageSubmitted(stageSubmitted)

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala 
b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
deleted file mode 100644
index f6b7ea2..0000000
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.util.DynamicVariable
-
-import org.apache.spark.SparkContext
-
-/**
- * Asynchronously passes events to registered listeners.
- *
- * Until `start()` is called, all posted events are only buffered. Only after 
this listener bus
- * has started will events be actually propagated to all attached listeners. 
This listener bus
- * is stopped when `stop()` is called, and it will drop further events after 
stopping.
- *
- * @param name name of the listener bus, will be the name of the listener 
thread.
- * @tparam L type of listener
- * @tparam E type of event
- */
-private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: 
String)
-  extends ListenerBus[L, E] {
-
-  self =>
-
-  private var sparkContext: SparkContext = null
-
-  /* Cap the capacity of the event queue so we get an explicit error (rather 
than
-   * an OOM exception) if it's perpetually being added to more quickly than 
it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
-
-  // Indicate if `start()` is called
-  private val started = new AtomicBoolean(false)
-  // Indicate if `stop()` is called
-  private val stopped = new AtomicBoolean(false)
-
-  // Indicate if we are processing some event
-  // Guarded by `self`
-  private var processingEvent = false
-
-  // A counter that represents the number of events produced and consumed in 
the queue
-  private val eventLock = new Semaphore(0)
-
-  private val listenerThread = new Thread(name) {
-    setDaemon(true)
-    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
-      AsynchronousListenerBus.withinListenerThread.withValue(true) {
-        while (true) {
-          eventLock.acquire()
-          self.synchronized {
-            processingEvent = true
-          }
-          try {
-            val event = eventQueue.poll
-            if (event == null) {
-              // Get out of the while loop and shutdown the daemon thread
-              if (!stopped.get) {
-                throw new IllegalStateException("Polling `null` from 
eventQueue means" +
-                  " the listener bus has been stopped. So `stopped` must be 
true")
-              }
-              return
-            }
-            postToAll(event)
-          } finally {
-            self.synchronized {
-              processingEvent = false
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Start sending events to attached listeners.
-   *
-   * This first sends out all buffered events posted before this listener bus 
has started, then
-   * listens for any additional events asynchronously while the listener bus 
is still running.
-   * This should only be called once.
-   *
-   * @param sc Used to stop the SparkContext in case the listener thread dies.
-   */
-  def start(sc: SparkContext) {
-    if (started.compareAndSet(false, true)) {
-      sparkContext = sc
-      listenerThread.start()
-    } else {
-      throw new IllegalStateException(s"$name already started!")
-    }
-  }
-
-  def post(event: E) {
-    if (stopped.get) {
-      // Drop further events to make `listenerThread` exit ASAP
-      logError(s"$name has already stopped! Dropping event $event")
-      return
-    }
-    val eventAdded = eventQueue.offer(event)
-    if (eventAdded) {
-      eventLock.release()
-    } else {
-      onDropEvent(event)
-    }
-  }
-
-  /**
-   * For testing only. Wait until there are no more events in the queue, or 
until the specified
-   * time has elapsed. Throw `TimeoutException` if the specified time elapsed 
before the queue
-   * emptied.
-   * Exposed for testing.
-   */
-  @throws(classOf[TimeoutException])
-  def waitUntilEmpty(timeoutMillis: Long): Unit = {
-    val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!queueIsEmpty) {
-      if (System.currentTimeMillis > finishTime) {
-        throw new TimeoutException(
-          s"The event queue is not empty after $timeoutMillis milliseconds")
-      }
-      /* Sleep rather than using wait/notify, because this is used only for 
testing and
-       * wait/notify add overhead in the general case. */
-      Thread.sleep(10)
-    }
-  }
-
-  /**
-   * For testing only. Return whether the listener daemon thread is still 
alive.
-   * Exposed for testing.
-   */
-  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
-
-  /**
-   * Return whether the event queue is empty.
-   *
-   * The use of synchronized here guarantees that all events that once 
belonged to this queue
-   * have already been processed by all attached listeners, if this returns 
true.
-   */
-  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && 
!processingEvent }
-
-  /**
-   * Stop the listener bus. It will wait until the queued events have been 
processed, but drop the
-   * new events after stopping.
-   */
-  def stop() {
-    if (!started.get()) {
-      throw new IllegalStateException(s"Attempted to stop $name that has not 
yet started!")
-    }
-    if (stopped.compareAndSet(false, true)) {
-      // Call eventLock.release() so that listenerThread will poll `null` from 
`eventQueue` and know
-      // `stop` is called.
-      eventLock.release()
-      listenerThread.join()
-    } else {
-      // Keep quiet
-    }
-  }
-
-  /**
-   * If the event queue exceeds its capacity, the new events will be dropped. 
The subclasses will be
-   * notified with the dropped events.
-   *
-   * Note: `onDropEvent` can be called in any thread.
-   */
-  def onDropEvent(event: E): Unit
-}
-
-private[spark] object AsynchronousListenerBus {
-  /* Allows for Context to check whether stop() call is made within listener 
thread
-  */
-  val withinListenerThread: DynamicVariable[Boolean] = new 
DynamicVariable[Boolean](false)
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala 
b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 13cb516..5e1fab0 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -36,11 +36,19 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends 
Logging {
   /**
    * Add a listener to listen events. This method is thread-safe and can be 
called in any thread.
    */
-  final def addListener(listener: L) {
+  final def addListener(listener: L): Unit = {
     listeners.add(listener)
   }
 
   /**
+   * Remove a listener and it won't receive any events. This method is 
thread-safe and can be called
+   * in any thread.
+   */
+  final def removeListener(listener: L): Unit = {
+    listeners.remove(listener)
+  }
+
+  /**
    * Post the event to all registered listeners. The `postToAll` caller should 
guarantee calling
    * `postToAll` in the same thread for all events.
    */
@@ -52,7 +60,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends 
Logging {
     while (iter.hasNext) {
       val listener = iter.next()
       try {
-        onPostEvent(listener, event)
+        doPostEvent(listener, event)
       } catch {
         case NonFatal(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw 
an exception", e)
@@ -64,7 +72,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends 
Logging {
    * Post an event to the specified listener. `onPostEvent` is guaranteed to 
be called in the same
    * thread.
    */
-  def onPostEvent(listener: L, event: E): Unit
+  protected def doPostEvent(listener: L, event: E): Unit
 
   private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
     val c = implicitly[ClassTag[T]].runtimeClass

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4430bfd..6469201 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -153,6 +153,10 @@ object MimaExcludes {
         
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="),
         
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
         
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
+      ) ++ Seq(
+        // SPARK-12847 Remove StreamingListenerBus and post all Streaming 
events to the same thread as Spark events
+        
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
+        
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus")
       )
     case v if v.startsWith("1.6") =>
       Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 157ee92..b7070dd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -37,6 +37,7 @@ import org.apache.spark.annotation.{DeveloperApi, 
Experimental}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.input.FixedLengthBinaryInputFormat
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
+import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.SerializationDebugger
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
@@ -44,7 +45,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, 
ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, 
StreamingTab}
-import org.apache.spark.util.{AsynchronousListenerBus, CallSite, 
ShutdownHookManager, ThreadUtils, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, 
Utils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods 
used to create
@@ -694,9 +695,9 @@ class StreamingContext private[streaming] (
    */
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
     var shutdownHookRefToRemove: AnyRef = null
-    if (AsynchronousListenerBus.withinListenerThread.value) {
-      throw new SparkException("Cannot stop StreamingContext within listener 
thread of" +
-        " AsynchronousListenerBus")
+    if (LiveListenerBus.withinListenerThread.value) {
+      throw new SparkException(
+        s"Cannot stop StreamingContext within listener thread of 
${LiveListenerBus.name}")
     }
     synchronized {
       // The state should always be Stopped after calling `stop()`, even if we 
haven't started yet

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1ed6fb0..9535c8e 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -49,7 +49,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging 
{
     ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, 
"streaming-job-executor")
   private val jobGenerator = new JobGenerator(this)
   val clock = jobGenerator.clock
-  val listenerBus = new StreamingListenerBus()
+  val listenerBus = new StreamingListenerBus(ssc.sparkContext.listenerBus)
 
   // These two are created only when scheduler starts.
   // eventLoop not being null means the scheduler has been started and not 
stopped
@@ -76,7 +76,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging 
{
       rateController <- inputDStream.rateController
     } ssc.addStreamingListener(rateController)
 
-    listenerBus.start(ssc.sparkContext)
+    listenerBus.start()
     receiverTracker = new ReceiverTracker(ssc)
     inputInfoTracker = new InputInfoTracker(ssc)
     receiverTracker.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index ca111bb..39f6e71 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -17,19 +17,37 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, 
SparkListenerEvent}
+import org.apache.spark.util.ListenerBus
 
-import org.apache.spark.Logging
-import org.apache.spark.util.AsynchronousListenerBus
+/**
+ * A Streaming listener bus to forward events to StreamingListeners. This one 
will wrap received
+ * Streaming events as WrappedStreamingListenerEvent and send them to Spark 
listener bus. It also
+ * registers itself with Spark listener bus, so that it can receive 
WrappedStreamingListenerEvents,
+ * unwrap them as StreamingListenerEvent and dispatch them to 
StreamingListeners.
+ */
+private[streaming] class StreamingListenerBus(sparkListenerBus: 
LiveListenerBus)
+  extends SparkListener with ListenerBus[StreamingListener, 
StreamingListenerEvent] {
 
-/** Asynchronously passes StreamingListenerEvents to registered 
StreamingListeners. */
-private[spark] class StreamingListenerBus
-  extends AsynchronousListenerBus[StreamingListener, 
StreamingListenerEvent]("StreamingListenerBus")
-  with Logging {
+  /**
+   * Post a StreamingListenerEvent to the Spark listener bus asynchronously. 
This event will be
+   * dispatched to all StreamingListeners in the thread of the Spark listener 
bus.
+   */
+  def post(event: StreamingListenerEvent) {
+    sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
+  }
 
-  private val logDroppedEvent = new AtomicBoolean(false)
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    event match {
+      case WrappedStreamingListenerEvent(e) =>
+        postToAll(e)
+      case _ =>
+    }
+  }
 
-  override def onPostEvent(listener: StreamingListener, event: 
StreamingListenerEvent): Unit = {
+  protected override def doPostEvent(
+      listener: StreamingListener,
+      event: StreamingListenerEvent): Unit = {
     event match {
       case receiverStarted: StreamingListenerReceiverStarted =>
         listener.onReceiverStarted(receiverStarted)
@@ -51,12 +69,31 @@ private[spark] class StreamingListenerBus
     }
   }
 
-  override def onDropEvent(event: StreamingListenerEvent): Unit = {
-    if (logDroppedEvent.compareAndSet(false, true)) {
-      // Only log the following message once to avoid duplicated annoying logs.
-      logError("Dropping StreamingListenerEvent because no remaining room in 
event queue. " +
-        "This likely means one of the StreamingListeners is too slow and 
cannot keep up with the " +
-        "rate at which events are being started by the scheduler.")
-    }
+  /**
+   * Register this one with the Spark listener bus so that it can receive 
Streaming events and
+   * forward them to StreamingListeners.
+   */
+  def start(): Unit = {
+    sparkListenerBus.addListener(this) // for getting callbacks on spark events
+  }
+
+  /**
+   * Unregister this one with the Spark listener bus and all 
StreamingListeners won't receive any
+   * events after that.
+   */
+  def stop(): Unit = {
+    sparkListenerBus.removeListener(this)
+  }
+
+  /**
+   * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can 
be posted to Spark
+   * listener bus.
+   */
+  private case class WrappedStreamingListenerEvent(streamingListenerEvent: 
StreamingListenerEvent)
+    extends SparkListenerEvent {
+
+    // Do not log streaming events in event log as history server does not 
support streaming
+    // events (SPARK-12140). TODO Once SPARK-12140 is resolved we should set 
it to true.
+    protected[spark] override def logEvent: Boolean = false
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 2e23160..75591f0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -77,7 +77,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
 
         // Ensure progress listener has been notified of all events
-        ssc.scheduler.listenerBus.waitUntilEmpty(500)
+        ssc.sparkContext.listenerBus.waitUntilEmpty(500)
 
         // Verify all "InputInfo"s have been reported
         assert(ssc.progressListener.numTotalReceivedRecords === input.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 628a508..1ed68c7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, Synch
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
+import org.mockito.Mockito.{mock, reset, verifyNoMoreInteractions}
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
@@ -216,6 +217,27 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     assert(failureReasons(1).contains("This is another failed job"))
   }
 
+  test("StreamingListener receives no events after stopping StreamingListenerBus") {
+    val streamingListener = mock(classOf[StreamingListener])
+
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    ssc.addStreamingListener(streamingListener)
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD(_.count)
+    ssc.start()
+    ssc.stop()
+
+    // Because "streamingListener" has already received some events, let's clear that.
+    reset(streamingListener)
+
+    // Post a Streaming event after stopping StreamingContext
+    val receiverInfoStopped = ReceiverInfo(0, "test", false, "localhost", "0")
+    ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfoStopped))
+    ssc.sparkContext.listenerBus.waitUntilEmpty(1000)
+    // The StreamingListener should not receive any event
+    verifyNoMoreInteractions(streamingListener)
+  }
+
   private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = {
     val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc)
     _ssc.addStreamingListener(contextStoppingCollector)

http://git-wip-us.apache.org/repos/asf/spark/blob/944fdadf/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index b67189f..cfd7f86 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -34,8 +34,6 @@ class ReceiverTrackerSuite extends TestSuiteBase {
 
   test("send rate update to receivers") {
     withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
-      ssc.scheduler.listenerBus.start(ssc.sc)
-
       val newRateLimit = 100L
       val inputDStream = new RateTestInputDStream(ssc)
       val tracker = new ReceiverTracker(ssc)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to