This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 75bc5ac9f2b0 [SPARK-45631][SS][PYSPARK] Remove @abstractmethod from onQueryIdle in PySpark StreamingQueryListener
75bc5ac9f2b0 is described below

commit 75bc5ac9f2b07bc894091b8b15682ee906a19356
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Mon Oct 23 21:12:01 2023 +0900

    [SPARK-45631][SS][PYSPARK] Remove @abstractmethod from onQueryIdle in PySpark StreamingQueryListener
    
    ### What changes were proposed in this pull request?
    
    Credit to anish-db for the initial investigation and the fix.
    
    This PR proposes to remove the `abstractmethod` annotation from `onQueryIdle` in PySpark
    StreamingQueryListener.
    
    The function `onQueryIdle` was added with the `abstractmethod` annotation, which prevents
    the default implementation from being picked up and forces users to implement the new
    method. This breaks all existing streaming query listener implementations, forcing them to
    add at least a dummy implementation of the new method.
    
    This PR re-allows existing implementations to work properly without explicitly adding the
    new function `onQueryIdle`.
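    
    To illustrate (not part of this change), here is a minimal sketch with a hypothetical
    listener class showing why the annotation matters: under Python's ABC rules, keeping
    `abstractmethod` on `onQueryIdle` makes any listener that only implements the pre-3.5
    callbacks fail to instantiate, while dropping the annotation lets such a listener inherit
    the default no-op implementation.
    
        from pyspark.sql.streaming import StreamingQueryListener
    
        # Hypothetical listener written against the pre-3.5 interface:
        # it implements only the three original callbacks.
        class LegacyListener(StreamingQueryListener):
            def onQueryStarted(self, event):
                pass
    
            def onQueryProgress(self, event):
                pass
    
            def onQueryTerminated(self, event):
                pass
    
        # With `abstractmethod` on onQueryIdle, the next line raises
        #   TypeError: Can't instantiate abstract class LegacyListener ...
        # With the annotation removed, construction succeeds and onQueryIdle
        # falls back to the default no-op defined in the base class.
        listener = LegacyListener()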
    
    ### Why are the changes needed?
    
    We broke backward compatibility in [SPARK-43183](https://issues.apache.org/jira/browse/SPARK-43183)
    and we want to fix it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Modified tests. The tests now verify two different implementations, covering the old
    interface vs. the new interface.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43483 from HeartSaVioR/SPARK-45631.
    
    Lead-authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Co-authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 python/pyspark/sql/streaming/listener.py           |   4 +-
 .../sql/tests/streaming/test_streaming_listener.py | 117 ++++++++++++++-------
 2 files changed, 82 insertions(+), 39 deletions(-)

diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py
index 225ad6d45afb..c7c962578e2a 100644
--- a/python/pyspark/sql/streaming/listener.py
+++ b/python/pyspark/sql/streaming/listener.py
@@ -107,7 +107,9 @@ class StreamingQueryListener(ABC):
         """
         pass
 
-    @abstractmethod
+    # NOTE: Do not mark this as an abstract method, since we released this abstract class
+    # without this method in a prior version, and marking this as an abstract method would
+    # break existing implementations.
     def onQueryIdle(self, event: "QueryIdleEvent") -> None:
         """
         Called when the query is idle and waiting for new data to process.
diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
index cbbdc2955e59..b2200efb0e7b 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
@@ -251,7 +251,23 @@ class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase):
         progress_event = None
         terminated_event = None
 
-        class TestListener(StreamingQueryListener):
+        # V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`,
+        # `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
+        class TestListenerV1(StreamingQueryListener):
+            def onQueryStarted(self, event):
+                nonlocal start_event
+                start_event = event
+
+            def onQueryProgress(self, event):
+                nonlocal progress_event
+                progress_event = event
+
+            def onQueryTerminated(self, event):
+                nonlocal terminated_event
+                terminated_event = event
+
+        # V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
+        class TestListenerV2(StreamingQueryListener):
             def onQueryStarted(self, event):
                 nonlocal start_event
                 start_event = event
@@ -267,48 +283,71 @@ class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase):
                 nonlocal terminated_event
                 terminated_event = event
 
-        test_listener = TestListener()
+        def verify(test_listener):
+            nonlocal start_event
+            nonlocal progress_event
+            nonlocal terminated_event
 
-        try:
-            self.spark.streams.addListener(test_listener)
+            start_event = None
+            progress_event = None
+            terminated_event = None
 
-            df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+            try:
+                self.spark.streams.addListener(test_listener)
 
-            # check successful stateful query
-            df_stateful = df.groupBy().count()  # make query stateful
-            q = (
-                df_stateful.writeStream.format("noop")
-                .queryName("test")
-                .outputMode("complete")
-                .start()
-            )
-            self.assertTrue(q.isActive)
-            time.sleep(10)
-            q.stop()
+                df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load()
 
-            # Make sure all events are empty
-            self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
+                # check successful stateful query
+                df_stateful = df.groupBy().count()  # make query stateful
+                q = (
+                    df_stateful.writeStream.format("noop")
+                    .queryName("test")
+                    .outputMode("complete")
+                    .start()
+                )
+                self.assertTrue(q.isActive)
+                time.sleep(10)
+                q.stop()
 
-            self.check_start_event(start_event)
-            self.check_progress_event(progress_event)
-            self.check_terminated_event(terminated_event)
+                # Make sure all events are empty
+                self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
 
-            # Check query terminated with exception
-            from pyspark.sql.functions import col, udf
+                self.check_start_event(start_event)
+                self.check_progress_event(progress_event)
+                self.check_terminated_event(terminated_event)
 
-            bad_udf = udf(lambda x: 1 / 0)
-            q = df.select(bad_udf(col("value"))).writeStream.format("noop").start()
-            time.sleep(5)
-            q.stop()
-            self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
-            self.check_terminated_event(terminated_event, "ZeroDivisionError")
+                # Check query terminated with exception
+                from pyspark.sql.functions import col, udf
 
-        finally:
-            self.spark.streams.removeListener(test_listener)
+                bad_udf = udf(lambda x: 1 / 0)
+                q = df.select(bad_udf(col("value"))).writeStream.format("noop").start()
+                time.sleep(5)
+                q.stop()
+                self.spark.sparkContext._jsc.sc().listenerBus().waitUntilEmpty()
+                self.check_terminated_event(terminated_event, "ZeroDivisionError")
+
+            finally:
+                self.spark.streams.removeListener(test_listener)
+
+        verify(TestListenerV1())
+        verify(TestListenerV2())
 
     def test_remove_listener(self):
         # SPARK-38804: Test StreamingQueryManager.removeListener
-        class TestListener(StreamingQueryListener):
+        # V1: Initial interface of StreamingQueryListener containing methods `onQueryStarted`,
+        # `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
+        class TestListenerV1(StreamingQueryListener):
+            def onQueryStarted(self, event):
+                pass
+
+            def onQueryProgress(self, event):
+                pass
+
+            def onQueryTerminated(self, event):
+                pass
+
+        # V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
+        class TestListenerV2(StreamingQueryListener):
             def onQueryStarted(self, event):
                 pass
 
@@ -321,13 +360,15 @@ class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase):
             def onQueryTerminated(self, event):
                 pass
 
-        test_listener = TestListener()
+        def verify(test_listener):
+            num_listeners = len(self.spark.streams._jsqm.listListeners())
+            self.spark.streams.addListener(test_listener)
+            self.assertEqual(num_listeners + 1, len(self.spark.streams._jsqm.listListeners()))
+            self.spark.streams.removeListener(test_listener)
+            self.assertEqual(num_listeners, len(self.spark.streams._jsqm.listListeners()))
 
-        num_listeners = len(self.spark.streams._jsqm.listListeners())
-        self.spark.streams.addListener(test_listener)
-            self.assertEqual(num_listeners + 1, len(self.spark.streams._jsqm.listListeners()))
-        self.spark.streams.removeListener(test_listener)
-            self.assertEqual(num_listeners, len(self.spark.streams._jsqm.listListeners()))
+        verify(TestListenerV1())
+        verify(TestListenerV2())
 
     def test_query_started_event_fromJson(self):
         start_event = """

