WweiL commented on code in PR #42521:
URL: https://github.com/apache/spark/pull/42521#discussion_r1296451667
##########
python/pyspark/sql/tests/connect/streaming/test_parity_listener.py:
##########

```diff
@@ -19,38 +19,153 @@
 import time

 from pyspark.sql.tests.streaming.test_streaming_listener import StreamingListenerTestsMixin
-from pyspark.sql.streaming.listener import StreamingQueryListener, QueryStartedEvent
-from pyspark.sql.types import StructType, StructField, StringType
+from pyspark.sql.streaming.listener import (
+    StreamingQueryListener,
+    QueryStartedEvent,
+    QueryProgressEvent,
+    QueryIdleEvent,
+    QueryTerminatedEvent,
+)
+from pyspark.sql.types import (
+    ArrayType,
+    StructType,
+    StructField,
+    StringType,
+    IntegerType,
+    FloatType,
+    MapType,
+)
+from pyspark.sql.functions import count, lit
 from pyspark.testing.connectutils import ReusedConnectTestCase


 def get_start_event_schema():
     return StructType(
         [
-            StructField("id", StringType(), True),
-            StructField("runId", StringType(), True),
+            StructField("id", StringType(), False),
+            StructField("runId", StringType(), False),
             StructField("name", StringType(), True),
-            StructField("timestamp", StringType(), True),
+            StructField("timestamp", StringType(), False),
         ]
     )
```

Review Comment:
I'm thinking of just moving these helpers onto the `QueryxxxEvent` classes, and maybe even adding a method called `asDataFrame`, to save users the effort of building this themselves. For example, before:

```python
def get_start_event_schema():
    return StructType(
        [
            StructField("id", StringType(), False),
            StructField("runId", StringType(), False),
            StructField("name", StringType(), True),
            StructField("timestamp", StringType(), False),
        ]
    )


class TestListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        df = self.spark.createDataFrame(
            data=[event.asDict()],
            schema=get_start_event_schema(),
        )
        df.write.saveAsTable("listener_start_events")
```

Note that this only looks simple because I wrote the `asDict` method and the `get_start_event_schema` helper for the test. In production, users would need to write these themselves.
But these are really redundant. If we manage them on the event classes instead, the listener shrinks to, for example:

```python
class TestListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        df = self.spark.createDataFrame(
            data=[event.asDict()],
            schema=event.schema(),
        )
        df.write.saveAsTable("listener_start_events")
```

============= OR ===============

```python
class TestListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        df = event.asDataFrame(self.spark)
        df.write.saveAsTable("listener_start_events")
```

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
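The deduplication being proposed can be sketched without Spark at all: a hypothetical event base class derives both the row dict and the (name, type, nullable) field descriptions from a single field declaration, so neither has to be hand-written per event type. This is an illustrative sketch, not the actual pyspark listener API; `EventBase`, `schemaFields`, and `StartEvent` are made-up names, and all fields are modeled as strings to mirror the `StringType` columns in the test schema above.

```python
# Illustrative sketch only: these classes are NOT part of pyspark.
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class EventBase:
    """Hypothetical base class: one dataclass field declaration drives both
    asDict() (the row) and schemaFields() (the schema description), mirroring
    the proposed event.asDict() / event.schema() pairing."""

    def asDict(self):
        # Collect the declared fields into a plain dict, ready to be passed
        # to createDataFrame as a single row.
        return {f.name: getattr(self, f.name) for f in fields(self)}

    def schemaFields(self):
        # A field is nullable iff it was annotated Optional[...]; every field
        # is a string in this sketch, like the StringType columns above.
        return [
            (f.name, "string", f.type in (Optional[str], "Optional[str]"))
            for f in fields(self)
        ]


@dataclass
class StartEvent(EventBase):
    # Same shape as the QueryStartedEvent columns in the test schema.
    id: str
    runId: str
    name: Optional[str]
    timestamp: str


event = StartEvent(id="abc", runId="r1", name=None, timestamp="2023-08-16")
print(event.asDict())
print(event.schemaFields())
```

In a real implementation, `schemaFields()` would instead build a `StructType` of `StructField`s, and `asDataFrame(spark)` would just be `spark.createDataFrame([self.asDict()], self.schema())`; the point is only that both artifacts can come from one declaration.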