Repository: spark
Updated Branches:
  refs/heads/branch-2.3 cd92913f3 -> bc4bef472


[SPARK-22850][CORE] Ensure queued events are delivered to all event queues.

The code in LiveListenerBus was queueing events posted before start() in
the queues themselves, so in situations like the following:

   bus.post(someEvent)
   bus.addToEventLogQueue(listener)
   bus.start()

"someEvent" would not be delivered to "listener" if that was the first
listener in the queue, because the queue wouldn't exist when the
event was posted.

This change buffers events posted before the bus is started in the bus
itself, so that they can be delivered to all registered queues when the
bus is started.

Also tweaked the unit tests to cover the behavior above.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20039 from vanzin/SPARK-22850.

(cherry picked from commit d2cddc88eac32f26b18ec26bb59e85c6f09a8c88)
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc4bef47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc4bef47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc4bef47

Branch: refs/heads/branch-2.3
Commit: bc4bef472de0e99f74a80954d694c3d1744afe3a
Parents: cd92913
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Thu Jan 4 16:19:00 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu Jan 4 16:19:22 2018 -0600

----------------------------------------------------------------------
 .../spark/scheduler/LiveListenerBus.scala       | 45 +++++++++++++++++---
 .../spark/scheduler/SparkListenerSuite.scala    | 21 +++++----
 2 files changed, 52 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc4bef47/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 2312140..ba6387a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -62,6 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
 
   private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
 
+  // Visible for testing.
+  @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
+
   /** Add a listener to queue shared by all non-internal listeners. */
   def addToSharedQueue(listener: SparkListenerInterface): Unit = {
     addToQueue(listener, SHARED_QUEUE)
@@ -125,13 +128,39 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
 
   /** Post an event to all queues. */
   def post(event: SparkListenerEvent): Unit = {
-    if (!stopped.get()) {
-      metrics.numEventsPosted.inc()
-      val it = queues.iterator()
-      while (it.hasNext()) {
-        it.next().post(event)
+    if (stopped.get()) {
+      return
+    }
+
+    metrics.numEventsPosted.inc()
+
+    // If the event buffer is null, it means the bus has been started and we can avoid
+    // synchronization and post events directly to the queues. This should be the most
+    // common case during the life of the bus.
+    if (queuedEvents == null) {
+      postToQueues(event)
+      return
+    }
+
+    // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
+    // calling start() picks up the new event.
+    synchronized {
+      if (!started.get()) {
+        queuedEvents += event
+        return
       }
     }
+
+    // If the bus was already started when the check above was made, just post directly to the
+    // queues.
+    postToQueues(event)
+  }
+
+  private def postToQueues(event: SparkListenerEvent): Unit = {
+    val it = queues.iterator()
+    while (it.hasNext()) {
+      it.next().post(event)
+    }
   }
 
   /**
@@ -149,7 +178,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     }
 
     this.sparkContext = sc
-    queues.asScala.foreach(_.start(sc))
+    queues.asScala.foreach { q =>
+      q.start(sc)
+      queuedEvents.foreach(q.post)
+    }
+    queuedEvents = null
     metricsSystem.registerSource(metrics)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bc4bef47/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 1beb36a..da6ecb8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -48,7 +48,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
   }
 
-  private def queueSize(bus: LiveListenerBus): Int = {
+  private def sharedQueueSize(bus: LiveListenerBus): Int = {
     bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
       .asInstanceOf[Int]
   }
@@ -73,12 +73,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val conf = new SparkConf()
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus(conf)
-    bus.addToSharedQueue(counter)
 
     // Metrics are initially empty.
     assert(bus.metrics.numEventsPosted.getCount === 0)
     assert(numDroppedEvents(bus) === 0)
-    assert(queueSize(bus) === 0)
+    assert(bus.queuedEvents.size === 0)
     assert(eventProcessingTimeCount(bus) === 0)
 
     // Post five events:
@@ -87,7 +86,10 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // Five messages should be marked as received and queued, but no messages should be posted to
     // listeners yet because the the listener bus hasn't been started.
     assert(bus.metrics.numEventsPosted.getCount === 5)
-    assert(queueSize(bus) === 5)
+    assert(bus.queuedEvents.size === 5)
+
+    // Add the counter to the bus after messages have been queued for later delivery.
+    bus.addToSharedQueue(counter)
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
@@ -95,9 +97,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
-    assert(queueSize(bus) === 0)
+    assert(sharedQueueSize(bus) === 0)
     assert(eventProcessingTimeCount(bus) === 5)
 
+    // After the bus is started, there should be no more queued events.
+    assert(bus.queuedEvents === null)
+
     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
@@ -188,18 +193,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // Post a message to the listener bus and wait for processing to begin:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     listenerStarted.acquire()
-    assert(queueSize(bus) === 0)
+    assert(sharedQueueSize(bus) === 0)
     assert(numDroppedEvents(bus) === 0)
 
     // If we post an additional message then it should remain in the queue because the listener is
     // busy processing the first event:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    assert(queueSize(bus) === 1)
+    assert(sharedQueueSize(bus) === 1)
     assert(numDroppedEvents(bus) === 0)
 
     // The queue is now full, so any additional events posted to the listener will be dropped:
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    assert(queueSize(bus) === 1)
+    assert(sharedQueueSize(bus) === 1)
     assert(numDroppedEvents(bus) === 1)
 
     // Allow the the remaining events to be processed so we can stop the listener bus:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to