Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22674#discussion_r223886858
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala ---
    @@ -75,95 +76,74 @@ trait QueryExecutionListener {
      */
     @Experimental
     @InterfaceStability.Evolving
    -class ExecutionListenerManager private extends Logging {
    -
    -  private[sql] def this(conf: SparkConf) = {
    -    this()
    +// The `session` is used to indicate which session carries this listener manager, and we only
    +// catch SQL executions which are launched by the same session.
    +// The `loadExtensions` flag is used to indicate whether we should load the pre-defined,
    +// user-specified listeners during construction. We should not do it when cloning this listener
    +// manager, as we will copy all listeners to the cloned listener manager.
    +class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean)
    +  extends SparkListener with Logging {
    +
    +  private[this] val listeners = new CopyOnWriteArrayList[QueryExecutionListener]
    +
    +  if (loadExtensions) {
    +    val conf = session.sparkContext.conf
         conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
           Utils.loadExtensions(classOf[QueryExecutionListener], classNames, conf).foreach(register)
         }
       }
     
    +  session.sparkContext.listenerBus.addToSharedQueue(this)
    +
       /**
        * Registers the specified [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def register(listener: QueryExecutionListener): Unit = writeLock {
    -    listeners += listener
    +  def register(listener: QueryExecutionListener): Unit = {
    +    listeners.add(listener)
       }
     
       /**
        * Unregisters the specified [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def unregister(listener: QueryExecutionListener): Unit = writeLock {
    -    listeners -= listener
    +  def unregister(listener: QueryExecutionListener): Unit = {
    +    listeners.remove(listener)
       }
     
       /**
        * Removes all the registered [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def clear(): Unit = writeLock {
    +  def clear(): Unit = {
         listeners.clear()
       }
     
       /**
        * Get an identical copy of this listener manager.
        */
    -  @DeveloperApi
    -  override def clone(): ExecutionListenerManager = writeLock {
    -    val newListenerManager = new ExecutionListenerManager
    -    listeners.foreach(newListenerManager.register)
    +  private[sql] def clone(session: SparkSession): ExecutionListenerManager = {
    +    val newListenerManager = new ExecutionListenerManager(session, loadExtensions = false)
    +    listeners.iterator().asScala.foreach(newListenerManager.register)
         newListenerManager
       }
     
    -  private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
    -    readLock {
    -      withErrorHandling { listener =>
    -        listener.onSuccess(funcName, qe, duration)
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
    +      val funcName = e.executionName.get
    +      e.executionFailure match {
    +        case Some(ex) =>
    +          listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, ex))
    --- End diff --
    
    This is a bit of a high-level thought: you could consider making the calling event queue responsible for dispatching these events. That way you can leverage any improvements to the underlying event bus.
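    
    As a rough illustration of that idea (not what this PR does): each registered `QueryExecutionListener` could be wrapped in its own `SparkListener` and handed to the shared queue, so the bus itself does the fan-out instead of `ExecutionListenerManager` looping over its list. The adapter name, the session check (approximating `shouldCatchEvent` from the diff), and the `duration` field on the event are assumptions on my part; only `executionName`, `executionFailure` and `qe` are visible in the excerpt above.
    
        import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
        import org.apache.spark.sql.util.QueryExecutionListener
    
        // Hypothetical adapter (sql-internal): one instance per registered listener,
        // added to the shared queue, so the event bus delivers SQL-execution-end
        // events to each listener directly instead of the manager fanning them out.
        class QueryExecutionListenerAdapter(
            session: SparkSession,
            delegate: QueryExecutionListener) extends SparkListener {
    
          override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
            // Approximation of `shouldCatchEvent`: only executions launched by the
            // owning session, and only those that carry an execution name.
            case e: SparkListenerSQLExecutionEnd
                if e.qe != null && (e.qe.sparkSession eq session) && e.executionName.isDefined =>
              val funcName = e.executionName.get
              e.executionFailure match {
                case Some(ex) => delegate.onFailure(funcName, e.qe, ex)
                // `duration` is assumed to be carried by the event; the onSuccess
                // branch is cut off in the excerpt above.
                case None => delegate.onSuccess(funcName, e.qe, e.duration)
              }
            case _ => // not a SQL execution end event, ignore
          }
        }
    
        // Registration would then become, roughly:
        //   session.sparkContext.listenerBus.addToSharedQueue(
        //     new QueryExecutionListenerAdapter(session, listener))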

