This is an automated email from the ASF dual-hosted git repository.

jtorres pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3929d16 [SPARK-26046][SS] Add StreamingQueryManager.listListeners() 3929d16 is described below commit 3929d166043deb104dc3f3180ab43be54c50937d Author: Mukul Murthy <mukul.mur...@gmail.com> AuthorDate: Thu Sep 5 14:27:54 2019 -0700 [SPARK-26046][SS] Add StreamingQueryManager.listListeners() ### What changes were proposed in this pull request? Add a listListeners() method to StreamingQueryManager that lists all StreamingQueryListeners that have been added to that manager. ### Why are the changes needed? While it's best practice to keep handles on all listeners added, it's still nice to have an API to be able to list what listeners have been added to a StreamingQueryManager. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Modified existing unit tests to use the new API instead of using reflection. Closes #25518 from mukulmurthy/26046-listener. Authored-by: Mukul Murthy <mukul.mur...@gmail.com> Signed-off-by: Jose Torres <torres.joseph.f+git...@gmail.com> --- .../spark/sql/streaming/StreamingQueryManager.scala | 10 ++++++++++ .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 15 ++++----------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index abee5f6..9765956 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -21,6 +21,7 @@ import java.util.UUID import java.util.concurrent.TimeUnit import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -199,6 +200,15 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo 
listenerBus.removeListener(listener) } + /** + * List all [[StreamingQueryListener]]s attached to this [[StreamingQueryManager]]. + * + * @since 3.0.0 + */ + def listListeners(): Array[StreamingQueryListener] = { + listenerBus.listeners.asScala.toArray + } + /** Post a listener event */ private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = { listenerBus.post(event) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 422223b..d964048 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -47,7 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { after { spark.streams.active.foreach(_.stop()) assert(spark.streams.active.isEmpty) - assert(addedListeners().isEmpty) + assert(spark.streams.listListeners().isEmpty) // Make sure we don't leak any events to the next test spark.sparkContext.listenerBus.waitUntilEmpty(10000) } @@ -223,7 +223,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(isListenerActive(listener1) === false) assert(isListenerActive(listener2)) } finally { - addedListeners().foreach(spark.streams.removeListener) + spark.streams.listListeners().foreach(spark.streams.removeListener) } } @@ -362,10 +362,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(session1.streams.ne(session2.streams)) withListenerAdded(collector1, session1) { - assert(addedListeners(session1).nonEmpty) + assert(session1.streams.listListeners().nonEmpty) withListenerAdded(collector2, session2) { - assert(addedListeners(session2).nonEmpty) + assert(session2.streams.listListeners().nonEmpty) // query on session1 should send events only to collector1 runQuery(session1) @@ 
-440,13 +440,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - private def addedListeners(session: SparkSession = spark): Array[StreamingQueryListener] = { - val listenerBusMethod = - PrivateMethod[StreamingQueryListenerBus]('listenerBus) - val listenerBus = session.streams invokePrivate listenerBusMethod() - listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener]) - } - /** Collects events from the StreamingQueryListener for testing */ class EventCollector extends StreamingQueryListener { // to catch errors in the async listener events --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org