WweiL commented on code in PR #42521:
URL: https://github.com/apache/spark/pull/42521#discussion_r1296451667


##########
python/pyspark/sql/tests/connect/streaming/test_parity_listener.py:
##########
@@ -19,38 +19,153 @@
 import time
 
 from pyspark.sql.tests.streaming.test_streaming_listener import StreamingListenerTestsMixin
-from pyspark.sql.streaming.listener import StreamingQueryListener, QueryStartedEvent
-from pyspark.sql.types import StructType, StructField, StringType
+from pyspark.sql.streaming.listener import (
+    StreamingQueryListener,
+    QueryStartedEvent,
+    QueryProgressEvent,
+    QueryIdleEvent,
+    QueryTerminatedEvent,
+)
+from pyspark.sql.types import (
+    ArrayType,
+    StructType,
+    StructField,
+    StringType,
+    IntegerType,
+    FloatType,
+    MapType,
+)
+from pyspark.sql.functions import count, lit
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
 def get_start_event_schema():
     return StructType(
         [
-            StructField("id", StringType(), True),
-            StructField("runId", StringType(), True),
+            StructField("id", StringType(), False),
+            StructField("runId", StringType(), False),
             StructField("name", StringType(), True),
-            StructField("timestamp", StringType(), True),
+            StructField("timestamp", StringType(), False),
         ]
     )

Review Comment:
   In connect mode, this won't work anymore:
   
   ```
   event = None
   
   class MyListener(StreamingQueryListener):
       def onQueryStarted(self, e):
           global event
           event = e
      ....
   
   spark.streams.addListener(MyListener())
   q = spark.readStream.....start()
   q.awaitTermination()
   
   print(event) # Still None on client side, because the code is running on the server
   ```
   
   I assume that in connect mode, it would be more common for people to write the listener events directly to tables, because that's probably one of the few ways to access these events in Spark Connect now.
   
   So I'm thinking of just moving these methods to the `QueryxxxEvent` classes, and maybe even adding a method called `asDataFrame`, to save users the effort of creating this themselves. For example, before:
   
   ```
   def get_start_event_schema():
       return StructType(
           [
               StructField("id", StringType(), False),
               StructField("runId", StringType(), False),
               StructField("name", StringType(), True),
               StructField("timestamp", StringType(), False),
           ]
       )
   
   class TestListener(StreamingQueryListener):
       def onQueryStarted(self, event):
           df = self.spark.createDataFrame(
               data=[(event.asDict())],
               schema=get_start_event_schema(),
           )
           df.write.saveAsTable("listener_start_events")
   ```
   Note that this only looks simple because I wrote the `asDict` method and the `get_start_event_schema` method for the test. In production, users would need to write these themselves, which is redundant boilerplate we could avoid by adding a helper method.
   
   For example:
   
   ```
   class TestListener(StreamingQueryListener):
       def onQueryStarted(self, event):
           df = self.spark.createDataFrame(
               data=[(event.asDict())],
               schema=event.schema(), # if we create an asDict method and a schema method for each event
           )
           df.write.saveAsTable("listener_start_events")
   
   ============= OR ===============
   
   class TestListener(StreamingQueryListener):
       def onQueryStarted(self, event):
           df = event.asDataFrame(self.spark) # or we can just create the df for the user
           df.write.saveAsTable("listener_start_events")
   ```
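   
   For illustration, here is a rough sketch of what such helpers could look like on `QueryStartedEvent` (the `schema` and `asDataFrame` methods are only the proposal above, not an existing API; the field handling just mirrors the test schema in this PR):
   
   ```
   from pyspark.sql.types import StructType, StructField, StringType
   
   
   class QueryStartedEvent:
       ...
   
       def asDict(self):
           # Flatten the event's fields into a plain dict, matching schema() below.
           return {
               "id": str(self.id),
               "runId": str(self.runId),
               "name": self.name,
               "timestamp": self.timestamp,
           }
   
       @staticmethod
       def schema():
           return StructType(
               [
                   StructField("id", StringType(), False),
                   StructField("runId", StringType(), False),
                   StructField("name", StringType(), True),
                   StructField("timestamp", StringType(), False),
               ]
           )
   
       def asDataFrame(self, spark):
           # Build a single-row DataFrame so a listener can persist the event,
           # e.g. df.write.saveAsTable("listener_start_events").
           return spark.createDataFrame(data=[self.asDict()], schema=self.schema())
   ```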



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
