GitHub user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22674#discussion_r223885742
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
---
    @@ -75,95 +76,74 @@ trait QueryExecutionListener {
      */
     @Experimental
     @InterfaceStability.Evolving
    -class ExecutionListenerManager private extends Logging {
    -
    -  private[sql] def this(conf: SparkConf) = {
    -    this()
    +// The `session` is used to indicate which session carries this listener 
manager, and we only
    +// catch SQL executions which are launched by the same session.
    +// The `loadExtensions` flag is used to indicate whether we should load 
the pre-defined,
    +// user-specified listeners during construction. We should not do it when 
cloning this listener
    +// manager, as we will copy all listeners to the cloned listener manager.
    +class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
    +  extends SparkListener with Logging {
    +
    +  private[this] val listeners = new 
CopyOnWriteArrayList[QueryExecutionListener]
    +
    +  if (loadExtensions) {
    +    val conf = session.sparkContext.conf
         conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
           Utils.loadExtensions(classOf[QueryExecutionListener], classNames, 
conf).foreach(register)
         }
       }
     
    +  session.sparkContext.listenerBus.addToSharedQueue(this)
    +
       /**
        * Registers the specified [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def register(listener: QueryExecutionListener): Unit = writeLock {
    -    listeners += listener
    +  def register(listener: QueryExecutionListener): Unit = {
    +    listeners.add(listener)
       }
     
       /**
        * Unregisters the specified [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def unregister(listener: QueryExecutionListener): Unit = writeLock {
    -    listeners -= listener
    +  def unregister(listener: QueryExecutionListener): Unit = {
    +    listeners.remove(listener)
       }
     
       /**
        * Removes all the registered [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def clear(): Unit = writeLock {
    +  def clear(): Unit = {
         listeners.clear()
       }
     
       /**
        * Get an identical copy of this listener manager.
        */
    -  @DeveloperApi
    -  override def clone(): ExecutionListenerManager = writeLock {
    -    val newListenerManager = new ExecutionListenerManager
    -    listeners.foreach(newListenerManager.register)
    +  private[sql] def clone(session: SparkSession): ExecutionListenerManager 
= {
    +    val newListenerManager = new ExecutionListenerManager(session, 
loadExtensions = false)
    +    listeners.iterator().asScala.foreach(newListenerManager.register)
         newListenerManager
       }
     
    -  private[sql] def onSuccess(funcName: String, qe: QueryExecution, 
duration: Long): Unit = {
    -    readLock {
    -      withErrorHandling { listener =>
    -        listener.onSuccess(funcName, qe, duration)
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
    +    case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
    +      val funcName = e.executionName.get
    +      e.executionFailure match {
    +        case Some(ex) =>
    +          listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, 
ex))
    +        case _ =>
    +          listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe, 
e.duration))
           }
    -    }
    -  }
     
    -  private[sql] def onFailure(funcName: String, qe: QueryExecution, 
exception: Exception): Unit = {
    -    readLock {
    -      withErrorHandling { listener =>
    -        listener.onFailure(funcName, qe, exception)
    -      }
    -    }
    +    case _ => // Ignore
       }
     
    -  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
    -
    -  /** A lock to prevent updating the list of listeners while we are 
traversing through them. */
    -  private[this] val lock = new ReentrantReadWriteLock()
    -
    -  private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = 
{
    -    for (listener <- listeners) {
    -      try {
    -        f(listener)
    -      } catch {
    -        case NonFatal(e) => logWarning("Error executing query execution 
listener", e)
    -      }
    -    }
    -  }
    -
    -  /** Acquires a read lock on the cache for the duration of `f`. */
    -  private def readLock[A](f: => A): A = {
    -    val rl = lock.readLock()
    -    rl.lock()
    -    try f finally {
    -      rl.unlock()
    -    }
    -  }
    -
    -  /** Acquires a write lock on the cache for the duration of `f`. */
    -  private def writeLock[A](f: => A): A = {
    -    val wl = lock.writeLock()
    -    wl.lock()
    -    try f finally {
    -      wl.unlock()
    -    }
    +  private def shouldCatchEvent(e: SparkListenerSQLExecutionEnd): Boolean = 
{
    +    // Only catch SQL execution with a name, and triggered by the same 
spark session that this
    --- End diff --
    
    So this is what bugs me. You are adding separation between the SparkSession 
and its listeners, only to undo it here. It seems like a bit of a hassle to go 
through because you basically need async execution.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to