Oh I see why the confusion.

microbatch_data = event.progress

means that microbatch_data is a StreamingQueryProgress instance, it's not a
dictionary, so you should use ` microbatch_data.processedRowsPerSecond`,
instead of the `get` method which is used for dictionaries.

But weirdly, for query.lastProgress and query.recentProgress, they should
return StreamingQueryProgress  but instead they returned a dict. So the
`get` method works there.

I think PySpark should improve on this part.

Mich Talebzadeh <mich.talebza...@gmail.com> 于2024年3月11日周一 05:51写道:

> Hi,
>
> Thank you for your advice
>
> This is the amended code
>
>    def onQueryProgress(self, event):
>         print("onQueryProgress")
>         # Access micro-batch data
>         microbatch_data = event.progress
>         #print("microbatch_data received")  # Check if data is received
>         #print(microbatch_data)
>         #processed_rows_per_second =
> microbatch_data.get("processed_rows_per_second")
>         processed_rows_per_second =
> microbatch_data.get("processedRowsPerSecond")
>         print("CPC", processed_rows_per_second)
>         if processed_rows_per_second is not None:  # Check if value exists
>            print("ocessed_rows_per_second retrieved")
>            print(f"Processed rows per second: {processed_rows_per_second}")
>         else:
>            print("processed_rows_per_second not retrieved!")
>
> This is the output
>
> onQueryStarted
> 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
> SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
> SLF4J: Defaulting to no-operation MDCAdapter implementation.
> SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for
> further details.
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---+-------------+-------+-------+
> |key|doubled_value|op_type|op_time|
> +---+-------------+-------+-------+
> +---+-------------+-------+-------+
>
> onQueryProgress
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +--------------------+-------------+-------+--------------------+
> |                 key|doubled_value|op_type|             op_time|
> +--------------------+-------------+-------+--------------------+
> |a960f663-d13a-49c...|            0|      1|2024-03-11 12:17:...|
> +--------------------+-------------+-------+--------------------+
>
> onQueryProgress
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +--------------------+-------------+-------+--------------------+
> |                 key|doubled_value|op_type|             op_time|
> +--------------------+-------------+-------+--------------------+
> |a960f663-d13a-49c...|            2|      1|2024-03-11 12:17:...|
> +--------------------+-------------+-------+--------------------+
>
> I am afraid it is not working. Not even printing anything
>
> Cheers
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my kno
> wledge but of course cannot be guaranteed . It is essential to note that,
> as with any advice, quote "one test result is worth one-thousand expert op
> inions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Mon, 11 Mar 2024 at 05:07, 刘唯 <z920631...@gmail.com> wrote:
>
>> *now -> not
>>
>> 刘唯 <z920631...@gmail.com> 于2024年3月10日周日 22:04写道:
>>
>>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>>> Camel case now snake case
>>>
>>> Mich Talebzadeh <mich.talebza...@gmail.com> 于2024年3月10日周日 11:46写道:
>>>
>>>>
>>>> There is a paper from Databricks on this subject
>>>>
>>>>
>>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>>
>>>> But having tested it, there seems to be a bug there that I reported to
>>>> Databricks forum as well (in answer to a user question)
>>>>
>>>> I have come to a conclusion that this is a bug. In general there is a b
>>>> ug in obtaining individual values from the dictionary. For example, a b
>>>> ug in the way Spark Streaming is populating the processe
>>>> d_rows_per_second key within the microbatch_data -> microbatch_data =
>>>> event.progres dictionary or any other key. I have explored various deb
>>>> ugging steps, and even though the key seems to exist, the value might n
>>>> ot be getting set. Note that the dictionary itself prints the elements
>>>> correctly. This is with regard to method onQueryProgress(self, event)
>>>> in class MyListener(StreamingQueryListener):
>>>>
>>>> For example with print(microbatch_data), you get all printed as below
>>>>
>>>> onQueryProgress
>>>> microbatch_data received
>>>> {
>>>> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>>> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>>> "name" : null,
>>>> "timestamp" : "2024-03-10T09:21:27.233Z",
>>>> "batchId" : 21,
>>>> "numInputRows" : 1,
>>>> "inputRowsPerSecond" : 100.0,
>>>> "processedRowsPerSecond" : 5.347593582887701,
>>>> "durationMs" : {
>>>> "addBatch" : 37,
>>>> "commitOffsets" : 41,
>>>> "getBatch" : 0,
>>>> "latestOffset" : 0,
>>>> "queryPlanning" : 5,
>>>> "triggerExecution" : 187,
>>>> "walCommit" : 104
>>>> },
>>>> "stateOperators" : [ ],
>>>> "sources" : [ {
>>>> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
>>>> numPartitions=default",
>>>> "startOffset" : 20,
>>>> "endOffset" : 21,
>>>> "latestOffset" : 21,
>>>> "numInputRows" : 1,
>>>> "inputRowsPerSecond" : 100.0,
>>>> "processedRowsPerSecond" : 5.347593582887701
>>>> } ],
>>>> "sink" : {
>>>> "description" : "org.apache.spark.sql.execution.streaming.ConsoleT
>>>> able$@430a977c",
>>>> "numOutputRows" : 1
>>>> }
>>>> }
>>>> However, the observed behaviour (i.e. processed_rows_per_second is ei
>>>> ther None or not being updated correctly).
>>>>
>>>> The spark version I used for my test is 3.4
>>>>
>>>> Sample code uses format=rate for simulating a streaming process. You c
>>>> an test the code yourself, all in one
>>>> from pyspark.sql import SparkSession
>>>> from pyspark.sql.functions import col
>>>> from pyspark.sql.streaming import DataStreamWriter,
>>>> StreamingQueryListener
>>>> from pyspark.sql.functions import col, round, current_timestamp, lit
>>>> import uuid
>>>>
>>>> def process_data(df):
>>>>
>>>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))).\
>>>>                       withColumn("doubled_value", col("value") * 2). \
>>>>                       withColumn("op_type", lit(1)). \
>>>>                       withColumn("op_time", current_timestamp())
>>>>
>>>>     return processed_df
>>>>
>>>> # Create a Spark session
>>>> appName = "testListener"
>>>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>>>
>>>> # Define the schema for the streaming data
>>>> schema = "key string timestamp timestamp, value long"
>>>>
>>>> # Define my listener.
>>>> class MyListener(StreamingQueryListener):
>>>>     def onQueryStarted(self, event):
>>>>         print("onQueryStarted")
>>>>         print(f"'{event.name}' [{event.id}] got started!")
>>>>     def onQueryProgress(self, event):
>>>>         print("onQueryProgress")
>>>>         # Access micro-batch data
>>>>         microbatch_data = event.progress
>>>>         print("microbatch_data received")  # Check if data is received
>>>>         print(microbatch_data)
>>>>         processed_rows_per_second =
>>>> microbatch_data.get("processed_rows_per_second")
>>>>         if processed_rows_per_second is not None:  # Check if value
>>>> exists
>>>>            print("processed_rows_per_second retrieved")
>>>>            print(f"Processed rows per second:
>>>> {processed_rows_per_second}")
>>>>         else:
>>>>            print("processed_rows_per_second not retrieved!")
>>>>     def onQueryTerminated(self, event):
>>>>         print("onQueryTerminated")
>>>>         if event.exception:
>>>>             print(f"Query terminated with exception: {event.exception}")
>>>>         else:
>>>>             print("Query successfully terminated.")
>>>>     # Add my listener.
>>>>
>>>> listener_instance = MyListener()
>>>> spark.streams.addListener(listener_instance)
>>>>
>>>>
>>>> # Create a streaming DataFrame with the rate source
>>>> streaming_df = (
>>>>     spark.readStream
>>>>     .format("rate")
>>>>     .option("rowsPerSecond", 1)
>>>>     .load()
>>>> )
>>>>
>>>> # Apply processing function to the streaming DataFrame
>>>> processed_streaming_df = process_data(streaming_df)
>>>>
>>>> # Define the output sink (for example, console sink)
>>>> query = (
>>>>     processed_streaming_df.select( \
>>>>                           col("key").alias("key") \
>>>>                         , col("doubled_value").alias("doubled_value") \
>>>>                         , col("op_type").alias("op_type") \
>>>>                         , col("op_time").alias("op_time")). \
>>>>                         writeStream.\
>>>>                         outputMode("append").\
>>>>                         format("console"). \
>>>>                         start()
>>>> )
>>>>
>>>> # Wait for the streaming query to terminate
>>>> query.awaitTermination()
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Dad | Technologist | Solutions Architect | Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* The information provided is correct to the best of my kno
>>>> wledge but of course cannot be guaranteed . It is essential to note t
>>>> hat, as with any advice, quote "one test result is worth one-thousand
>>>> expert opinions (Werner
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>>
>>>

Reply via email to