Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r223885742 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala --- @@ -75,95 +76,74 @@ trait QueryExecutionListener { */ @Experimental @InterfaceStability.Evolving -class ExecutionListenerManager private extends Logging { - - private[sql] def this(conf: SparkConf) = { - this() +// The `session` is used to indicate which session carries this listener manager, and we only +// catch SQL executions which are launched by the same session. +// The `loadExtensions` flag is used to indicate whether we should load the pre-defined, +// user-specified listeners during construction. We should not do it when cloning this listener +// manager, as we will copy all listeners to the cloned listener manager. +class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean) + extends SparkListener with Logging { + + private[this] val listeners = new CopyOnWriteArrayList[QueryExecutionListener] + + if (loadExtensions) { + val conf = session.sparkContext.conf conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames => Utils.loadExtensions(classOf[QueryExecutionListener], classNames, conf).foreach(register) } } + session.sparkContext.listenerBus.addToSharedQueue(this) + /** * Registers the specified [[QueryExecutionListener]]. */ @DeveloperApi - def register(listener: QueryExecutionListener): Unit = writeLock { - listeners += listener + def register(listener: QueryExecutionListener): Unit = { + listeners.add(listener) } /** * Unregisters the specified [[QueryExecutionListener]]. */ @DeveloperApi - def unregister(listener: QueryExecutionListener): Unit = writeLock { - listeners -= listener + def unregister(listener: QueryExecutionListener): Unit = { + listeners.remove(listener) } /** * Removes all the registered [[QueryExecutionListener]]. 
*/ @DeveloperApi - def clear(): Unit = writeLock { + def clear(): Unit = { listeners.clear() } /** * Get an identical copy of this listener manager. */ - @DeveloperApi - override def clone(): ExecutionListenerManager = writeLock { - val newListenerManager = new ExecutionListenerManager - listeners.foreach(newListenerManager.register) + private[sql] def clone(session: SparkSession): ExecutionListenerManager = { + val newListenerManager = new ExecutionListenerManager(session, loadExtensions = false) + listeners.iterator().asScala.foreach(newListenerManager.register) newListenerManager } - private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { - readLock { - withErrorHandling { listener => - listener.onSuccess(funcName, qe, duration) + override def onOtherEvent(event: SparkListenerEvent): Unit = event match { + case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) => + val funcName = e.executionName.get + e.executionFailure match { + case Some(ex) => + listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, ex)) + case _ => + listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe, e.duration)) } - } - } - private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { - readLock { - withErrorHandling { listener => - listener.onFailure(funcName, qe, exception) - } - } + case _ => // Ignore } - private[this] val listeners = ListBuffer.empty[QueryExecutionListener] - - /** A lock to prevent updating the list of listeners while we are traversing through them. */ - private[this] val lock = new ReentrantReadWriteLock() - - private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = { - for (listener <- listeners) { - try { - f(listener) - } catch { - case NonFatal(e) => logWarning("Error executing query execution listener", e) - } - } - } - - /** Acquires a read lock on the cache for the duration of `f`. 
*/ - private def readLock[A](f: => A): A = { - val rl = lock.readLock() - rl.lock() - try f finally { - rl.unlock() - } - } - - /** Acquires a write lock on the cache for the duration of `f`. */ - private def writeLock[A](f: => A): A = { - val wl = lock.writeLock() - wl.lock() - try f finally { - wl.unlock() - } + private def shouldCatchEvent(e: SparkListenerSQLExecutionEnd): Boolean = { + // Only catch SQL execution with a name, and triggered by the same spark session that this --- End diff -- So this is what bugs me. You are adding separation between the SparkSession and its listeners, only to undo it here. It seems like a bit of a hassle to go through, since what you basically need is async execution.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org