*now -> not
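In other words, the keys in the progress payload are camelCase, not
snake_case, so the lookup in the quoted listener below should ask for
"processedRowsPerSecond". A minimal sketch of the changed
onQueryProgress, as a drop-in replacement for the one in the quoted
MyListener (assuming .get() accepts the key name, as suggested above):

    def onQueryProgress(self, event):
        microbatch_data = event.progress
        # camelCase key, matching the JSON the query actually reports,
        # not the snake_case "processed_rows_per_second"
        rate = microbatch_data.get("processedRowsPerSecond")
        if rate is not None:
            print(f"Processed rows per second: {rate}")
        else:
            print("processedRowsPerSecond not retrieved!")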
刘唯 <z920631...@gmail.com> wrote on Sun, 10 Mar 2024 at 22:04:

> Have you tried using microbatch_data.get("processedRowsPerSecond")?
> Camel case now snake case
>
> Mich Talebzadeh <mich.talebza...@gmail.com> wrote on Sun, 10 Mar 2024 at 11:46:
>
>> There is a paper from Databricks on this subject:
>>
>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>
>> But having tested it, there seems to be a bug there, which I reported
>> on the Databricks forum as well (in answer to a user question).
>>
>> I have come to the conclusion that this is a bug. In general, there is
>> a bug in obtaining individual values from the dictionary. For example,
>> there is a bug in the way Spark Streaming is populating the
>> processed_rows_per_second key (or any other key) within the
>> microbatch_data dictionary, where microbatch_data = event.progress.
>> I have explored various debugging steps, and even though the key seems
>> to exist, the value might not be getting set. Note that the dictionary
>> itself prints its elements correctly. This is with regard to the
>> method onQueryProgress(self, event) in class
>> MyListener(StreamingQueryListener).
>>
>> For example, with print(microbatch_data) you get everything printed
>> correctly, as below:
>>
>> onQueryProgress
>> microbatch_data received
>> {
>>   "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>   "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>   "name" : null,
>>   "timestamp" : "2024-03-10T09:21:27.233Z",
>>   "batchId" : 21,
>>   "numInputRows" : 1,
>>   "inputRowsPerSecond" : 100.0,
>>   "processedRowsPerSecond" : 5.347593582887701,
>>   "durationMs" : {
>>     "addBatch" : 37,
>>     "commitOffsets" : 41,
>>     "getBatch" : 0,
>>     "latestOffset" : 0,
>>     "queryPlanning" : 5,
>>     "triggerExecution" : 187,
>>     "walCommit" : 104
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>>     "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
>>     "startOffset" : 20,
>>     "endOffset" : 21,
>>     "latestOffset" : 21,
>>     "numInputRows" : 1,
>>     "inputRowsPerSecond" : 100.0,
>>     "processedRowsPerSecond" : 5.347593582887701
>>   } ],
>>   "sink" : {
>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>     "numOutputRows" : 1
>>   }
>> }
>>
>> However, the observed behaviour is that processed_rows_per_second is
>> either None or not being updated correctly.
>>
>> The Spark version I used for my test is 3.4.
>>
>> The sample code uses format=rate to simulate a streaming process. You
>> can test the code yourself; it is all in one piece:
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.streaming import StreamingQueryListener
>> from pyspark.sql.functions import col, current_timestamp, lit
>> import uuid
>>
>> def process_data(df):
>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))). \
>>         withColumn("doubled_value", col("value") * 2). \
>>         withColumn("op_type", lit(1)). \
>>         withColumn("op_time", current_timestamp())
>>     return processed_df
>>
>> # Create a Spark session
>> appName = "testListener"
>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>
>> # Define the schema for the streaming data
>> schema = "key string, timestamp timestamp, value long"
>>
>> # Define my listener.
>> class MyListener(StreamingQueryListener):
>>     def onQueryStarted(self, event):
>>         print("onQueryStarted")
>>         print(f"'{event.name}' [{event.id}] got started!")
>>
>>     def onQueryProgress(self, event):
>>         print("onQueryProgress")
>>         # Access micro-batch data
>>         microbatch_data = event.progress
>>         print("microbatch_data received")  # Check if data is received
>>         print(microbatch_data)
>>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>         if processed_rows_per_second is not None:  # Check if value exists
>>             print("processed_rows_per_second retrieved")
>>             print(f"Processed rows per second: {processed_rows_per_second}")
>>         else:
>>             print("processed_rows_per_second not retrieved!")
>>
>>     def onQueryTerminated(self, event):
>>         print("onQueryTerminated")
>>         if event.exception:
>>             print(f"Query terminated with exception: {event.exception}")
>>         else:
>>             print("Query successfully terminated.")
>>
>> # Add my listener.
>> listener_instance = MyListener()
>> spark.streams.addListener(listener_instance)
>>
>> # Create a streaming DataFrame with the rate source
>> streaming_df = (
>>     spark.readStream
>>     .format("rate")
>>     .option("rowsPerSecond", 1)
>>     .load()
>> )
>>
>> # Apply the processing function to the streaming DataFrame
>> processed_streaming_df = process_data(streaming_df)
>>
>> # Define the output sink (for example, the console sink)
>> query = (
>>     processed_streaming_df.select(
>>         col("key").alias("key"),
>>         col("doubled_value").alias("doubled_value"),
>>         col("op_type").alias("op_type"),
>>         col("op_time").alias("op_time"))
>>     .writeStream
>>     .outputMode("append")
>>     .format("console")
>>     .start()
>> )
>>
>> # Wait for the streaming query to terminate
>> query.awaitTermination()
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>> view my LinkedIn profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, "one test result is worth one-thousand
>> expert opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
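If looking the value up on the progress object still misbehaves on 3.4,
a fallback is to parse the JSON that demonstrably prints correctly in
the quoted output. A minimal sketch, assuming event.progress exposes a
.json string property as its Scala counterpart does (the name
JsonProgressListener is illustrative, not from the thread):

    import json

    from pyspark.sql.streaming import StreamingQueryListener

    class JsonProgressListener(StreamingQueryListener):
        def onQueryStarted(self, event):
            pass

        def onQueryProgress(self, event):
            # The progress payload prints as valid JSON (see the quoted
            # output), so parse it and read the camelCase key directly.
            progress = json.loads(event.progress.json)  # .json is assumed here
            rate = progress.get("processedRowsPerSecond")
            print(f"Processed rows per second: {rate}")

        def onQueryTerminated(self, event):
            pass

Registering it mirrors the quoted sample:
spark.streams.addListener(JsonProgressListener()).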