刘唯 <[email protected]> wrote on Sunday, 10 March 2024 at 22:04:
> Have you tried using microbatch_data.get("processedRowsPerSecond")?
> Camel case, not snake case.
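>
> A minimal, untested sketch of that check (the CamelCaseListener name is just
> for illustration; it assumes event.progress supports a dict-style get() as in
> the code further down, with the camelCase attribute as a fallback):
>
> from pyspark.sql.streaming import StreamingQueryListener
>
> class CamelCaseListener(StreamingQueryListener):
>     def onQueryStarted(self, event):
>         pass
>
>     def onQueryProgress(self, event):
>         progress = event.progress
>         # The progress payload uses camelCase keys, e.g. "processedRowsPerSecond"
>         rate = progress.get("processedRowsPerSecond")
>         if rate is None:
>             # Fallback, assuming the same camelCase names are exposed as attributes
>             rate = progress.processedRowsPerSecond
>         print(f"Processed rows per second: {rate}")
>
>     def onQueryTerminated(self, event):
>         pass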
>
> Mich Talebzadeh <[email protected]> wrote on Sunday, 10 March 2024 at 11:46:
>
>>
>> There is a blog post from Databricks on this subject:
>>
>>
>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>
>> But having tested it, there seems to be a bug there, which I also reported
>> on the Databricks forum (in answer to a user question).
>>
>> I have come to the conclusion that this is a bug. In general, there is a
>> bug in obtaining individual values from the dictionary. For example, there
>> is a bug in the way Spark Streaming populates the processed_rows_per_second
>> key within the microbatch_data dictionary (microbatch_data =
>> event.progress), or any other key. I have explored various debugging steps,
>> and even though the key seems to exist, the value might not be getting set.
>> Note that the dictionary itself prints its elements correctly. This is with
>> regard to the method onQueryProgress(self, event) in class
>> MyListener(StreamingQueryListener).
>>
>> For example, with print(microbatch_data), everything is printed as below:
>>
>> onQueryProgress
>> microbatch_data received
>> {
>>   "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>   "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>   "name" : null,
>>   "timestamp" : "2024-03-10T09:21:27.233Z",
>>   "batchId" : 21,
>>   "numInputRows" : 1,
>>   "inputRowsPerSecond" : 100.0,
>>   "processedRowsPerSecond" : 5.347593582887701,
>>   "durationMs" : {
>>     "addBatch" : 37,
>>     "commitOffsets" : 41,
>>     "getBatch" : 0,
>>     "latestOffset" : 0,
>>     "queryPlanning" : 5,
>>     "triggerExecution" : 187,
>>     "walCommit" : 104
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>>     "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
>>     "startOffset" : 20,
>>     "endOffset" : 21,
>>     "latestOffset" : 21,
>>     "numInputRows" : 1,
>>     "inputRowsPerSecond" : 100.0,
>>     "processedRowsPerSecond" : 5.347593582887701
>>   } ],
>>   "sink" : {
>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>     "numOutputRows" : 1
>>   }
>> }
>> However, the observed behaviour is that processed_rows_per_second is either
>> None or not being updated correctly.
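>>
>> As one of the debugging steps, the keys that are actually present can be
>> listed by parsing the progress payload back into a plain dict (a rough
>> sketch, assuming the progress object exposes a json property mirroring the
>> output above, as its Scala counterpart does):
>>
>> import json
>>
>> # Inside MyListener, a variant of onQueryProgress for inspecting the keys:
>> def onQueryProgress(self, event):
>>     microbatch_data = event.progress
>>     # The json property returns the same payload as printed above, as a string,
>>     # so loading it into a plain dict shows the exact key names available
>>     progress_dict = json.loads(microbatch_data.json)
>>     print(sorted(progress_dict.keys()))
>>     print(progress_dict.get("processedRowsPerSecond"))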
>>
>> The Spark version I used for my test is 3.4.
>>
>> The sample code uses format=rate to simulate a streaming process. You can
>> test the code yourself; it is all in one piece:
>> from pyspark.sql import SparkSession
>> from pyspark.sql.streaming import StreamingQueryListener
>> from pyspark.sql.functions import col, current_timestamp, lit
>> import uuid
>>
>> def process_data(df):
>>     # Add a generated key, a derived column and operation metadata to each row
>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))) \
>>         .withColumn("doubled_value", col("value") * 2) \
>>         .withColumn("op_type", lit(1)) \
>>         .withColumn("op_time", current_timestamp())
>>     return processed_df
>>
>> # Create a Spark session
>> appName = "testListener"
>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>
>> # Define the schema for the streaming data
>> schema = "key string, timestamp timestamp, value long"
>>
>> # Define my listener.
>> class MyListener(StreamingQueryListener):
>>     def onQueryStarted(self, event):
>>         print("onQueryStarted")
>>         print(f"'{event.name}' [{event.id}] got started!")
>>
>>     def onQueryProgress(self, event):
>>         print("onQueryProgress")
>>         # Access micro-batch data
>>         microbatch_data = event.progress
>>         print("microbatch_data received")  # Check if data is received
>>         print(microbatch_data)
>>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>         if processed_rows_per_second is not None:  # Check if value exists
>>             print("processed_rows_per_second retrieved")
>>             print(f"Processed rows per second: {processed_rows_per_second}")
>>         else:
>>             print("processed_rows_per_second not retrieved!")
>>
>>     def onQueryTerminated(self, event):
>>         print("onQueryTerminated")
>>         if event.exception:
>>             print(f"Query terminated with exception: {event.exception}")
>>         else:
>>             print("Query successfully terminated.")
>>
>> # Add my listener.
>> listener_instance = MyListener()
>> spark.streams.addListener(listener_instance)
>>
>> # Create a streaming DataFrame with the rate source
>> streaming_df = (
>>     spark.readStream
>>     .format("rate")
>>     .option("rowsPerSecond", 1)
>>     .load()
>> )
>>
>> # Apply the processing function to the streaming DataFrame
>> processed_streaming_df = process_data(streaming_df)
>>
>> # Define the output sink (for example, the console sink)
>> query = (
>>     processed_streaming_df.select(
>>         col("key").alias("key"),
>>         col("doubled_value").alias("doubled_value"),
>>         col("op_type").alias("op_type"),
>>         col("op_time").alias("op_time")
>>     )
>>     .writeStream
>>     .outputMode("append")
>>     .format("console")
>>     .start()
>> )
>>
>> # Wait for the streaming query to terminate
>> query.awaitTermination()
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, "one test result is worth one-thousand expert
>> opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>