Thanks for the clarification. That makes sense.. In the code below, we can
see

   def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        #print("microbatch_data received")  # Check if data is received
        #print(microbatch_data)
        print(f"Type of microbatch_data is {type(microbatch_data)}")
        #processedRowsPerSecond =
microbatch_data.get("processedRowsPerSecond")  incorrect
        processedRowsPerSecond = microbatch_data.processedRowsPerSecond
        if processedRowsPerSecond is not None:  # Check if value exists
           print("processedRowsPerSecond retrieved")
           print(f"Processed rows per second is ->
{processedRowsPerSecond}")
        else:
           print("processedRowsPerSecond not retrieved!")

The output

onQueryProgress
Type of microbatch_data is <class
'pyspark.sql.streaming.listener.StreamingQueryProgress'>
processedRowsPerSecond retrieved
Processed rows per second is -> 2.570694087403599

So we are dealing with the attribute of the class and NOT the dictionary.

The line (processedRowsPerSecond =
microbatch_data.get("processedRowsPerSecond")) fails because it uses the
.get() method, while the second line (processedRowsPerSecond =
microbatch_data.processedRowsPerSecond) accesses the attribute directly.

In short, they need to ensure that that event.progress* returns a
dictionary *

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Tue, 12 Mar 2024 at 04:04, 刘唯 <z920631...@gmail.com> wrote:

> Oh I see why the confusion.
>
> microbatch_data = event.progress
>
> means that microbatch_data is a StreamingQueryProgress instance, it's not
> a dictionary, so you should use ` microbatch_data.processedRowsPerSecond`,
> instead of the `get` method which is used for dictionaries.
>
> But weirdly, for query.lastProgress and query.recentProgress, they should
> return StreamingQueryProgress  but instead they returned a dict. So the
> `get` method works there.
>
> I think PySpark should improve on this part.
>
> Mich Talebzadeh <mich.talebza...@gmail.com> 于2024年3月11日周一 05:51写道:
>
>> Hi,
>>
>> Thank you for your advice
>>
>> This is the amended code
>>
>>    def onQueryProgress(self, event):
>>         print("onQueryProgress")
>>         # Access micro-batch data
>>         microbatch_data = event.progress
>>         #print("microbatch_data received")  # Check if data is received
>>         #print(microbatch_data)
>>         #processed_rows_per_second =
>> microbatch_data.get("processed_rows_per_second")
>>         processed_rows_per_second =
>> microbatch_data.get("processedRowsPerSecond")
>>         print("CPC", processed_rows_per_second)
>>         if processed_rows_per_second is not None:  # Check if value exists
>>            print("ocessed_rows_per_second retrieved")
>>            print(f"Processed rows per second:
>> {processed_rows_per_second}")
>>         else:
>>            print("processed_rows_per_second not retrieved!")
>>
>> This is the output
>>
>> onQueryStarted
>> 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
>> SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
>> SLF4J: Defaulting to no-operation MDCAdapter implementation.
>> SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for
>> further details.
>> -------------------------------------------
>> Batch: 0
>> -------------------------------------------
>> +---+-------------+-------+-------+
>> |key|doubled_value|op_type|op_time|
>> +---+-------------+-------+-------+
>> +---+-------------+-------+-------+
>>
>> onQueryProgress
>> -------------------------------------------
>> Batch: 1
>> -------------------------------------------
>> +--------------------+-------------+-------+--------------------+
>> |                 key|doubled_value|op_type|             op_time|
>> +--------------------+-------------+-------+--------------------+
>> |a960f663-d13a-49c...|            0|      1|2024-03-11 12:17:...|
>> +--------------------+-------------+-------+--------------------+
>>
>> onQueryProgress
>> -------------------------------------------
>> Batch: 2
>> -------------------------------------------
>> +--------------------+-------------+-------+--------------------+
>> |                 key|doubled_value|op_type|             op_time|
>> +--------------------+-------------+-------+--------------------+
>> |a960f663-d13a-49c...|            2|      1|2024-03-11 12:17:...|
>> +--------------------+-------------+-------+--------------------+
>>
>> I am afraid it is not working. Not even printing anything
>>
>> Cheers
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my kno
>> wledge but of course cannot be guaranteed . It is essential to note that,
>> as with any advice, quote "one test result is worth one-thousand expert
>> opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
>> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>
>>
>> On Mon, 11 Mar 2024 at 05:07, 刘唯 <z920631...@gmail.com> wrote:
>>
>>> *now -> not
>>>
>>> 刘唯 <z920631...@gmail.com> 于2024年3月10日周日 22:04写道:
>>>
>>>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>>>> Camel case now snake case
>>>>
>>>> Mich Talebzadeh <mich.talebza...@gmail.com> 于2024年3月10日周日 11:46写道:
>>>>
>>>>>
>>>>> There is a paper from Databricks on this subject
>>>>>
>>>>>
>>>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>>>
>>>>> But having tested it, there seems to be a bug there that I reported to
>>>>> Databricks forum as well (in answer to a user question)
>>>>>
>>>>> I have come to a conclusion that this is a bug. In general there is a
>>>>> bug in obtaining individual values from the dictionary. For example, a
>>>>> bug in the way Spark Streaming is populating the processe
>>>>> d_rows_per_second key within the microbatch_data -> microbatch_data =
>>>>> event.progres dictionary or any other key. I have explored various deb
>>>>> ugging steps, and even though the key seems to exist, the value might
>>>>> not be getting set. Note that the dictionary itself prints the el
>>>>> ements correctly. This is with regard to method onQueryProgress(self,
>>>>> event) in class MyListener(StreamingQueryListener):
>>>>>
>>>>> For example with print(microbatch_data), you get all printed as below
>>>>>
>>>>> onQueryProgress
>>>>> microbatch_data received
>>>>> {
>>>>> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>>>> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>>>> "name" : null,
>>>>> "timestamp" : "2024-03-10T09:21:27.233Z",
>>>>> "batchId" : 21,
>>>>> "numInputRows" : 1,
>>>>> "inputRowsPerSecond" : 100.0,
>>>>> "processedRowsPerSecond" : 5.347593582887701,
>>>>> "durationMs" : {
>>>>> "addBatch" : 37,
>>>>> "commitOffsets" : 41,
>>>>> "getBatch" : 0,
>>>>> "latestOffset" : 0,
>>>>> "queryPlanning" : 5,
>>>>> "triggerExecution" : 187,
>>>>> "walCommit" : 104
>>>>> },
>>>>> "stateOperators" : [ ],
>>>>> "sources" : [ {
>>>>> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
>>>>> numPartitions=default",
>>>>> "startOffset" : 20,
>>>>> "endOffset" : 21,
>>>>> "latestOffset" : 21,
>>>>> "numInputRows" : 1,
>>>>> "inputRowsPerSecond" : 100.0,
>>>>> "processedRowsPerSecond" : 5.347593582887701
>>>>> } ],
>>>>> "sink" : {
>>>>> "description" : "org.apache.spark.sql.execution.streaming.ConsoleT
>>>>> able$@430a977c",
>>>>> "numOutputRows" : 1
>>>>> }
>>>>> }
>>>>> However, the observed behaviour (i.e. processed_rows_per_second is ei
>>>>> ther None or not being updated correctly).
>>>>>
>>>>> The spark version I used for my test is 3.4
>>>>>
>>>>> Sample code uses format=rate for simulating a streaming process. You c
>>>>> an test the code yourself, all in one
>>>>> from pyspark.sql import SparkSession
>>>>> from pyspark.sql.functions import col
>>>>> from pyspark.sql.streaming import DataStreamWriter,
>>>>> StreamingQueryListener
>>>>> from pyspark.sql.functions import col, round, current_timestamp, lit
>>>>> import uuid
>>>>>
>>>>> def process_data(df):
>>>>>
>>>>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))).\
>>>>>                       withColumn("doubled_value", col("value") * 2). \
>>>>>                       withColumn("op_type", lit(1)). \
>>>>>                       withColumn("op_time", current_timestamp())
>>>>>
>>>>>     return processed_df
>>>>>
>>>>> # Create a Spark session
>>>>> appName = "testListener"
>>>>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>>>>
>>>>> # Define the schema for the streaming data
>>>>> schema = "key string timestamp timestamp, value long"
>>>>>
>>>>> # Define my listener.
>>>>> class MyListener(StreamingQueryListener):
>>>>>     def onQueryStarted(self, event):
>>>>>         print("onQueryStarted")
>>>>>         print(f"'{event.name}' [{event.id}] got started!")
>>>>>     def onQueryProgress(self, event):
>>>>>         print("onQueryProgress")
>>>>>         # Access micro-batch data
>>>>>         microbatch_data = event.progress
>>>>>         print("microbatch_data received")  # Check if data is received
>>>>>         print(microbatch_data)
>>>>>         processed_rows_per_second =
>>>>> microbatch_data.get("processed_rows_per_second")
>>>>>         if processed_rows_per_second is not None:  # Check if value
>>>>> exists
>>>>>            print("processed_rows_per_second retrieved")
>>>>>            print(f"Processed rows per second:
>>>>> {processed_rows_per_second}")
>>>>>         else:
>>>>>            print("processed_rows_per_second not retrieved!")
>>>>>     def onQueryTerminated(self, event):
>>>>>         print("onQueryTerminated")
>>>>>         if event.exception:
>>>>>             print(f"Query terminated with exception:
>>>>> {event.exception}")
>>>>>         else:
>>>>>             print("Query successfully terminated.")
>>>>>     # Add my listener.
>>>>>
>>>>> listener_instance = MyListener()
>>>>> spark.streams.addListener(listener_instance)
>>>>>
>>>>>
>>>>> # Create a streaming DataFrame with the rate source
>>>>> streaming_df = (
>>>>>     spark.readStream
>>>>>     .format("rate")
>>>>>     .option("rowsPerSecond", 1)
>>>>>     .load()
>>>>> )
>>>>>
>>>>> # Apply processing function to the streaming DataFrame
>>>>> processed_streaming_df = process_data(streaming_df)
>>>>>
>>>>> # Define the output sink (for example, console sink)
>>>>> query = (
>>>>>     processed_streaming_df.select( \
>>>>>                           col("key").alias("key") \
>>>>>                         , col("doubled_value").alias("doubled_value") \
>>>>>                         , col("op_type").alias("op_type") \
>>>>>                         , col("op_time").alias("op_time")). \
>>>>>                         writeStream.\
>>>>>                         outputMode("append").\
>>>>>                         format("console"). \
>>>>>                         start()
>>>>> )
>>>>>
>>>>> # Wait for the streaming query to terminate
>>>>> query.awaitTermination()
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* The information provided is correct to the best of my
>>>>> knowledge but of course cannot be guaranteed . It is essential to note
>>>>> that, as with any advice, quote "one test result is worth one-thousand
>>>>> expert opinions (Werner
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>>
>>>>

Reply via email to