This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 75bc5ac9f2b0 [SPARK-45631][SS][PYSPARK] Remove @abstractmethod from onQueryIdle in PySpark StreamingQueryListener 75bc5ac9f2b0 is described below commit 75bc5ac9f2b07bc894091b8b15682ee906a19356 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Mon Oct 23 21:12:01 2023 +0900 [SPARK-45631][SS][PYSPARK] Remove @abstractmethod from onQueryIdle in PySpark StreamingQueryListener ### What changes were proposed in this pull request? Credit to anish-db for the initial investigation and the fix. This PR proposes to remove the `@abstractmethod` decorator from `onQueryIdle` in PySpark StreamingQueryListener. The method `onQueryIdle` was added with the `@abstractmethod` decorator, so its default implementation is never used on its own. Any subclass that does not override `onQueryIdle` cannot be instantiated, which forces users to implement the new method. This breaks all existing streaming query listener implementations, forcing them to add at least a dummy implementation. This PR allows existing implementations to work again without explicitly adding the new method `onQueryIdle`. ### Why are the changes needed? We broke backward compatibility in [SPARK-43183](https://issues.apache.org/jira/browse/SPARK-43183) and we want to fix it. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Modified tests. The tests now verify two different listener implementations, one covering the old interface and one covering the new interface. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43483 from HeartSaVioR/SPARK-45631. 
Lead-authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Co-authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- python/pyspark/sql/streaming/listener.py | 4 +- .../sql/tests/streaming/test_streaming_listener.py | 117 ++++++++++++++------- 2 files changed, 82 insertions(+), 39 deletions(-) diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py index 225ad6d45afb..c7c962578e2a 100644 --- a/python/pyspark/sql/streaming/listener.py +++ b/python/pyspark/sql/streaming/listener.py @@ -107,7 +107,9 @@ class StreamingQueryListener(ABC): """ pass - @abstractmethod + # NOTE: Do not mark this as abstract method, since we released this abstract class without + # this method in prior version and marking this as abstract method would break existing + # implementations. def onQueryIdle(self, event: "QueryIdleEvent") -> None: """ Called when the query is idle and waiting for new data to process. diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py index cbbdc2955e59..b2200efb0e7b 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py @@ -251,7 +251,23 @@ class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase): progress_event = None terminated_event = None - class TestListener(StreamingQueryListener): + # V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`, + # `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5. 
+ class TestListenerV1(StreamingQueryListener): + def onQueryStarted(self, event): + nonlocal start_event + start_event = event + + def onQueryProgress(self, event): + nonlocal progress_event + progress_event = event + + def onQueryTerminated(self, event): + nonlocal terminated_event + terminated_event = event + + # V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+. + class TestListenerV2(StreamingQueryListener): def onQueryStarted(self, event): nonlocal start_event start_event = event @@ -267,48 +283,71 @@ class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase): nonlocal terminated_event terminated_event = event - test_listener = TestListener() + def verify(test_listener): + nonlocal start_event + nonlocal progress_event + nonlocal terminated_event - try: - self.spark.streams.addListener(test_listener) + start_event = None + progress_event = None + terminated_event = None - df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load() + try: + self.spark.streams.addListener(test_listener) - # check successful stateful query - df_stateful = df.groupBy().count() # make query stateful - q = ( - df_stateful.writeStream.format("noop") - .queryName("test") - .outputMode("complete") - .start() - ) - self.assertTrue(q.isActive) - time.sleep(10) - q.stop() + df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load() - # Make sure all events are empty - self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty() + # check successful stateful query + df_stateful = df.groupBy().count() # make query stateful + q = ( + df_stateful.writeStream.format("noop") + .queryName("test") + .outputMode("complete") + .start() + ) + self.assertTrue(q.isActive) + time.sleep(10) + q.stop() - self.check_start_event(start_event) - self.check_progress_event(progress_event) - self.check_terminated_event(terminated_event) + # Make sure all events are empty + 
self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty() - # Check query terminated with exception - from pyspark.sql.functions import col, udf + self.check_start_event(start_event) + self.check_progress_event(progress_event) + self.check_terminated_event(terminated_event) - bad_udf = udf(lambda x: 1 / 0) - q = df.select(bad_udf(col("value"))).writeStream.format("noop").start() - time.sleep(5) - q.stop() - self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty() - self.check_terminated_event(terminated_event, "ZeroDivisionError") + # Check query terminated with exception + from pyspark.sql.functions import col, udf - finally: - self.spark.streams.removeListener(test_listener) + bad_udf = udf(lambda x: 1 / 0) + q = df.select(bad_udf(col("value"))).writeStream.format("noop").start() + time.sleep(5) + q.stop() + self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty() + self.check_terminated_event(terminated_event, "ZeroDivisionError") + + finally: + self.spark.streams.removeListener(test_listener) + + verify(TestListenerV1()) + verify(TestListenerV2()) def test_remove_listener(self): # SPARK-38804: Test StreamingQueryManager.removeListener - class TestListener(StreamingQueryListener): + # V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`, + # `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5. + class TestListenerV1(StreamingQueryListener): + def onQueryStarted(self, event): + pass + + def onQueryProgress(self, event): + pass + + def onQueryTerminated(self, event): + pass + + # V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+. 
+ class TestListenerV2(StreamingQueryListener): def onQueryStarted(self, event): pass @@ -321,13 +360,15 @@ class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase): def onQueryTerminated(self, event): pass - test_listener = TestListener() + def verify(test_listener): + num_listeners = len(self.spark.streams._jsqm.listListeners()) + self.spark.streams.addListener(test_listener) + self.assertEqual(num_listeners + 1, len(self.spark.streams._jsqm.listListeners())) + self.spark.streams.removeListener(test_listener) + self.assertEqual(num_listeners, len(self.spark.streams._jsqm.listListeners())) - num_listeners = len(self.spark.streams._jsqm.listListeners()) - self.spark.streams.addListener(test_listener) - self.assertEqual(num_listeners + 1, len(self.spark.streams._jsqm.listListeners())) - self.spark.streams.removeListener(test_listener) - self.assertEqual(num_listeners, len(self.spark.streams._jsqm.listListeners())) + verify(TestListenerV1()) + verify(TestListenerV2()) def test_query_started_event_fromJson(self): start_event = """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org