Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-12 Thread Mich Talebzadeh
Thanks for the clarification. That makes sense. In the code below, we can see:

    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        # print("microbatch_data received")  # Check if data is received
        # print(microbatch_data)
        print(f"Type of microbatch_data is {type(microbatch_data)}")
        # processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond")  # incorrect
        processedRowsPerSecond = microbatch_data.processedRowsPerSecond
        if processedRowsPerSecond is not None:  # Check if value exists
            print("processedRowsPerSecond retrieved")
            print(f"Processed rows per second is -> {processedRowsPerSecond}")
        else:
            print("processedRowsPerSecond not retrieved!")

The output

onQueryProgress
Type of microbatch_data is <class 'pyspark.sql.streaming.listener.StreamingQueryProgress'>
processedRowsPerSecond retrieved
Processed rows per second is -> 2.570694087403599

So we are dealing with an attribute of the class and NOT a dictionary.

The line (processedRowsPerSecond = microbatch_data.get("processedRowsPerSecond"))
fails because it calls the dictionary .get() method on an object that does not
have one, while the line (processedRowsPerSecond =
microbatch_data.processedRowsPerSecond) accesses the attribute directly.

In short, they need to ensure that *event.progress* returns a dictionary as
well, for consistency.
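
Until that is made consistent, a small defensive helper can hide the
difference. This is only a sketch of ours (get_progress_value is not a PySpark
API); it assumes the payload is either a StreamingQueryProgress object or a
plain dict:

def get_progress_value(progress, key):
    # Works whether 'progress' is a dict (as query.lastProgress returns) or a
    # StreamingQueryProgress object (as event.progress is in the listener).
    if isinstance(progress, dict):
        return progress.get(key)
    return getattr(progress, key, None)

# e.g. inside onQueryProgress:
# rate = get_progress_value(event.progress, "processedRowsPerSecond")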

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with any
advice, "one test result is worth one-thousand expert opinions" (Werner von
Braun).



Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-11 Thread 刘唯
Oh I see why the confusion.

microbatch_data = event.progress

means that microbatch_data is a StreamingQueryProgress instance, not a
dictionary, so you should use `microbatch_data.processedRowsPerSecond` instead
of the `get` method, which is for dictionaries.

But weirdly, query.lastProgress and query.recentProgress should return
StreamingQueryProgress, but instead they return a dict, so the `get` method
works there.

I think PySpark should improve on this part.
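
To make the asymmetry concrete, here is a minimal sketch of the two access
patterns (our own function name; it assumes a started StreamingQuery handle
called query, and the Spark 3.4 behaviour described above):

def read_rate(event, query):
    # event.progress is a StreamingQueryProgress object -> attribute access
    rate_from_event = event.progress.processedRowsPerSecond
    # query.lastProgress comes back as a plain dict in this PySpark version
    # (and is None before the first batch) -> dict access
    rate_from_query = (query.lastProgress or {}).get("processedRowsPerSecond")
    return rate_from_event, rate_from_query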


Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-11 Thread Mich Talebzadeh
Hi,

Thank you for your advice.

This is the amended code:

    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        # print("microbatch_data received")  # Check if data is received
        # print(microbatch_data)
        # processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
        processed_rows_per_second = microbatch_data.get("processedRowsPerSecond")
        print("CPC", processed_rows_per_second)
        if processed_rows_per_second is not None:  # Check if value exists
            print("processed_rows_per_second retrieved")
            print(f"Processed rows per second: {processed_rows_per_second}")
        else:
            print("processed_rows_per_second not retrieved!")

This is the output

onQueryStarted
'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further
details.
---
Batch: 0
---
+---+-+---+---+
|key|doubled_value|op_type|op_time|
+---+-+---+---+
+---+-+---+---+

onQueryProgress
---
Batch: 1
---
++-+---++
| key|doubled_value|op_type| op_time|
++-+---++
|a960f663-d13a-49c...|0|  1|2024-03-11 12:17:...|
++-+---++

onQueryProgress
---
Batch: 2
---
++-+---++
| key|doubled_value|op_type| op_time|
++-+---++
|a960f663-d13a-49c...|2|  1|2024-03-11 12:17:...|
++-+---++

I am afraid it is not working. It is not even printing anything.
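
As the 2024-03-12 message at the top of this thread concludes, the likely
reason nothing more is printed after "onQueryProgress" is that calling .get()
on the StreamingQueryProgress object raises an AttributeError, which the
listener machinery presumably swallows. The handler that does work uses
attribute access; as a sketch:

    def onQueryProgress(self, event):
        print("onQueryProgress")
        microbatch_data = event.progress
        # Attribute access on the StreamingQueryProgress object, not dict .get()
        processed_rows_per_second = microbatch_data.processedRowsPerSecond
        if processed_rows_per_second is not None:
            print(f"Processed rows per second: {processed_rows_per_second}")
        else:
            print("processedRowsPerSecond not retrieved!")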

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom



Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-10 Thread 刘唯
*now -> not

刘唯 wrote on Sun, 10 Mar 2024 at 22:04:

> Have you tried using microbatch_data.get("processedRowsPerSecond")?
> Camel case now snake case
>
> Mich Talebzadeh  于2024年3月10日周日 11:46写道:
>
>>
>> There is a paper from Databricks on this subject
>>
>>
>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>
>> But having tested it, there seems to be a bug there that I reported to
>> Databricks forum as well (in answer to a user question)
>>
>> I have come to a conclusion that this is a bug. In general there is a bug
>> in obtaining individual values from the dictionary. For example, a bug in
>> the way Spark Streaming is populating the processed_rows_per_second key
>> within the microbatch_data -> microbatch_data = event.progres dictionary or
>> any other key. I have explored various debugging steps, and even though the
>> key seems to exist, the value might not be getting set. Note that the
>> dictionary itself prints the elements correctly. This is with regard to
>> method onQueryProgress(self, event) in class
>> MyListener(StreamingQueryListener):
>>
>> For example with print(microbatch_data), you get all printed as below
>>
>> onQueryProgress
>> microbatch_data received
>> {
>> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>> "name" : null,
>> "timestamp" : "2024-03-10T09:21:27.233Z",
>> "batchId" : 21,
>> "numInputRows" : 1,
>> "inputRowsPerSecond" : 100.0,
>> "processedRowsPerSecond" : 5.347593582887701,
>> "durationMs" : {
>> "addBatch" : 37,
>> "commitOffsets" : 41,
>> "getBatch" : 0,
>> "latestOffset" : 0,
>> "queryPlanning" : 5,
>> "triggerExecution" : 187,
>> "walCommit" : 104
>> },
>> "stateOperators" : [ ],
>> "sources" : [ {
>> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
>> numPartitions=default",
>> "startOffset" : 20,
>> "endOffset" : 21,
>> "latestOffset" : 21,
>> "numInputRows" : 1,
>> "inputRowsPerSecond" : 100.0,
>> "processedRowsPerSecond" : 5.347593582887701
>> } ],
>> "sink" : {
>> "description" :
>> "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>> "numOutputRows" : 1
>> }
>> }
>> However, the observed behaviour (i.e. processed_rows_per_second is either
>> None or not being updated correctly).
>>
>> The spark version I used for my test is 3.4
>>
>> Sample code uses format=rate for simulating a streaming process. You can
>> test the code yourself, all in one
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import col
>> from pyspark.sql.streaming import DataStreamWriter, StreamingQueryListener
>> from pyspark.sql.functions import col, round, current_timestamp, lit
>> import uuid
>>
>> def process_data(df):
>>
>> processed_df = df.withColumn("key", lit(str(uuid.uuid4(.\
>>   withColumn("doubled_value", col("value") * 2). \
>>   withColumn("op_type", lit(1)). \
>>   withColumn("op_time", current_timestamp())
>>
>> return processed_df
>>
>> # Create a Spark session
>> appName = "testListener"
>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>
>> # Define the schema for the streaming data
>> schema = "key string timestamp timestamp, value long"
>>
>> # Define my listener.
>> class MyListener(StreamingQueryListener):
>> def onQueryStarted(self, event):
>> print("onQueryStarted")
>> print(f"'{event.name}' [{event.id}] got started!")
>> def onQueryProgress(self, event):
>> print("onQueryProgress")
>> # Access micro-batch data
>> microbatch_data = event.progress
>> print("microbatch_data received")  # Check if data is received
>> print(microbatch_data)
>> processed_rows_per_second =
>> microbatch_data.get("processed_rows_per_second")
>> if processed_rows_per_second is not None:  # Check if value exists
>>print("processed_rows_per_second retrieved")
>>print(f"Processed rows per second:
>> {processed_rows_per_second}")
>> else:
>>print("processed_rows_per_second not retrieved!")
>> def onQueryTerminated(self, event):
>> print("onQueryTerminated")
>> if event.exception:
>> print(f"Query terminated with exception: {event.exception}")
>> else:
>> print("Query successfully terminated.")
>> # Add my listener.
>>
>> listener_instance = MyListener()
>> spark.streams.addListener(listener_instance)
>>
>>
>> # Create a streaming DataFrame with the rate source
>> streaming_df = (
>> spark.readStream
>> .format("rate")
>> .option("rowsPerSecond", 1)
>> .load()
>> )
>>
>> # Apply processing function to the streaming DataFrame
>> processed_streaming_df = process_data(streaming_df)
>>
>> # Define the output sink (for example, console sink)
>> query = (
>> processed_streaming_df.select( \
>>   col("key").alias("key") \
>> , 

Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-10 Thread 刘唯
Have you tried using microbatch_data.get("processedRowsPerSecond")?
Camel case now snake case
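
For a dict-shaped progress payload (e.g. query.lastProgress in this version),
the keys are the camelCase ones that appear in the progress JSON, so a quick
sketch of the difference (assuming a started query handle called query):

lp = query.lastProgress  # a plain dict here, None before the first batch
if lp is not None:
    print(lp.get("processedRowsPerSecond"))     # camelCase key: present
    print(lp.get("processed_rows_per_second"))  # snake_case key: None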


Bug in How to Monitor Streaming Queries in PySpark

2024-03-10 Thread Mich Talebzadeh
There is a paper from Databricks on this subject

https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html

But having tested it, there seems to be a bug there, which I reported to the
Databricks forum as well (in answer to a user question).

I have come to the conclusion that this is a bug. In general, there is a bug
in obtaining individual values from the dictionary. For example, there is a
bug in the way Spark Streaming is populating the processed_rows_per_second key
within the microbatch_data dictionary (microbatch_data = event.progress), or
any other key. I have explored various debugging steps, and even though the
key seems to exist, the value might not be getting set. Note that the
dictionary itself prints its elements correctly. This is with regard to the
method onQueryProgress(self, event) in class
MyListener(StreamingQueryListener):

For example, with print(microbatch_data), everything is printed as below:

onQueryProgress
microbatch_data received
{
  "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
  "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
  "name" : null,
  "timestamp" : "2024-03-10T09:21:27.233Z",
  "batchId" : 21,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 100.0,
  "processedRowsPerSecond" : 5.347593582887701,
  "durationMs" : {
    "addBatch" : 37,
    "commitOffsets" : 41,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 5,
    "triggerExecution" : 187,
    "walCommit" : 104
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default]",
    "startOffset" : 20,
    "endOffset" : 21,
    "latestOffset" : 21,
    "numInputRows" : 1,
    "inputRowsPerSecond" : 100.0,
    "processedRowsPerSecond" : 5.347593582887701
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
    "numOutputRows" : 1
  }
}
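
Since the payload above is plain JSON, another way to get dictionary
semantics, whatever type the progress object is, would be to parse its JSON
form. A sketch, assuming the progress object exposes a .json property
mirroring the Scala API:

import json

# Parse the progress payload into a plain dict, after which .get() works.
progress_dict = json.loads(microbatch_data.json)  # .json is an assumption here
rate = progress_dict.get("processedRowsPerSecond")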
However, the observed behaviour is that processed_rows_per_second is either
None or not being updated correctly.

The Spark version I used for my test is 3.4.

The sample code uses format=rate to simulate a streaming process. You can
test the code yourself; it is all in one piece:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, current_timestamp, lit
from pyspark.sql.streaming import DataStreamWriter, StreamingQueryListener
import uuid

def process_data(df):
    processed_df = df.withColumn("key", lit(str(uuid.uuid4()))). \
        withColumn("doubled_value", col("value") * 2). \
        withColumn("op_type", lit(1)). \
        withColumn("op_time", current_timestamp())
    return processed_df

# Create a Spark session
appName = "testListener"
spark = SparkSession.builder.appName(appName).getOrCreate()

# Define the schema for the streaming data
schema = "key string, timestamp timestamp, value long"

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("onQueryStarted")
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        print("onQueryProgress")
        # Access micro-batch data
        microbatch_data = event.progress
        print("microbatch_data received")  # Check if data is received
        print(microbatch_data)
        processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
        if processed_rows_per_second is not None:  # Check if value exists
            print("processed_rows_per_second retrieved")
            print(f"Processed rows per second: {processed_rows_per_second}")
        else:
            print("processed_rows_per_second not retrieved!")

    def onQueryTerminated(self, event):
        print("onQueryTerminated")
        if event.exception:
            print(f"Query terminated with exception: {event.exception}")
        else:
            print("Query successfully terminated.")

# Add my listener.
listener_instance = MyListener()
spark.streams.addListener(listener_instance)

# Create a streaming DataFrame with the rate source
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Apply processing function to the streaming DataFrame
processed_streaming_df = process_data(streaming_df)

# Define the output sink (for example, console sink)
query = (
    processed_streaming_df.select(
        col("key").alias("key"),
        col("doubled_value").alias("doubled_value"),
        col("op_type").alias("op_type"),
        col("op_time").alias("op_time")).
    writeStream.
    outputMode("append").
    format("console").
    start()
)

# Wait for the streaming query to terminate
query.awaitTermination()

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom