Github user abellina commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18253#discussion_r129879433
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -39,98 +39,107 @@ import org.apache.spark.util.Utils
      * has started will events be actually propagated to all attached 
listeners. This listener bus
      * is stopped when `stop()` is called, and it will drop further events 
after stopping.
      */
    -private[spark] class LiveListenerBus(conf: SparkConf) extends 
SparkListenerBus {
    -
    -  self =>
    +private[spark] class LiveListenerBus(conf: SparkConf)
    +  extends WithMultipleListenerBus[SparkListenerInterface, 
SparkListenerEvent] with Logging{
     
       import LiveListenerBus._
     
       private var sparkContext: SparkContext = _
    +  private var metricsSystem: MetricsSystem = _
     
    -  // Cap the capacity of the event queue so we get an explicit error 
(rather than
    -  // an OOM exception) if it's perpetually being added to more quickly 
than it's being drained.
    -  private val eventQueue =
    -    new 
LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
    +  private val defaultListenerQueue =
    +    new GroupOfListenersBusQueue("default", 
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
     
    -  private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue)
    +  @volatile private var otherListenerQueues = Seq.empty[ListenerBusQueue]
     
    -  // Indicate if `start()` is called
    -  private val started = new AtomicBoolean(false)
    -  // Indicate if `stop()` is called
    +  // start, stop and add/remove listener should be mutually exclusive
    +  private val startStopAddRemoveLock = new ReentrantLock()
    +  // Will be set modified in a synchronized function
    +  @volatile private var started = false
       private val stopped = new AtomicBoolean(false)
     
    -  /** A counter for dropped events. It will be reset every time we log it. 
*/
    -  private val droppedEventsCounter = new AtomicLong(0L)
    -
    -  /** When `droppedEventsCounter` was logged last time in milliseconds. */
    -  @volatile private var lastReportTimestamp = 0L
    -
    -  // Indicate if we are processing some event
    -  // Guarded by `self`
    -  private var processingEvent = false
    +   /**
    +    * Add a listener to the default pool.
    +    * This method is thread-safe and can be called in any thread.
    +    */
    +  final override def addListener(listener: SparkListenerInterface): Unit = 
{
    +    startStopAddRemoveLock.lock()
    --- End diff --
    
    same for other lock/unlocks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to