Oh, I see why the confusion. `microbatch_data = event.progress` means that microbatch_data is a StreamingQueryProgress instance, not a dictionary, so you should use `microbatch_data.processedRowsPerSecond` instead of the `get` method, which is for dictionaries. But weirdly, query.lastProgress and query.recentProgress should return StreamingQueryProgress, yet they return a dict instead, so the `get` method does work there. I think PySpark should improve on this part.
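
For illustration, a minimal sketch of the two access patterns (assuming PySpark 3.4, where the Python StreamingQueryListener API was introduced; the class name here is made up):

from pyspark.sql.streaming import StreamingQueryListener

class DemoListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # event.progress is a StreamingQueryProgress object,
        # so the metric is read via attribute access:
        print(event.progress.processedRowsPerSecond)

    def onQueryTerminated(self, event):
        pass

# By contrast, query.lastProgress comes back as a plain dict,
# so dict-style access is what works there:
#     rate = query.lastProgress.get("processedRowsPerSecond")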
On Mon, 11 Mar 2024 at 05:51, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> Thank you for your advice.
>
> This is the amended code:
>
> def onQueryProgress(self, event):
>     print("onQueryProgress")
>     # Access micro-batch data
>     microbatch_data = event.progress
>     #print("microbatch_data received")  # Check if data is received
>     #print(microbatch_data)
>     #processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>     processed_rows_per_second = microbatch_data.get("processedRowsPerSecond")
>     print("CPC", processed_rows_per_second)
>     if processed_rows_per_second is not None:  # Check if value exists
>         print("processed_rows_per_second retrieved")
>         print(f"Processed rows per second: {processed_rows_per_second}")
>     else:
>         print("processed_rows_per_second not retrieved!")
>
> This is the output:
>
> onQueryStarted
> 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
> SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
> SLF4J: Defaulting to no-operation MDCAdapter implementation.
> SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---+-------------+-------+-------+
> |key|doubled_value|op_type|op_time|
> +---+-------------+-------+-------+
> +---+-------------+-------+-------+
>
> onQueryProgress
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +--------------------+-------------+-------+--------------------+
> |                 key|doubled_value|op_type|             op_time|
> +--------------------+-------------+-------+--------------------+
> |a960f663-d13a-49c...|            0|      1|2024-03-11 12:17:...|
> +--------------------+-------------+-------+--------------------+
>
> onQueryProgress
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +--------------------+-------------+-------+--------------------+
> |                 key|doubled_value|op_type|             op_time|
> +--------------------+-------------+-------+--------------------+
> |a960f663-d13a-49c...|            2|      1|2024-03-11 12:17:...|
> +--------------------+-------------+-------+--------------------+
>
> I am afraid it is not working. It is not even printing anything.
>
> Cheers
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
> On Mon, 11 Mar 2024 at 05:07, 刘唯 <z920631...@gmail.com> wrote:
>
>> *now -> not
>>
>> On Sun, 10 Mar 2024 at 22:04, 刘唯 <z920631...@gmail.com> wrote:
>>
>>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>>> Camel case now snake case
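
The output above also fits the explanation at the top: calling the `get` method on a StreamingQueryProgress object raises AttributeError, and exceptions raised inside listener callbacks appear to be logged and swallowed rather than propagated, which would explain why nothing printed after "onQueryProgress". A sketch of the amended handler using attribute access instead (untested, against PySpark 3.4):

def onQueryProgress(self, event):
    print("onQueryProgress")
    microbatch_data = event.progress
    # StreamingQueryProgress exposes the metric as an attribute,
    # not as a dictionary key:
    processed_rows_per_second = microbatch_data.processedRowsPerSecond
    if processed_rows_per_second is not None:
        print(f"Processed rows per second: {processed_rows_per_second}")
    else:
        print("processed_rows_per_second not retrieved!")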
>>>
>>> On Sun, 10 Mar 2024 at 11:46, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> There is a paper from Databricks on this subject:
>>>>
>>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>>
>>>> But having tested it, there seems to be a bug there that I reported to the Databricks forum as well (in answer to a user question).
>>>>
>>>> I have come to the conclusion that this is a bug. In general, there is a bug in obtaining individual values from the dictionary. For example, a bug in the way Spark Streaming populates the processed_rows_per_second key within the microbatch_data dictionary (microbatch_data = event.progress), or any other key. I have explored various debugging steps, and even though the key seems to exist, the value might not be getting set. Note that the dictionary itself prints its elements correctly. This is with regard to the method onQueryProgress(self, event) in class MyListener(StreamingQueryListener).
>>>>
>>>> For example, with print(microbatch_data) you get everything printed, as below:
>>>>
>>>> onQueryProgress
>>>> microbatch_data received
>>>> {
>>>>   "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>>>   "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>>>   "name" : null,
>>>>   "timestamp" : "2024-03-10T09:21:27.233Z",
>>>>   "batchId" : 21,
>>>>   "numInputRows" : 1,
>>>>   "inputRowsPerSecond" : 100.0,
>>>>   "processedRowsPerSecond" : 5.347593582887701,
>>>>   "durationMs" : {
>>>>     "addBatch" : 37,
>>>>     "commitOffsets" : 41,
>>>>     "getBatch" : 0,
>>>>     "latestOffset" : 0,
>>>>     "queryPlanning" : 5,
>>>>     "triggerExecution" : 187,
>>>>     "walCommit" : 104
>>>>   },
>>>>   "stateOperators" : [ ],
>>>>   "sources" : [ {
>>>>     "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default]",
>>>>     "startOffset" : 20,
>>>>     "endOffset" : 21,
>>>>     "latestOffset" : 21,
>>>>     "numInputRows" : 1,
>>>>     "inputRowsPerSecond" : 100.0,
>>>>     "processedRowsPerSecond" : 5.347593582887701
>>>>   } ],
>>>>   "sink" : {
>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>>>     "numOutputRows" : 1
>>>>   }
>>>> }
>>>>
>>>> However, the observed behaviour is that processed_rows_per_second is either None or not being updated correctly.
>>>>
>>>> The Spark version I used for my test is 3.4.
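
Incidentally, that JSON dump is itself the clue: print(microbatch_data) renders correctly because StreamingQueryProgress prints its JSON representation, even though dict-style lookups fail on the object. If dict-style access is preferred, one option is to parse that representation into a real dict (a sketch, assuming the `json` property is exposed on the Python StreamingQueryProgress as it is in the Scala API):

import json

def onQueryProgress(self, event):
    # event.progress prints as the JSON above; its `json` property
    # should return the same payload as a string, which parses into
    # a plain dict where .get() behaves as expected.
    d = json.loads(event.progress.json)
    print(d.get("processedRowsPerSecond"))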
>>>> The sample code uses format=rate to simulate a streaming process. You can test the code yourself; it is all in one piece:
>>>>
>>>> from pyspark.sql import SparkSession
>>>> from pyspark.sql.streaming import DataStreamWriter, StreamingQueryListener
>>>> from pyspark.sql.functions import col, round, current_timestamp, lit
>>>> import uuid
>>>>
>>>> def process_data(df):
>>>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))). \
>>>>         withColumn("doubled_value", col("value") * 2). \
>>>>         withColumn("op_type", lit(1)). \
>>>>         withColumn("op_time", current_timestamp())
>>>>     return processed_df
>>>>
>>>> # Create a Spark session
>>>> appName = "testListener"
>>>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>>>
>>>> # Define the schema for the streaming data
>>>> schema = "key string, timestamp timestamp, value long"
>>>>
>>>> # Define my listener.
>>>> class MyListener(StreamingQueryListener):
>>>>     def onQueryStarted(self, event):
>>>>         print("onQueryStarted")
>>>>         print(f"'{event.name}' [{event.id}] got started!")
>>>>     def onQueryProgress(self, event):
>>>>         print("onQueryProgress")
>>>>         # Access micro-batch data
>>>>         microbatch_data = event.progress
>>>>         print("microbatch_data received")  # Check if data is received
>>>>         print(microbatch_data)
>>>>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>>>         if processed_rows_per_second is not None:  # Check if value exists
>>>>             print("processed_rows_per_second retrieved")
>>>>             print(f"Processed rows per second: {processed_rows_per_second}")
>>>>         else:
>>>>             print("processed_rows_per_second not retrieved!")
>>>>     def onQueryTerminated(self, event):
>>>>         print("onQueryTerminated")
>>>>         if event.exception:
>>>>             print(f"Query terminated with exception: {event.exception}")
>>>>         else:
>>>>             print("Query successfully terminated.")
>>>>
>>>> # Add my listener.
>>>> listener_instance = MyListener()
>>>> spark.streams.addListener(listener_instance)
>>>>
>>>> # Create a streaming DataFrame with the rate source
>>>> streaming_df = (
>>>>     spark.readStream
>>>>     .format("rate")
>>>>     .option("rowsPerSecond", 1)
>>>>     .load()
>>>> )
>>>>
>>>> # Apply processing function to the streaming DataFrame
>>>> processed_streaming_df = process_data(streaming_df)
>>>>
>>>> # Define the output sink (for example, console sink)
>>>> query = (
>>>>     processed_streaming_df.select(
>>>>         col("key").alias("key"),
>>>>         col("doubled_value").alias("doubled_value"),
>>>>         col("op_type").alias("op_type"),
>>>>         col("op_time").alias("op_time")).
>>>>     writeStream.
>>>>     outputMode("append").
>>>>     format("console").
>>>>     start()
>>>> )
>>>>
>>>> # Wait for the streaming query to terminate
>>>> query.awaitTermination()
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Dad | Technologist | Solutions Architect | Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
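
Until PySpark makes the two return types consistent, a small helper can hide the difference between the listener and query APIs. This is illustrative and untested, and the function name is my own:

def progress_metric(progress, name):
    # query.lastProgress / query.recentProgress yield plain dicts,
    # while event.progress inside a listener is a
    # StreamingQueryProgress object; handle both uniformly.
    if isinstance(progress, dict):
        return progress.get(name)
    return getattr(progress, name, None)

# e.g. progress_metric(event.progress, "processedRowsPerSecond")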