This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b18b6cca28ef [SPARK-46716][SS][TESTS] Add a test regarding backward 
compatibility check for Scala StreamingQueryListener
b18b6cca28ef is described below

commit b18b6cca28ef7ed1199ae591d00e945ae4ae611a
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Mon Jan 15 15:32:32 2024 +0900

    [SPARK-46716][SS][TESTS] Add a test regarding backward compatibility 
check for Scala StreamingQueryListener
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add functionality to perform a backward compatibility 
check for StreamingQueryListener in Scala, specifically whether or not 
`onQueryIdle` is implemented.
    
    ### Why are the changes needed?
    
    We missed adding a backward compatibility test when introducing onQueryIdle, 
which led to an issue in PySpark 
(https://issues.apache.org/jira/browse/SPARK-45631). We added the compatibility 
test in PySpark but didn't add it in Scala.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Modified UT.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44730 from HeartSaVioR/SPARK-46716.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../streaming/StreamingQueryListenerSuite.scala    | 63 ++++++++++++++++------
 1 file changed, 48 insertions(+), 15 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 2504cd17acfc..d9ce8002d285 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -57,10 +57,14 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   testQuietly("single listener, check trigger events are generated correctly") 
{
+    testSingleListenerBasic(new EventCollectorV1)
+    testSingleListenerBasic(new EventCollectorV2)
+  }
+
+  private def testSingleListenerBasic(listener: EventCollector): Unit = {
     val clock = new StreamManualClock
     val inputData = new MemoryStream[Int](0, sqlContext)
     val df = inputData.toDS().as[Long].map { 10 / _ }
-    val listener = new EventCollector
 
     case class AssertStreamExecThreadToWaitForClock()
       extends AssertOnQuery(q => {
@@ -155,7 +159,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
 
   test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
     val df = MemoryStream[Int].toDS().as[Long]
-    val listeners = (1 to 5).map(_ => new EventCollector)
+    val listeners = (1 to 5).map(_ => new EventCollectorV2)
     try {
       listeners.foreach(listener => spark.streams.addListener(listener))
       testStream(df, OutputMode.Append)(
@@ -182,7 +186,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
 
   test("continuous processing listeners should receive QueryTerminatedEvent") {
     val df = spark.readStream.format("rate").load()
-    val listeners = (1 to 5).map(_ => new EventCollector)
+    val listeners = (1 to 5).map(_ => new EventCollectorV2)
     try {
       listeners.foreach(listener => spark.streams.addListener(listener))
       testStream(df, OutputMode.Append)(
@@ -218,8 +222,8 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     }
 
     try {
-      val listener1 = new EventCollector
-      val listener2 = new EventCollector
+      val listener1 = new EventCollectorV1
+      val listener2 = new EventCollectorV2
 
       spark.streams.addListener(listener1)
       assert(isListenerActive(listener1))
@@ -236,7 +240,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   test("event ordering") {
-    val listener = new EventCollector
+    val listener = new EventCollectorV2
     withListenerAdded(listener) {
       for (i <- 1 to 50) {
         listener.reset()
@@ -348,8 +352,8 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   test("listener only posts events from queries started in the related 
sessions") {
     val session1 = spark.newSession()
     val session2 = spark.newSession()
-    val collector1 = new EventCollector
-    val collector2 = new EventCollector
+    val collector1 = new EventCollectorV2
+    val collector2 = new EventCollectorV2
 
     def runQuery(session: SparkSession): Unit = {
       collector1.reset()
@@ -434,7 +438,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
       .observe(
         name = "other_event",
         avg($"value").cast("int").as("avg_val"))
-    val listener = new EventCollector
+    val listener = new EventCollectorV2
     def checkMetrics(f: java.util.Map[String, Row] => Unit): StreamAction = {
       AssertOnQuery { _ =>
         eventually(Timeout(streamingTimeout)) {
@@ -576,7 +580,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   /** Collects events from the StreamingQueryListener for testing */
-  class EventCollector extends StreamingQueryListener {
+  abstract class EventCollector extends StreamingQueryListener {
     // to catch errors in the async listener events
     @volatile private var asyncTestWaiter = new Waiter
 
@@ -606,27 +610,29 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
       asyncTestWaiter.await(timeout(streamingTimeout))
     }
 
-    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+    protected def handleOnQueryStarted(queryStarted: QueryStartedEvent): Unit 
= {
       asyncTestWaiter {
         startEvent = queryStarted
       }
     }
 
-    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
+    protected def handleOnQueryProgress(queryProgress: QueryProgressEvent): 
Unit = {
       asyncTestWaiter {
         assert(startEvent != null, "onQueryProgress called before 
onQueryStarted")
-        _progressEvents.synchronized { _progressEvents += 
queryProgress.progress }
+        _progressEvents.synchronized {
+          _progressEvents += queryProgress.progress
+        }
       }
     }
 
-    override def onQueryIdle(queryIdle: QueryIdleEvent): Unit = {
+    protected def handleOnQueryIdle(queryIdle: QueryIdleEvent): Unit = {
       asyncTestWaiter {
         assert(startEvent != null, "onQueryIdle called before onQueryStarted")
         idleEvent = queryIdle
       }
     }
 
-    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): 
Unit = {
+    protected def handleOnQueryTerminated(queryTerminated: 
QueryTerminatedEvent): Unit = {
       asyncTestWaiter {
         assert(startEvent != null, "onQueryTerminated called before 
onQueryStarted")
         terminationEvent = queryTerminated
@@ -634,4 +640,31 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
       asyncTestWaiter.dismiss()
     }
   }
+
+  /**
+   * V1: Initial interface of StreamingQueryListener containing methods 
`onQueryStarted`,
+   * `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
+   */
+  class EventCollectorV1 extends EventCollector {
+    override def onQueryStarted(event: QueryStartedEvent): Unit = 
handleOnQueryStarted(event)
+
+    override def onQueryProgress(event: QueryProgressEvent): Unit = 
handleOnQueryProgress(event)
+
+    override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
+      handleOnQueryTerminated(event)
+  }
+
+  /**
+   * V2: The interface after the method `onQueryIdle` is added. It is Spark 
3.5+.
+   */
+  class EventCollectorV2 extends EventCollector {
+    override def onQueryStarted(event: QueryStartedEvent): Unit = 
handleOnQueryStarted(event)
+
+    override def onQueryProgress(event: QueryProgressEvent): Unit = 
handleOnQueryProgress(event)
+
+    override def onQueryIdle(event: QueryIdleEvent): Unit = 
handleOnQueryIdle(event)
+
+    override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
+      handleOnQueryTerminated(event)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to