Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-10 Thread
Have you tried using microbatch_data.get("processedRowsPerSecond")?
The key is camel case, not snake case.
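
For what it's worth, a minimal sketch of the camelCase lookup, shown here against
query.lastProgress (which PySpark exposes as a plain dict) rather than against the
listener's event.progress; this is an illustration, not code from the original post:

# Hypothetical illustration: read the camelCase metric from lastProgress,
# which PySpark returns as a dict. `query` is assumed to be an active
# StreamingQuery returned by writeStream...start().
progress = query.lastProgress
if progress is not None:
    print(progress.get("processedRowsPerSecond"))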

Mich Talebzadeh  于2024年3月10日周日 11:46写道:

>
> There is a blog post from Databricks on this subject:
>
>
> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>
> But having tested it, there seems to be a bug there, which I reported to
> the Databricks forum as well (in answer to a user question).
>
> I have come to the conclusion that this is a bug. In general, there is a bug
> in obtaining individual values from the dictionary. For example, there is a
> problem with the way Spark Streaming populates the processed_rows_per_second
> key (or any other key) within microbatch_data (microbatch_data =
> event.progress). I have explored various debugging steps, and even though the
> key seems to exist, the value might not be getting set. Note that the
> dictionary itself prints its elements correctly. This is with regard to the
> method onQueryProgress(self, event) in class MyListener(StreamingQueryListener):
>
> For example, with print(microbatch_data) everything is printed as below:
>
> onQueryProgress
> microbatch_data received
> {
> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
> "name" : null,
> "timestamp" : "2024-03-10T09:21:27.233Z",
> "batchId" : 21,
> "numInputRows" : 1,
> "inputRowsPerSecond" : 100.0,
> "processedRowsPerSecond" : 5.347593582887701,
> "durationMs" : {
> "addBatch" : 37,
> "commitOffsets" : 41,
> "getBatch" : 0,
> "latestOffset" : 0,
> "queryPlanning" : 5,
> "triggerExecution" : 187,
> "walCommit" : 104
> },
> "stateOperators" : [ ],
> "sources" : [ {
> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
> numPartitions=default",
> "startOffset" : 20,
> "endOffset" : 21,
> "latestOffset" : 21,
> "numInputRows" : 1,
> "inputRowsPerSecond" : 100.0,
> "processedRowsPerSecond" : 5.347593582887701
> } ],
> "sink" : {
> "description" :
> "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
> "numOutputRows" : 1
> }
> }
> However, the observed behaviour is that processed_rows_per_second is either
> None or not being updated correctly.
>
> The Spark version I used for my test is 3.4.
>
> The sample code uses format="rate" to simulate a streaming process. You can
> test the code yourself; it is all in one piece:
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col
> from pyspark.sql.streaming import DataStreamWriter, StreamingQueryListener
> from pyspark.sql.functions import col, round, current_timestamp, lit
> import uuid
>
> def process_data(df):
>
>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))). \
>         withColumn("doubled_value", col("value") * 2). \
>         withColumn("op_type", lit(1)). \
>         withColumn("op_time", current_timestamp())
>
>     return processed_df
>
> # Create a Spark session
> appName = "testListener"
> spark = SparkSession.builder.appName(appName).getOrCreate()
>
> # Define the schema for the streaming data
> schema = "key string, timestamp timestamp, value long"
>
> # Define my listener.
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(self, event):
>         print("onQueryStarted")
>         print(f"'{event.name}' [{event.id}] got started!")
>     def onQueryProgress(self, event):
>         print("onQueryProgress")
>         # Access micro-batch data
>         microbatch_data = event.progress
>         print("microbatch_data received")  # Check if data is received
>         print(microbatch_data)
>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>         if processed_rows_per_second is not None:  # Check if value exists
>             print("processed_rows_per_second retrieved")
>             print(f"Processed rows per second: {processed_rows_per_second}")
>         else:
>             print("processed_rows_per_second not retrieved!")
>     def onQueryTerminated(self, event):
>         print("onQueryTerminated")
>         if event.exception:
>             print(f"Query terminated with exception: {event.exception}")
>         else:
>             print("Query successfully terminated.")
> # Add my listener.
>
> listener_instance = MyListener()
> spark.streams.addListener(listener_instance)
>
>
> # Create a streaming DataFrame with the rate source
> streaming_df = (
> spark.readStream
> .format("rate")
> .option("rowsPerSecond", 1)
> .load()
> )
>
> # Apply processing function to the streaming DataFrame
> processed_streaming_df = process_data(streaming_df)
>
> # Define the output sink (for example, console sink)
> query = (
> processed_streaming_df.select( \
>   col("key").alias("key") \
> , col("doubled_value").alias("doubled_value") \
> , col("op_type").alias("op_type") \
> , col("op_time").alias("op_time")). \
> 
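
A minimal sketch of finishing the writer above as a console sink; the output mode
and trigger interval are assumptions rather than anything taken from the original
message, and only the console format is implied by the comment in the code and by
the batch output shown later in the thread:

# Hedged sketch: output mode and trigger are guesses; col and
# processed_streaming_df come from the quoted code above.
query = (
    processed_streaming_df.select(
        col("key"),
        col("doubled_value"),
        col("op_type"),
        col("op_time"))
    .writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="2 seconds")
    .start()
)

query.awaitTermination()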

Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-10 Thread
*now -> not

刘唯  于2024年3月10日周日 22:04写道:

> Have you tried using microbatch_data.get("processedRowsPerSecond")?
> Camel case now snake case

Re: Bug in How to Monitor Streaming Queries in PySpark

2024-03-11 Thread
Oh, I see the source of the confusion.

microbatch_data = event.progress

means that microbatch_data is a StreamingQueryProgress instance, not a
dictionary, so you should use `microbatch_data.processedRowsPerSecond`
instead of the `get` method, which is for dictionaries.

Oddly, though, query.lastProgress and query.recentProgress should return
StreamingQueryProgress, but they return a dict instead, so the `get` method
works there.

I think PySpark should improve on this part.
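
A minimal sketch of the attribute-access version of the listener, assuming
PySpark 3.4+ where event.progress is a StreamingQueryProgress object; this is
illustrative only, not the exact code from this thread:

from pyspark.sql.streaming import StreamingQueryListener

class RateListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] started")

    def onQueryProgress(self, event):
        # event.progress is a StreamingQueryProgress object, not a dict,
        # so read the metric as an attribute instead of calling .get().
        rate = event.progress.processedRowsPerSecond
        print(f"Processed rows per second: {rate}")

    def onQueryIdle(self, event):
        # no-op; idle events only exist on newer Spark versions
        pass

    def onQueryTerminated(self, event):
        print("Query terminated")

# `spark` is the SparkSession created earlier in the quoted code
spark.streams.addListener(RateListener())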

Mich Talebzadeh  于2024年3月11日周一 05:51写道:

> Hi,
>
> Thank you for your advice
>
> This is the amended code
>
>     def onQueryProgress(self, event):
>         print("onQueryProgress")
>         # Access micro-batch data
>         microbatch_data = event.progress
>         #print("microbatch_data received")  # Check if data is received
>         #print(microbatch_data)
>         #processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>         processed_rows_per_second = microbatch_data.get("processedRowsPerSecond")
>         print("CPC", processed_rows_per_second)
>         if processed_rows_per_second is not None:  # Check if value exists
>             print("processed_rows_per_second retrieved")
>             print(f"Processed rows per second: {processed_rows_per_second}")
>         else:
>             print("processed_rows_per_second not retrieved!")
>
> This is the output
>
> onQueryStarted
> 'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
> SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
> SLF4J: Defaulting to no-operation MDCAdapter implementation.
> SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for
> further details.
> ---
> Batch: 0
> ---
> +---+-------------+-------+-------+
> |key|doubled_value|op_type|op_time|
> +---+-------------+-------+-------+
> +---+-------------+-------+-------+
>
> onQueryProgress
> ---
> Batch: 1
> ---
> +--------------------+-------------+-------+--------------------+
> |                 key|doubled_value|op_type|             op_time|
> +--------------------+-------------+-------+--------------------+
> |a960f663-d13a-49c...|            0|      1|2024-03-11 12:17:...|
> +--------------------+-------------+-------+--------------------+
>
> onQueryProgress
> ---
> Batch: 2
> ---
> +--------------------+-------------+-------+--------------------+
> |                 key|doubled_value|op_type|             op_time|
> +--------------------+-------------+-------+--------------------+
> |a960f663-d13a-49c...|            2|      1|2024-03-11 12:17:...|
> +--------------------+-------------+-------+--------------------+
>
> I am afraid it is not working. It is not even printing anything.
>
> Cheers
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note that,
> as with any advice, quote "one test result is worth one-thousand expert
> opinions (Werner Von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-06 Thread
This indeed looks like a bug. I will take some time to look into it.

Mich Talebzadeh  于2024年4月3日周三 01:55写道:

>
> Hm, you are getting the error below:
>
> AnalysisException: Append output mode not supported when there are
> streaming aggregations on streaming DataFrames/DataSets without watermark;
>
> The problem seems to be that you are using the append output mode when
> writing the streaming query results to Kafka. This mode is designed for
> scenarios where you want to append new data to an existing dataset at the
> sink (in this case, the "sink" topic in Kafka). However, your query
> involves a streaming aggregation: group by provinceId, window('createTime',
> '1 hour', '30 minutes'). The problem is that Spark Structured Streaming
> requires a watermark to ensure exactly-once processing when using
> aggregations with append mode. Your code already defines a watermark on the
> "createTime" column with a delay of 10 seconds (withWatermark("createTime",
> "10 seconds")). However, the error message indicates it is missing on the
> start column. Try adding a watermark to the "start" column: modify your code
> as below to include a watermark on the "start" column generated by the
> window function:
>
> from pyspark.sql.functions import col, from_json, explode, window, sum, expr
>
> streaming_df = session.readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "payment_msg") \
>   .option("startingOffsets", "earliest") \
>   .load() \
>   .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value")) \
>   .select("parsed_value.*") \
>   .withWatermark("createTime", "10 seconds")  # Existing watermark on
> createTime
>
> *# Modified section with watermark on 'start' column*
> streaming_df = streaming_df.groupBy(
>   col("provinceId"),
>   window(col("createTime"), "1 hour", "30 minutes")
> ).agg(
>   sum(col("payAmount")).alias("totalPayAmount")
> ).withWatermark(expr("start"), "10 seconds")  # Watermark on
> window-generated 'start'
>
> # Rest of the code remains the same
> streaming_df.createOrReplaceTempView("streaming_df")
>
> spark.sql("""
> SELECT
>   window.start, window.end, provinceId, totalPayAmount
> FROM streaming_df
> ORDER BY window.start
> """) \
> .writeStream \
> .format("kafka") \
> .option("checkpointLocation", "checkpoint") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("topic", "sink") \
> .start()
>
> Try and see how it goes
>
> HTH
>
> Mich Talebzadeh,
>
> Technologist | Solutions Architect | Data Engineer  | Generative AI
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner Von Braun)".
>
>
>
> On Tue, 2 Apr 2024 at 22:43, Chloe He wrote:
>
>> Hi Mich,
>>
>> Thank you so much for your response. I really appreciate your help!
>>
>> You mentioned "defining the watermark using the withWatermark function on
>> the streaming_df before creating the temporary view" - I believe this is
>> what I’m doing and it’s not working for me. Here is the exact code snippet
>> that I’m running:
>>
>> ```
>> >>> streaming_df = session.readStream\
>> .format("kafka")\
>> .option("kafka.bootstrap.servers", "localhost:9092")\
>> .option("subscribe", "payment_msg")\
>> .option("startingOffsets","earliest")\
>> .load()\
>> .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value"))\
>> .select("parsed_value.*")\
>> .withWatermark("createTime", "10 seconds")
>>
>> >>> streaming_df.createOrReplaceTempView("streaming_df")
>>
>> >>> spark.sql("""
>> SELECT
>> window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>> FROM streaming_df
>> GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>> ORDER BY window.start
>> """)\
>>   .withWatermark("start", "10 seconds")\
>>   .writeStream\
>>   .format("kafka") \
>>   .option("checkpointLocation", "checkpoint") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("topic", "sink") \
>>   .start()
>>
>> AnalysisException: Append output mode not supported when there are streaming
>> aggregations on streaming DataFrames/DataSets without watermark;

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-09 Thread
Sorry, this is not a bug but essentially a user error. Spark throws a really
confusing error message, and I was confused by it as well. Please see the
reply in the ticket for how to make things correct.
https://issues.apache.org/jira/browse/SPARK-47718
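
For reference, a sketch of one formulation that generally satisfies append mode
under these assumptions: the watermark stays on createTime (set via withWatermark
on streaming_df before the view is registered, as in the code quoted earlier in
this thread), window() references the createTime column directly rather than a
quoted string literal, and the ORDER BY is dropped because sorting a streaming
result is only supported in complete output mode. This is an illustration, not
necessarily the exact fix described in the ticket; the key/value serialisation
for the Kafka sink is also an assumption:

# streaming_df is assumed to already carry withWatermark("createTime", "10 seconds")
streaming_df.createOrReplaceTempView("streaming_df")

aggregated = spark.sql("""
    SELECT
        window.start, window.end, provinceId, sum(payAmount) AS totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
""")

(aggregated
    # the Kafka sink expects string/binary key and value columns
    .selectExpr("CAST(provinceId AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .option("checkpointLocation", "checkpoint")
    .outputMode("append")
    .start())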

刘唯  于2024年4月6日周六 11:41写道:

> This indeed looks like a bug. I will take some time to look into it.
>
> Mich Talebzadeh  于2024年4月3日周三 01:55写道:
>
>>
>> hm. you are getting below
>>
>> AnalysisException: Append output mode not supported when there are
>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>
>> The problem seems to be that you are using the append output mode when
>> writing the streaming query results to Kafka. This mode is designed for
>> scenarios where you want to append new data to an existing dataset at the
>> sink (in this case, the "sink" topic in Kafka). However, your query
>> involves a streaming aggregation: group by provinceId, window('createTime',
>> '1 hour', '30 minutes'). The problem is that Spark Structured Streaming
>> requires a watermark to ensure exactly-once processing when using
>> aggregations with append mode. Your code already defines a watermark on the
>> "createTime" column with a delay of 10 seconds (withWatermark("createTime",
>> "10 seconds")). However, the error message indicates it is missing on the
>> start column. Try adding watermark to "start" Column: Modify your code as
>> below  to include a watermark on the "start" column generated by the
>> window function:
>>
>> from pyspark.sql.functions import col, from_json, explode, window, sum,
>> watermark
>>
>> streaming_df = session.readStream \
>>   .format("kafka") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("subscribe", "payment_msg") \
>>   .option("startingOffsets", "earliest") \
>>   .load() \
>>   .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value")) \
>>   .select("parsed_value.*") \
>>   .withWatermark("createTime", "10 seconds")  # Existing watermark on
>> createTime
>>
>> *# Modified section with watermark on 'start' column*
>> streaming_df = streaming_df.groupBy(
>>   col("provinceId"),
>>   window(col("createTime"), "1 hour", "30 minutes")
>> ).agg(
>>   sum(col("payAmount")).alias("totalPayAmount")
>> ).withWatermark(expr("start"), "10 seconds")  # Watermark on
>> window-generated 'start'
>>
>> # Rest of the code remains the same
>> streaming_df.createOrReplaceTempView("streaming_df")
>>
>> spark.sql("""
>> SELECT
>>   window.start, window.end, provinceId, totalPayAmount
>> FROM streaming_df
>> ORDER BY window.start
>> """) \
>> .writeStream \
>> .format("kafka") \
>> .option("checkpointLocation", "checkpoint") \
>> .option("kafka.bootstrap.servers", "localhost:9092") \
>> .option("topic", "sink") \
>> .start()
>>
>> Try and see how it goes
>>
>> HTH
>>
>> Mich Talebzadeh,
>>
>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>>
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> Disclaimer: The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner Von Braun)".
>>
>> Mich Talebzadeh,
>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun