[ https://issues.apache.org/jira/browse/SPARK-27804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
duyu updated SPARK-27804: ------------------------- Description: LiveListenerBus.scala {code} private[spark] def addToQueue( listener: SparkListenerInterface, queue: String): Unit = synchronized { if (stopped.get()) { throw new IllegalStateException("LiveListenerBus is stopped.") } queues.asScala.find(_.name == queue) match { case Some(queue) => queue.addListener(listener) case None => // it will create multiple AsyncEventQueues with the same name when run in multi-thread scene and those created AsyncEventQueues will be added to queues val newQueue = new AsyncEventQueue(queue, conf, metrics, this) newQueue.addListener(listener) if (started.get()) { newQueue.start(sparkContext) } queues.add(newQueue) } } {code} was: LiveListenerBus.scala {code:scala} private[spark] def addToQueue( listener: SparkListenerInterface, queue: String): Unit = synchronized { if (stopped.get()) { throw new IllegalStateException("LiveListenerBus is stopped.") } queues.asScala.find(_.name == queue) match { case Some(queue) => queue.addListener(listener) case None => // it will create multiple AsyncEventQueues when run in multi-thread scene and those created AsyncEventQueues will be added to queues val newQueue = new AsyncEventQueue(queue, conf, metrics, this) newQueue.addListener(listener) if (started.get()) { newQueue.start(sparkContext) } queues.add(newQueue) } } {code} > LiveListenerBus#addToQueue : create multiple AsyncEventQueues under race > condition > ---------------------------------------------------------------------------------- > > Key: SPARK-27804 > URL: https://issues.apache.org/jira/browse/SPARK-27804 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.4.3 > Reporter: duyu > Priority: Major > > LiveListenerBus.scala > {code} > private[spark] def addToQueue( > listener: SparkListenerInterface, > queue: String): Unit = synchronized { > if (stopped.get()) { > throw new IllegalStateException("LiveListenerBus is stopped.") > } > queues.asScala.find(_.name == queue) match { > case Some(queue) => > queue.addListener(listener) > case None => > // it will create multiple AsyncEventQueues with the same name when > run in multi-thread scene and those created AsyncEventQueues will be added to > queues > val newQueue = new AsyncEventQueue(queue, conf, metrics, this) > newQueue.addListener(listener) > if (started.get()) { > newQueue.start(sparkContext) > } > queues.add(newQueue) > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org