Hi,

Thank you for your advice

This is the amended code

   def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        #print("microbatch_data received")  # Check if data is received
        #print(microbatch_data)
        #processed_rows_per_second =
microbatch_data.get("processed_rows_per_second")
        processed_rows_per_second =
microbatch_data.get("processedRowsPerSecond")
        print("CPC", processed_rows_per_second)
        if processed_rows_per_second is not None:  # Check if value exists
           print("ocessed_rows_per_second retrieved")
           print(f"Processed rows per second: {processed_rows_per_second}")
        else:
           print("processed_rows_per_second not retrieved!")

This is the output

onQueryStarted
'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further
details.
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------+-------+-------+
|key|doubled_value|op_type|op_time|
+---+-------------+-------+-------+
+---+-------------+-------+-------+

onQueryProgress
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-------------+-------+--------------------+
|                 key|doubled_value|op_type|             op_time|
+--------------------+-------------+-------+--------------------+
|a960f663-d13a-49c...|            0|      1|2024-03-11 12:17:...|
+--------------------+-------------+-------+--------------------+

onQueryProgress
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-------------+-------+--------------------+
|                 key|doubled_value|op_type|             op_time|
+--------------------+-------------+-------+--------------------+
|a960f663-d13a-49c...|            2|      1|2024-03-11 12:17:...|
+--------------------+-------------+-------+--------------------+

I am afraid it is not working. Not even printing anything

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Mon, 11 Mar 2024 at 05:07, 刘唯 <z920631...@gmail.com> wrote:

> *now -> not
>
> 刘唯 <z920631...@gmail.com> 于2024年3月10日周日 22:04写道:
>
>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>> Camel case now snake case
>>
>> Mich Talebzadeh <mich.talebza...@gmail.com> 于2024年3月10日周日 11:46写道:
>>
>>>
>>> There is a paper from Databricks on this subject
>>>
>>>
>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>
>>> But having tested it, there seems to be a bug there that I reported to
>>> Databricks forum as well (in answer to a user question)
>>>
>>> I have come to a conclusion that this is a bug. In general there is a
>>> bug in obtaining individual values from the dictionary. For example, a bug
>>> in the way Spark Streaming is populating the processed_rows_per_second key
>>> within the microbatch_data -> microbatch_data = event.progres dictionary or
>>> any other key. I have explored various debugging steps, and even though the
>>> key seems to exist, the value might not be getting set. Note that the
>>> dictionary itself prints the elements correctly. This is with regard to
>>> method onQueryProgress(self, event) in class
>>> MyListener(StreamingQueryListener):
>>>
>>> For example with print(microbatch_data), you get all printed as below
>>>
>>> onQueryProgress
>>> microbatch_data received
>>> {
>>> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>> "name" : null,
>>> "timestamp" : "2024-03-10T09:21:27.233Z",
>>> "batchId" : 21,
>>> "numInputRows" : 1,
>>> "inputRowsPerSecond" : 100.0,
>>> "processedRowsPerSecond" : 5.347593582887701,
>>> "durationMs" : {
>>> "addBatch" : 37,
>>> "commitOffsets" : 41,
>>> "getBatch" : 0,
>>> "latestOffset" : 0,
>>> "queryPlanning" : 5,
>>> "triggerExecution" : 187,
>>> "walCommit" : 104
>>> },
>>> "stateOperators" : [ ],
>>> "sources" : [ {
>>> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
>>> numPartitions=default",
>>> "startOffset" : 20,
>>> "endOffset" : 21,
>>> "latestOffset" : 21,
>>> "numInputRows" : 1,
>>> "inputRowsPerSecond" : 100.0,
>>> "processedRowsPerSecond" : 5.347593582887701
>>> } ],
>>> "sink" : {
>>> "description" :
>>> "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>> "numOutputRows" : 1
>>> }
>>> }
>>> However, the observed behaviour (i.e. processed_rows_per_second is
>>> either None or not being updated correctly).
>>>
>>> The spark version I used for my test is 3.4
>>>
>>> Sample code uses format=rate for simulating a streaming process. You can
>>> test the code yourself, all in one
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import col
>>> from pyspark.sql.streaming import DataStreamWriter,
>>> StreamingQueryListener
>>> from pyspark.sql.functions import col, round, current_timestamp, lit
>>> import uuid
>>>
>>> def process_data(df):
>>>
>>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))).\
>>>                       withColumn("doubled_value", col("value") * 2). \
>>>                       withColumn("op_type", lit(1)). \
>>>                       withColumn("op_time", current_timestamp())
>>>
>>>     return processed_df
>>>
>>> # Create a Spark session
>>> appName = "testListener"
>>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>>
>>> # Define the schema for the streaming data
>>> schema = "key string timestamp timestamp, value long"
>>>
>>> # Define my listener.
>>> class MyListener(StreamingQueryListener):
>>>     def onQueryStarted(self, event):
>>>         print("onQueryStarted")
>>>         print(f"'{event.name}' [{event.id}] got started!")
>>>     def onQueryProgress(self, event):
>>>         print("onQueryProgress")
>>>         # Access micro-batch data
>>>         microbatch_data = event.progress
>>>         print("microbatch_data received")  # Check if data is received
>>>         print(microbatch_data)
>>>         processed_rows_per_second =
>>> microbatch_data.get("processed_rows_per_second")
>>>         if processed_rows_per_second is not None:  # Check if value
>>> exists
>>>            print("processed_rows_per_second retrieved")
>>>            print(f"Processed rows per second:
>>> {processed_rows_per_second}")
>>>         else:
>>>            print("processed_rows_per_second not retrieved!")
>>>     def onQueryTerminated(self, event):
>>>         print("onQueryTerminated")
>>>         if event.exception:
>>>             print(f"Query terminated with exception: {event.exception}")
>>>         else:
>>>             print("Query successfully terminated.")
>>>     # Add my listener.
>>>
>>> listener_instance = MyListener()
>>> spark.streams.addListener(listener_instance)
>>>
>>>
>>> # Create a streaming DataFrame with the rate source
>>> streaming_df = (
>>>     spark.readStream
>>>     .format("rate")
>>>     .option("rowsPerSecond", 1)
>>>     .load()
>>> )
>>>
>>> # Apply processing function to the streaming DataFrame
>>> processed_streaming_df = process_data(streaming_df)
>>>
>>> # Define the output sink (for example, console sink)
>>> query = (
>>>     processed_streaming_df.select( \
>>>                           col("key").alias("key") \
>>>                         , col("doubled_value").alias("doubled_value") \
>>>                         , col("op_type").alias("op_type") \
>>>                         , col("op_time").alias("op_time")). \
>>>                         writeStream.\
>>>                         outputMode("append").\
>>>                         format("console"). \
>>>                         start()
>>> )
>>>
>>> # Wait for the streaming query to terminate
>>> query.awaitTermination()
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>

Reply via email to