This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b18b6cca28ef [SPARK-46716][SS][TESTS] Add a test regarding to backward compatibility check for Scala StreamingQueryListener b18b6cca28ef is described below commit b18b6cca28ef7ed1199ae591d00e945ae4ae611a Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Mon Jan 15 15:32:32 2024 +0900 [SPARK-46716][SS][TESTS] Add a test regarding to backward compatibility check for Scala StreamingQueryListener ### What changes were proposed in this pull request? This PR proposes to add functionality to perform a backward compatibility check for StreamingQueryListener in Scala, specifically whether `onQueryIdle` is implemented or not. ### Why are the changes needed? We missed adding a backward compatibility test when introducing onQueryIdle, and it led to an issue in PySpark (https://issues.apache.org/jira/browse/SPARK-45631). We added the compatibility test in PySpark but didn't add it in Scala. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Modified UT. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44730 from HeartSaVioR/SPARK-46716. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../streaming/StreamingQueryListenerSuite.scala | 63 ++++++++++++++++------ 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 2504cd17acfc..d9ce8002d285 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -57,10 +57,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } testQuietly("single listener, check trigger events are generated correctly") { + testSingleListenerBasic(new EventCollectorV1) + testSingleListenerBasic(new EventCollectorV2) + } + + private def testSingleListenerBasic(listener: EventCollector): Unit = { val clock = new StreamManualClock val inputData = new MemoryStream[Int](0, sqlContext) val df = inputData.toDS().as[Long].map { 10 / _ } - val listener = new EventCollector case class AssertStreamExecThreadToWaitForClock() extends AssertOnQuery(q => { @@ -155,7 +159,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") { val df = MemoryStream[Int].toDS().as[Long] - val listeners = (1 to 5).map(_ => new EventCollector) + val listeners = (1 to 5).map(_ => new EventCollectorV2) try { listeners.foreach(listener => spark.streams.addListener(listener)) testStream(df, OutputMode.Append)( @@ -182,7 +186,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("continuous processing listeners should receive QueryTerminatedEvent") { val df = spark.readStream.format("rate").load() - val listeners = (1 to 5).map(_ => new EventCollector) + val listeners = (1 
to 5).map(_ => new EventCollectorV2) try { listeners.foreach(listener => spark.streams.addListener(listener)) testStream(df, OutputMode.Append)( @@ -218,8 +222,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } try { - val listener1 = new EventCollector - val listener2 = new EventCollector + val listener1 = new EventCollectorV1 + val listener2 = new EventCollectorV2 spark.streams.addListener(listener1) assert(isListenerActive(listener1)) @@ -236,7 +240,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("event ordering") { - val listener = new EventCollector + val listener = new EventCollectorV2 withListenerAdded(listener) { for (i <- 1 to 50) { listener.reset() @@ -348,8 +352,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("listener only posts events from queries started in the related sessions") { val session1 = spark.newSession() val session2 = spark.newSession() - val collector1 = new EventCollector - val collector2 = new EventCollector + val collector1 = new EventCollectorV2 + val collector2 = new EventCollectorV2 def runQuery(session: SparkSession): Unit = { collector1.reset() @@ -434,7 +438,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { .observe( name = "other_event", avg($"value").cast("int").as("avg_val")) - val listener = new EventCollector + val listener = new EventCollectorV2 def checkMetrics(f: java.util.Map[String, Row] => Unit): StreamAction = { AssertOnQuery { _ => eventually(Timeout(streamingTimeout)) { @@ -576,7 +580,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } /** Collects events from the StreamingQueryListener for testing */ - class EventCollector extends StreamingQueryListener { + abstract class EventCollector extends StreamingQueryListener { // to catch errors in the async listener events @volatile private var asyncTestWaiter = new Waiter @@ -606,27 +610,29 @@ class 
StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { asyncTestWaiter.await(timeout(streamingTimeout)) } - override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { + protected def handleOnQueryStarted(queryStarted: QueryStartedEvent): Unit = { asyncTestWaiter { startEvent = queryStarted } } - override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { + protected def handleOnQueryProgress(queryProgress: QueryProgressEvent): Unit = { asyncTestWaiter { assert(startEvent != null, "onQueryProgress called before onQueryStarted") - _progressEvents.synchronized { _progressEvents += queryProgress.progress } + _progressEvents.synchronized { + _progressEvents += queryProgress.progress + } } } - override def onQueryIdle(queryIdle: QueryIdleEvent): Unit = { + protected def handleOnQueryIdle(queryIdle: QueryIdleEvent): Unit = { asyncTestWaiter { assert(startEvent != null, "onQueryIdle called before onQueryStarted") idleEvent = queryIdle } } - override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { + protected def handleOnQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { asyncTestWaiter { assert(startEvent != null, "onQueryTerminated called before onQueryStarted") terminationEvent = queryTerminated @@ -634,4 +640,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { asyncTestWaiter.dismiss() } } + + /** + * V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`, + * `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5. 
+ */ + class EventCollectorV1 extends EventCollector { + override def onQueryStarted(event: QueryStartedEvent): Unit = handleOnQueryStarted(event) + + override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event) + + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = + handleOnQueryTerminated(event) + } + + /** + * V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+. + */ + class EventCollectorV2 extends EventCollector { + override def onQueryStarted(event: QueryStartedEvent): Unit = handleOnQueryStarted(event) + + override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event) + + override def onQueryIdle(event: QueryIdleEvent): Unit = handleOnQueryIdle(event) + + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = + handleOnQueryTerminated(event) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org