Thanks for the clarification. That makes sense. In the code below we can see what is going on:
def onQueryProgress(self, event):
    print("onQueryProgress")
    # Access micro-batch data
    microbatch_data = event.progress
    #print("microbatch_data received")  # Check if data is received
    #print(microbatch_data)
    print(f"Type of microbatch_data is {type(microbatch_data)}")
    #processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")  # incorrect
    processedRowsPerSecond = microbatch_data.processedRowsPerSecond
    if processedRowsPerSecond is not None:  # Check if value exists
        print("processedRowsPerSecond retrieved")
        print(f"Processed rows per second is -> {processedRowsPerSecond}")
    else:
        print("processedRowsPerSecond not retrieved!")

The output:

onQueryProgress
Type of microbatch_data is <class 'pyspark.sql.streaming.listener.StreamingQueryProgress'>
processedRowsPerSecond retrieved
Processed rows per second is -> 2.570694087403599

So we are dealing with an attribute of the class and NOT a dictionary. The commented-out line (processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")) fails because it uses the .get() method, which only dictionaries provide, while the line below it (processedRowsPerSecond = microbatch_data.processedRowsPerSecond) accesses the attribute directly and works. In short, anyone relying on .get() needs to ensure that event.progress actually returns a dictionary.
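
If code has to cope with both shapes (event.progress being a StreamingQueryProgress object as shown above, while query.lastProgress and query.recentProgress come back as plain dicts, as noted further down this thread), a small helper along these lines should work. This is a minimal sketch, and progress_value is just a name made up for it:

def progress_value(progress, key):
    # progress may be a StreamingQueryProgress (event.progress) or a
    # plain dict (query.lastProgress / query.recentProgress entries)
    if isinstance(progress, dict):
        return progress.get(key)
    return getattr(progress, key, None)

# usage inside onQueryProgress:
#   rate = progress_value(event.progress, "processedRowsPerSecond")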
Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Tue, 12 Mar 2024 at 04:04, 刘唯 <z920631...@gmail.com> wrote:

> Oh I see why the confusion.
>
> microbatch_data = event.progress
>
> means that microbatch_data is a StreamingQueryProgress instance; it's not
> a dictionary, so you should use `microbatch_data.processedRowsPerSecond`
> instead of the `get` method, which is for dictionaries.
>
> But weirdly, query.lastProgress and query.recentProgress should return
> StreamingQueryProgress but instead they return a dict, so the `get`
> method works there.
>
> I think PySpark should improve on this part.
>
> On Mon, 11 Mar 2024 at 05:51, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> Thank you for your advice.
>>
>> This is the amended code:
>>
>> def onQueryProgress(self, event):
>>     print("onQueryProgress")
>>     # Access micro-batch data
>>     microbatch_data = event.progress
>>     #print("microbatch_data received")  # Check if data is received
>>     #print(microbatch_data)
>>     #processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>     processed_rows_per_second = microbatch_data.get("processedRowsPerSecond")
>>     print("CPC", processed_rows_per_second)
>>     if processed_rows_per_second is not None:  # Check if value exists
>>         print("processed_rows_per_second retrieved")
>>         print(f"Processed rows per second: {processed_rows_per_second}")
>>     else:
>>         print("processed_rows_per_second not retrieved!")
>>
>> This is the output:
>>
>> onQueryStarted
>> 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
>> SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
>> SLF4J: Defaulting to no-operation MDCAdapter implementation.
>> SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
>>
>> -------------------------------------------
>> Batch: 0
>> -------------------------------------------
>> +---+-------------+-------+-------+
>> |key|doubled_value|op_type|op_time|
>> +---+-------------+-------+-------+
>> +---+-------------+-------+-------+
>>
>> onQueryProgress
>> -------------------------------------------
>> Batch: 1
>> -------------------------------------------
>> +--------------------+-------------+-------+--------------------+
>> |                 key|doubled_value|op_type|             op_time|
>> +--------------------+-------------+-------+--------------------+
>> |a960f663-d13a-49c...|            0|      1|2024-03-11 12:17:...|
>> +--------------------+-------------+-------+--------------------+
>>
>> onQueryProgress
>> -------------------------------------------
>> Batch: 2
>> -------------------------------------------
>> +--------------------+-------------+-------+--------------------+
>> |                 key|doubled_value|op_type|             op_time|
>> +--------------------+-------------+-------+--------------------+
>> |a960f663-d13a-49c...|            2|      1|2024-03-11 12:17:...|
>> +--------------------+-------------+-------+--------------------+
>>
>> I am afraid it is not working: beyond "onQueryProgress", it is not printing anything.
>>
>> Cheers
>>
>> Mich Talebzadeh
>>
>> On Mon, 11 Mar 2024 at 05:07, 刘唯 <z920631...@gmail.com> wrote:
>>
>>> *now -> not
>>>
>>> On Sun, 10 Mar 2024 at 22:04, 刘唯 <z920631...@gmail.com> wrote:
>>>
>>>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>>>> Camel case now snake case
>>>>
>>>> On Sun, 10 Mar 2024 at 11:46, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> There is a paper from Databricks on this subject:
>>>>>
>>>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>>>
>>>>> But having tested it, there seems to be a bug there that I reported
>>>>> to the Databricks forum as well (in answer to a user question).
>>>>>
>>>>> I have come to the conclusion that this is a bug: in general, there
>>>>> is a bug in obtaining individual values from the dictionary. For
>>>>> example, a bug in the way Spark Streaming is populating the
>>>>> processed_rows_per_second key within the microbatch_data =
>>>>> event.progress dictionary, or any other key. I have explored various
>>>>> debugging steps, and even though the key seems to exist, the value
>>>>> might not be getting set. Note that the dictionary itself prints the
>>>>> elements correctly. This is with regard to the method
>>>>> onQueryProgress(self, event) in class MyListener(StreamingQueryListener).
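>>>>>
>>>>> A quick check along these lines (a minimal diagnostic sketch, not
>>>>> part of the original test) makes the shape of event.progress visible:
>>>>>
>>>>> def onQueryProgress(self, event):
>>>>>     p = event.progress
>>>>>     print(type(p))            # which class are we actually holding?
>>>>>     print(hasattr(p, "get"))  # is dict-style .get() available at all?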
>>>>>
>>>>> For example, with print(microbatch_data) you get everything printed, as below:
>>>>>
>>>>> onQueryProgress
>>>>> microbatch_data received
>>>>> {
>>>>>   "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>>>>   "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>>>>   "name" : null,
>>>>>   "timestamp" : "2024-03-10T09:21:27.233Z",
>>>>>   "batchId" : 21,
>>>>>   "numInputRows" : 1,
>>>>>   "inputRowsPerSecond" : 100.0,
>>>>>   "processedRowsPerSecond" : 5.347593582887701,
>>>>>   "durationMs" : {
>>>>>     "addBatch" : 37,
>>>>>     "commitOffsets" : 41,
>>>>>     "getBatch" : 0,
>>>>>     "latestOffset" : 0,
>>>>>     "queryPlanning" : 5,
>>>>>     "triggerExecution" : 187,
>>>>>     "walCommit" : 104
>>>>>   },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
>>>>>     "startOffset" : 20,
>>>>>     "endOffset" : 21,
>>>>>     "latestOffset" : 21,
>>>>>     "numInputRows" : 1,
>>>>>     "inputRowsPerSecond" : 100.0,
>>>>>     "processedRowsPerSecond" : 5.347593582887701
>>>>>   } ],
>>>>>   "sink" : {
>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>>>>     "numOutputRows" : 1
>>>>>   }
>>>>> }
>>>>>
>>>>> However, the observed behaviour is that processed_rows_per_second is
>>>>> either None or not being updated correctly.
>>>>>
>>>>> The Spark version I used for my test is 3.4.
>>>>>
>>>>> The sample code uses format=rate to simulate a streaming process.
>>>>> You can test the code yourself, all in one go:
>>>>>
>>>>> from pyspark.sql import SparkSession
>>>>> from pyspark.sql.streaming import StreamingQueryListener
>>>>> from pyspark.sql.functions import col, lit, current_timestamp
>>>>> import uuid
>>>>>
>>>>> def process_data(df):
>>>>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))). \
>>>>>         withColumn("doubled_value", col("value") * 2). \
>>>>>         withColumn("op_type", lit(1)). \
>>>>>         withColumn("op_time", current_timestamp())
>>>>>     return processed_df
>>>>>
>>>>> # Create a Spark session
>>>>> appName = "testListener"
>>>>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>>>>
>>>>> # Define the schema for the streaming data
>>>>> schema = "key string, timestamp timestamp, value long"
>>>>>
>>>>> # Define my listener.
>>>>> class MyListener(StreamingQueryListener):
>>>>>     def onQueryStarted(self, event):
>>>>>         print("onQueryStarted")
>>>>>         print(f"'{event.name}' [{event.id}] got started!")
>>>>>     def onQueryProgress(self, event):
>>>>>         print("onQueryProgress")
>>>>>         # Access micro-batch data
>>>>>         microbatch_data = event.progress
>>>>>         print("microbatch_data received")  # Check if data is received
>>>>>         print(microbatch_data)
>>>>>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>>>>         if processed_rows_per_second is not None:  # Check if value exists
>>>>>             print("processed_rows_per_second retrieved")
>>>>>             print(f"Processed rows per second: {processed_rows_per_second}")
>>>>>         else:
>>>>>             print("processed_rows_per_second not retrieved!")
>>>>>     def onQueryTerminated(self, event):
>>>>>         print("onQueryTerminated")
>>>>>         if event.exception:
>>>>>             print(f"Query terminated with exception: {event.exception}")
>>>>>         else:
>>>>>             print("Query successfully terminated.")
>>>>>
>>>>> # Add my listener.
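>>>>> # NB, resolved later in this thread: event.progress is a
>>>>> # StreamingQueryProgress object, not a dict, so the .get() call in
>>>>> # onQueryProgress above fails, which is why nothing after
>>>>> # "onQueryProgress" gets printed. Attribute access, i.e.
>>>>> # microbatch_data.processedRowsPerSecond, is what works.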
>>>>> listener_instance = MyListener()
>>>>> spark.streams.addListener(listener_instance)
>>>>>
>>>>> # Create a streaming DataFrame with the rate source
>>>>> streaming_df = (
>>>>>     spark.readStream
>>>>>     .format("rate")
>>>>>     .option("rowsPerSecond", 1)
>>>>>     .load()
>>>>> )
>>>>>
>>>>> # Apply the processing function to the streaming DataFrame
>>>>> processed_streaming_df = process_data(streaming_df)
>>>>>
>>>>> # Define the output sink (for example, the console sink)
>>>>> query = (
>>>>>     processed_streaming_df
>>>>>     .select(
>>>>>         col("key").alias("key"),
>>>>>         col("doubled_value").alias("doubled_value"),
>>>>>         col("op_type").alias("op_type"),
>>>>>         col("op_time").alias("op_time"))
>>>>>     .writeStream
>>>>>     .outputMode("append")
>>>>>     .format("console")
>>>>>     .start()
>>>>> )
>>>>>
>>>>> # Wait for the streaming query to terminate
>>>>> query.awaitTermination()
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh