Hi,

Thank you for your advice.
This is the amended code:

def onQueryProgress(self, event):
    print("onQueryProgress")
    # Access micro-batch data
    microbatch_data = event.progress
    # print("microbatch_data received")  # Check if data is received
    # print(microbatch_data)
    # processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
    processed_rows_per_second = microbatch_data.get("processedRowsPerSecond")
    print("CPC", processed_rows_per_second)
    if processed_rows_per_second is not None:  # Check if value exists
        print("processed_rows_per_second retrieved")
        print(f"Processed rows per second: {processed_rows_per_second}")
    else:
        print("processed_rows_per_second not retrieved!")

This is the output:

onQueryStarted
'None' [c1a910e6-41bb-493f-b15b-7863d07ff3fe] got started!
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------+-------+-------+
|key|doubled_value|op_type|op_time|
+---+-------------+-------+-------+
+---+-------------+-------+-------+

onQueryProgress
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-------------+-------+--------------------+
|                 key|doubled_value|op_type|             op_time|
+--------------------+-------------+-------+--------------------+
|a960f663-d13a-49c...|            0|      1|2024-03-11 12:17:...|
+--------------------+-------------+-------+--------------------+

onQueryProgress
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-------------+-------+--------------------+
|                 key|doubled_value|op_type|             op_time|
+--------------------+-------------+-------+--------------------+
|a960f663-d13a-49c...|            2|      1|2024-03-11 12:17:...|
+--------------------+-------------+-------+--------------------+

I am afraid it is not working; it is not even printing anything.

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice: "one test result is worth one-thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Mon, 11 Mar 2024 at 05:07, 刘唯 <z920631...@gmail.com> wrote:

> Correction: "now" in my previous message should read "not".
>
> 刘唯 <z920631...@gmail.com> wrote on Sun, 10 Mar 2024 at 22:04:
>
>> Have you tried using microbatch_data.get("processedRowsPerSecond")?
>> Camel case now snake case
>>
>> Mich Talebzadeh <mich.talebza...@gmail.com> wrote on Sun, 10 Mar 2024 at 11:46:
>>
>>> There is a paper from Databricks on this subject:
>>>
>>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>>
>>> Having tested it, however, there seems to be a bug there that I
>>> reported to the Databricks forum as well (in answer to a user question).
>>>
>>> I have come to the conclusion that this is a bug: in general, obtaining
>>> individual values from the dictionary fails. For example, there is a bug
>>> in the way Spark Streaming populates the processed_rows_per_second key
>>> (or any other key) within the microbatch_data = event.progress
>>> dictionary. I have explored various debugging steps, and even though the
>>> key seems to exist, the value might not be getting set. Note that the
>>> dictionary itself prints its elements correctly.
>>> This is with regard to the method onQueryProgress(self, event) in the
>>> class MyListener(StreamingQueryListener).
>>>
>>> For example, with print(microbatch_data) you get everything printed, as
>>> below:
>>>
>>> onQueryProgress
>>> microbatch_data received
>>> {
>>>   "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>>   "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>>   "name" : null,
>>>   "timestamp" : "2024-03-10T09:21:27.233Z",
>>>   "batchId" : 21,
>>>   "numInputRows" : 1,
>>>   "inputRowsPerSecond" : 100.0,
>>>   "processedRowsPerSecond" : 5.347593582887701,
>>>   "durationMs" : {
>>>     "addBatch" : 37,
>>>     "commitOffsets" : 41,
>>>     "getBatch" : 0,
>>>     "latestOffset" : 0,
>>>     "queryPlanning" : 5,
>>>     "triggerExecution" : 187,
>>>     "walCommit" : 104
>>>   },
>>>   "stateOperators" : [ ],
>>>   "sources" : [ {
>>>     "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
>>>     "startOffset" : 20,
>>>     "endOffset" : 21,
>>>     "latestOffset" : 21,
>>>     "numInputRows" : 1,
>>>     "inputRowsPerSecond" : 100.0,
>>>     "processedRowsPerSecond" : 5.347593582887701
>>>   } ],
>>>   "sink" : {
>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>>     "numOutputRows" : 1
>>>   }
>>> }
>>>
>>> However, the observed behaviour is that processed_rows_per_second is
>>> either None or not being updated correctly.
>>>
>>> The Spark version I used for my test is 3.4.
>>>
>>> The sample code uses format=rate to simulate a streaming process. You
>>> can test the code yourself, all in one:
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.streaming import StreamingQueryListener
>>> from pyspark.sql.functions import col, current_timestamp, lit
>>> import uuid
>>>
>>> def process_data(df):
>>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))) \
>>>         .withColumn("doubled_value", col("value") * 2) \
>>>         .withColumn("op_type", lit(1)) \
>>>         .withColumn("op_time", current_timestamp())
>>>     return processed_df
>>>
>>> # Create a Spark session
>>> appName = "testListener"
>>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>>
>>> # Define the schema for the streaming data
>>> schema = "key string, timestamp timestamp, value long"
>>>
>>> # Define my listener.
>>> class MyListener(StreamingQueryListener):
>>>     def onQueryStarted(self, event):
>>>         print("onQueryStarted")
>>>         print(f"'{event.name}' [{event.id}] got started!")
>>>
>>>     def onQueryProgress(self, event):
>>>         print("onQueryProgress")
>>>         # Access micro-batch data
>>>         microbatch_data = event.progress
>>>         print("microbatch_data received")  # Check if data is received
>>>         print(microbatch_data)
>>>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>>         if processed_rows_per_second is not None:  # Check if value exists
>>>             print("processed_rows_per_second retrieved")
>>>             print(f"Processed rows per second: {processed_rows_per_second}")
>>>         else:
>>>             print("processed_rows_per_second not retrieved!")
>>>
>>>     def onQueryTerminated(self, event):
>>>         print("onQueryTerminated")
>>>         if event.exception:
>>>             print(f"Query terminated with exception: {event.exception}")
>>>         else:
>>>             print("Query successfully terminated.")
>>>
>>> # Add my listener.
>>> listener_instance = MyListener()
>>> spark.streams.addListener(listener_instance)
>>>
>>> # Create a streaming DataFrame with the rate source
>>> streaming_df = (
>>>     spark.readStream
>>>     .format("rate")
>>>     .option("rowsPerSecond", 1)
>>>     .load()
>>> )
>>>
>>> # Apply the processing function to the streaming DataFrame
>>> processed_streaming_df = process_data(streaming_df)
>>>
>>> # Define the output sink (for example, the console sink)
>>> query = (
>>>     processed_streaming_df.select(
>>>         col("key").alias("key"),
>>>         col("doubled_value").alias("doubled_value"),
>>>         col("op_type").alias("op_type"),
>>>         col("op_time").alias("op_time"))
>>>     .writeStream
>>>     .outputMode("append")
>>>     .format("console")
>>>     .start()
>>> )
>>>
>>> # Wait for the streaming query to terminate
>>> query.awaitTermination()
>>>
>>> HTH
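[Editor's note] The camelCase-versus-snake_case point raised in this thread can be sanity-checked without a cluster: the progress payload printed by print(microbatch_data) above is plain JSON, so parsing a copy of it with json.loads gives an ordinary dict on which .get() finds the camelCase key and silently returns None for the snake_case spelling. This is only a sketch against the JSON shown above, not a test of the live listener:

```python
import json

# Trimmed copy of the progress JSON printed by print(microbatch_data) above.
progress_json = """
{
  "id": "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
  "batchId": 21,
  "numInputRows": 1,
  "inputRowsPerSecond": 100.0,
  "processedRowsPerSecond": 5.347593582887701
}
"""

progress = json.loads(progress_json)

# The snake_case key is simply absent, so .get() returns None ...
print(progress.get("processed_rows_per_second"))  # None

# ... while the camelCase key, as spelled in the JSON, is found.
print(progress.get("processedRowsPerSecond"))     # 5.347593582887701
```

If event.progress in Spark 3.4 turns out to be a StreamingQueryProgress object rather than a dict, the same approach should apply to json.loads(event.progress.json), or the field may be readable as an attribute (event.progress.processedRowsPerSecond); both are assumptions to verify against the PySpark 3.4 API rather than confirmed behaviour.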