[ 
https://issues.apache.org/jira/browse/SPARK-27804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

duyu updated SPARK-27804:
-------------------------
    Description: 
LiveListenerBus.scala
{code}
  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)

      case None =>
        // When run from multiple threads, this will create multiple AsyncEventQueues
        // with the same name, and all of them will be added to queues.
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }
{code}

  was:
LiveListenerBus.scala
{code:scala}

  private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }

    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)

      case None =>
        // When run from multiple threads, this will create multiple AsyncEventQueues,
        // and all of them will be added to queues.
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }
{code}


> LiveListenerBus#addToQueue : create multiple AsyncEventQueues under race 
> condition
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-27804
>                 URL: https://issues.apache.org/jira/browse/SPARK-27804
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>            Reporter: duyu
>            Priority: Major
>
> LiveListenerBus.scala
> {code}
>   private[spark] def addToQueue(
>       listener: SparkListenerInterface,
>       queue: String): Unit = synchronized {
>     if (stopped.get()) {
>       throw new IllegalStateException("LiveListenerBus is stopped.")
>     }
>     queues.asScala.find(_.name == queue) match {
>       case Some(queue) =>
>         queue.addListener(listener)
>       case None =>
>         // When run from multiple threads, this will create multiple AsyncEventQueues
>         // with the same name, and all of them will be added to queues.
>         val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
>         newQueue.addListener(listener)
>         if (started.get()) {
>           newQueue.start(sparkContext)
>         }
>         queues.add(newQueue)
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to