Have you tried using microbatch_data.get("processedRowsPerSecond")?
The key is in camel case, not snake case.
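
A minimal sketch of that lookup, in case it helps. read_metric is a hypothetical helper name, not part of PySpark. It assumes that event.progress exposes the camel-case names shown in the printed output either as an attribute, as a dictionary key, or through its printed JSON form. I have not verified which of these applies on Spark 3.4, so it tries each in turn:

```python
import json


def read_metric(progress, camel_name):
    """Return one metric from a progress-like object, or None if it is absent.

    Hypothetical helper: 'progress' stands in for event.progress.
    """
    # 1) Attribute access, e.g. progress.processedRowsPerSecond
    value = getattr(progress, camel_name, None)
    if value is not None:
        return value
    # 2) Dictionary-style access, e.g. progress.get("processedRowsPerSecond")
    if isinstance(progress, dict):
        return progress.get(camel_name)
    # 3) Assumption: str(progress) is the JSON text that print() shows
    try:
        return json.loads(str(progress)).get(camel_name)
    except (ValueError, AttributeError):
        return None
```

Inside onQueryProgress this would be called as read_metric(event.progress, "processedRowsPerSecond"). A snake-case name such as "processed_rows_per_second" matches none of the three lookups, so it comes back as None.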

Mich Talebzadeh <mich.talebza...@gmail.com> wrote on Sun, 10 Mar 2024 at 11:46:

>
> There is a blog post from Databricks on this subject
>
>
> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>
> But having tested it, there seems to be a bug there, which I also reported
> on the Databricks forum (in answer to a user question).
>
> I have come to the conclusion that this is a bug. In general, individual
> values cannot be obtained from the dictionary. For example, Spark Streaming
> does not seem to populate the processed_rows_per_second key (or any other
> key) in microbatch_data, where microbatch_data = event.progress. I have
> tried various debugging steps, and even though the key seems to exist, the
> value might not be getting set. Note that the dictionary itself prints its
> elements correctly. This is with regard to the method
> onQueryProgress(self, event) in class
> MyListener(StreamingQueryListener).
>
> For example, print(microbatch_data) prints everything, as below:
>
> onQueryProgress
> microbatch_data received
> {
> "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
> "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
> "name" : null,
> "timestamp" : "2024-03-10T09:21:27.233Z",
> "batchId" : 21,
> "numInputRows" : 1,
> "inputRowsPerSecond" : 100.0,
> "processedRowsPerSecond" : 5.347593582887701,
> "durationMs" : {
> "addBatch" : 37,
> "commitOffsets" : 41,
> "getBatch" : 0,
> "latestOffset" : 0,
> "queryPlanning" : 5,
> "triggerExecution" : 187,
> "walCommit" : 104
> },
> "stateOperators" : [ ],
> "sources" : [ {
> "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0,
> numPartitions=default",
> "startOffset" : 20,
> "endOffset" : 21,
> "latestOffset" : 21,
> "numInputRows" : 1,
> "inputRowsPerSecond" : 100.0,
> "processedRowsPerSecond" : 5.347593582887701
> } ],
> "sink" : {
> "description" :
> "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
> "numOutputRows" : 1
> }
> }
> However, the observed behaviour is that processed_rows_per_second is either
> None or not being updated correctly.
>
> The Spark version I used for my test is 3.4.
>
> The sample code uses format("rate") to simulate a streaming source. You can
> test the code yourself; it is all in one script:
> from pyspark.sql import SparkSession
> from pyspark.sql.streaming import StreamingQueryListener
> from pyspark.sql.functions import col, current_timestamp, lit
> import uuid
>
> def process_data(df):
>     processed_df = (
>         df.withColumn("key", lit(str(uuid.uuid4())))
>         .withColumn("doubled_value", col("value") * 2)
>         .withColumn("op_type", lit(1))
>         .withColumn("op_time", current_timestamp())
>     )
>     return processed_df
>
> # Create a Spark session
> appName = "testListener"
> spark = SparkSession.builder.appName(appName).getOrCreate()
>
> # Define the schema for the streaming data
> # (not used below; the rate source supplies its own timestamp and value columns)
> schema = "key string, timestamp timestamp, value long"
>
> # Define my listener.
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(self, event):
>         print("onQueryStarted")
>         print(f"'{event.name}' [{event.id}] got started!")
>     def onQueryProgress(self, event):
>         print("onQueryProgress")
>         # Access micro-batch data
>         microbatch_data = event.progress
>         print("microbatch_data received")  # Check if data is received
>         print(microbatch_data)
>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>         if processed_rows_per_second is not None:  # Check if value exists
>             print("processed_rows_per_second retrieved")
>             print(f"Processed rows per second: {processed_rows_per_second}")
>         else:
>             print("processed_rows_per_second not retrieved!")
>     def onQueryTerminated(self, event):
>         print("onQueryTerminated")
>         if event.exception:
>             print(f"Query terminated with exception: {event.exception}")
>         else:
>             print("Query successfully terminated.")
>
> # Add my listener.
> listener_instance = MyListener()
> spark.streams.addListener(listener_instance)
>
>
> # Create a streaming DataFrame with the rate source
> streaming_df = (
>     spark.readStream
>     .format("rate")
>     .option("rowsPerSecond", 1)
>     .load()
> )
>
> # Apply processing function to the streaming DataFrame
> processed_streaming_df = process_data(streaming_df)
>
> # Define the output sink (for example, console sink)
> query = (
>     processed_streaming_df
>     .select("key", "doubled_value", "op_type", "op_time")
>     .writeStream
>     .outputMode("append")
>     .format("console")
>     .start()
> )
>
> # Wait for the streaming query to terminate
> query.awaitTermination()
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
