Have you tried using microbatch_data.get("processedRowsPerSecond")? The
keys in the progress payload are camelCase, not snake_case.
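A minimal sketch of that fix (untested; CamelCaseListener is just an
illustrative name, and it assumes the PySpark 3.4+ listener API, where
StreamingQueryProgress exposes its payload as a json string, which avoids
relying on dict-style access on the progress object itself):

import json
from pyspark.sql.streaming import StreamingQueryListener

class CamelCaseListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # The progress payload uses camelCase keys, so look up
        # "processedRowsPerSecond", not "processed_rows_per_second".
        rate = json.loads(event.progress.json).get("processedRowsPerSecond")
        print(f"Processed rows per second: {rate}")

    def onQueryTerminated(self, event):
        pass

For the batch shown in your quoted output below, that lookup should return
5.347593582887701 rather than None.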
Mich Talebzadeh <mich.talebza...@gmail.com> wrote on Sun, 10 Mar 2024 at 11:46:
>
> There is a paper from Databricks on this subject:
>
> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>
> But having tested it, there seems to be a bug there, which I also
> reported on the Databricks forum (in answer to a user question).
>
> I have come to the conclusion that this is a bug. In general there is a
> bug in obtaining individual values from the dictionary: for example, in
> the way Spark Structured Streaming populates the
> processed_rows_per_second key within the microbatch_data =
> event.progress dictionary, or any other key. I have explored various
> debugging steps, and even though the key seems to exist, the value
> might not be getting set. Note that the dictionary itself prints its
> elements correctly. This is with regard to the method
> onQueryProgress(self, event) in class MyListener(StreamingQueryListener).
>
> For example, with print(microbatch_data) everything is printed, as below:
>
> onQueryProgress
> microbatch_data received
> {
>   "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>   "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>   "name" : null,
>   "timestamp" : "2024-03-10T09:21:27.233Z",
>   "batchId" : 21,
>   "numInputRows" : 1,
>   "inputRowsPerSecond" : 100.0,
>   "processedRowsPerSecond" : 5.347593582887701,
>   "durationMs" : {
>     "addBatch" : 37,
>     "commitOffsets" : 41,
>     "getBatch" : 0,
>     "latestOffset" : 0,
>     "queryPlanning" : 5,
>     "triggerExecution" : 187,
>     "walCommit" : 104
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
>     "startOffset" : 20,
>     "endOffset" : 21,
>     "latestOffset" : 21,
>     "numInputRows" : 1,
>     "inputRowsPerSecond" : 100.0,
>     "processedRowsPerSecond" : 5.347593582887701
>   } ],
>   "sink" : {
>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>     "numOutputRows" : 1
>   }
> }
>
> However, the observed behaviour is that processed_rows_per_second is
> either None or not being updated correctly.
>
> The Spark version I used for my test is 3.4.
>
> The sample code uses format=rate to simulate a streaming source, so you
> can test it yourself, all in one piece:
>
> import uuid
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col, current_timestamp, lit
> from pyspark.sql.streaming import StreamingQueryListener
>
> def process_data(df):
>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))) \
>         .withColumn("doubled_value", col("value") * 2) \
>         .withColumn("op_type", lit(1)) \
>         .withColumn("op_time", current_timestamp())
>     return processed_df
>
> # Create a Spark session
> appName = "testListener"
> spark = SparkSession.builder.appName(appName).getOrCreate()
>
> # Define the schema for the streaming data
> schema = "key string, timestamp timestamp, value long"
>
> # Define my listener.
> class MyListener(StreamingQueryListener):
>     def onQueryStarted(self, event):
>         print("onQueryStarted")
>         print(f"'{event.name}' [{event.id}] got started!")
>
>     def onQueryProgress(self, event):
>         print("onQueryProgress")
>         # Access micro-batch data
>         microbatch_data = event.progress
>         print("microbatch_data received")  # Check if data is received
>         print(microbatch_data)
>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>         if processed_rows_per_second is not None:  # Check if value exists
>             print("processed_rows_per_second retrieved")
>             print(f"Processed rows per second: {processed_rows_per_second}")
>         else:
>             print("processed_rows_per_second not retrieved!")
>
>     def onQueryTerminated(self, event):
>         print("onQueryTerminated")
>         if event.exception:
>             print(f"Query terminated with exception: {event.exception}")
>         else:
>             print("Query successfully terminated.")
>
> # Add my listener.
> listener_instance = MyListener()
> spark.streams.addListener(listener_instance)
>
> # Create a streaming DataFrame with the rate source
> streaming_df = (
>     spark.readStream
>     .format("rate")
>     .option("rowsPerSecond", 1)
>     .load()
> )
>
> # Apply the processing function to the streaming DataFrame
> processed_streaming_df = process_data(streaming_df)
>
> # Define the output sink (for example, the console sink)
> query = (
>     processed_streaming_df.select(
>         col("key").alias("key"),
>         col("doubled_value").alias("doubled_value"),
>         col("op_type").alias("op_type"),
>         col("op_time").alias("op_time"))
>     .writeStream
>     .outputMode("append")
>     .format("console")
>     .start()
> )
>
> # Wait for the streaming query to terminate
> query.awaitTermination()
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my LinkedIn profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one thousand expert
> opinions" (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).