*Correction: "now" in my message below should read "not".

刘唯 <z920631...@gmail.com> wrote on Sun, 10 Mar 2024 at 22:04:

> Have you tried using microbatch_data.get("processedRowsPerSecond")?
> Camel case now snake case
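>
> For example, a rough sketch of the callback (untested here, and assuming
> the Spark 3.4+ PySpark listener API, where the StreamingQueryProgress
> object also exposes the camelCase fields as attributes and has a json
> property):
>
> import json
>
> def onQueryProgress(self, event):
>     progress = event.progress
>     # Field names follow the JSON output shown below, i.e. camelCase
>     print(f"Processed rows per second: {progress.processedRowsPerSecond}")
>     # Equivalent, going through the JSON representation of the progress
>     as_dict = json.loads(progress.json)
>     print(as_dict.get("processedRowsPerSecond"))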
>
> Mich Talebzadeh <mich.talebza...@gmail.com> wrote on Sun, 10 Mar 2024 at 11:46:
>
>>
>> There is a blog post from Databricks on this subject:
>>
>>
>> https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
>>
>> However, having tested it, there seems to be a bug there, which I also
>> reported to the Databricks forum (in answer to a user question).
>>
>> I have come to the conclusion that this is a bug. In general, there is a
>> bug in obtaining individual values from the dictionary. For example, there
>> is a problem in the way Spark Streaming populates the
>> processed_rows_per_second key (or any other key) within the
>> microbatch_data dictionary, where microbatch_data = event.progress. I have
>> explored various debugging steps, and even though the key seems to exist,
>> the value might not be getting set. Note that the dictionary itself prints
>> its elements correctly. This is with regard to the method
>> onQueryProgress(self, event) in the class MyListener(StreamingQueryListener):
>>
>> For example, with print(microbatch_data) everything is printed as below:
>>
>> onQueryProgress
>> microbatch_data received
>> {
>>   "id" : "941e4cb6-f4ee-41f8-b662-af6dda61dc66",
>>   "runId" : "691d5eb2-140e-48c0-949a-7efbe0fa0967",
>>   "name" : null,
>>   "timestamp" : "2024-03-10T09:21:27.233Z",
>>   "batchId" : 21,
>>   "numInputRows" : 1,
>>   "inputRowsPerSecond" : 100.0,
>>   "processedRowsPerSecond" : 5.347593582887701,
>>   "durationMs" : {
>>     "addBatch" : 37,
>>     "commitOffsets" : 41,
>>     "getBatch" : 0,
>>     "latestOffset" : 0,
>>     "queryPlanning" : 5,
>>     "triggerExecution" : 187,
>>     "walCommit" : 104
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>>     "description" : "RateStreamV2[rowsPerSecond=1, rampUpTimeSeconds=0, numPartitions=default",
>>     "startOffset" : 20,
>>     "endOffset" : 21,
>>     "latestOffset" : 21,
>>     "numInputRows" : 1,
>>     "inputRowsPerSecond" : 100.0,
>>     "processedRowsPerSecond" : 5.347593582887701
>>   } ],
>>   "sink" : {
>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@430a977c",
>>     "numOutputRows" : 1
>>   }
>> }
>> However, the observed behaviour is that processed_rows_per_second is
>> either None or not being updated correctly.
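>>
>> For reference, one way to inspect which keys are actually present in the
>> progress report (a rough sketch, assuming the json property on
>> event.progress):
>>
>> import json
>>
>> def onQueryProgress(self, event):
>>     # List the top-level field names of the progress report
>>     keys = json.loads(event.progress.json).keys()
>>     print(sorted(keys))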
>>
>> The Spark version I used for my test is 3.4.
>>
>> The sample code uses format = "rate" to simulate a streaming source. You
>> can test the code yourself; it is all in one script:
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import col, current_timestamp, lit
>> from pyspark.sql.streaming import StreamingQueryListener
>> import uuid
>>
>> def process_data(df):
>>
>>     processed_df = df.withColumn("key", lit(str(uuid.uuid4()))).\
>>                       withColumn("doubled_value", col("value") * 2). \
>>                       withColumn("op_type", lit(1)). \
>>                       withColumn("op_time", current_timestamp())
>>
>>     return processed_df
>>
>> # Create a Spark session
>> appName = "testListener"
>> spark = SparkSession.builder.appName(appName).getOrCreate()
>>
>> # Define the schema for the streaming data
>> schema = "key string, timestamp timestamp, value long"
>>
>> # Define my listener.
>> class MyListener(StreamingQueryListener):
>>     def onQueryStarted(self, event):
>>         print("onQueryStarted")
>>         print(f"'{event.name}' [{event.id}] got started!")
>>     def onQueryProgress(self, event):
>>         print("onQueryProgress")
>>         # Access micro-batch data
>>         microbatch_data = event.progress
>>         print("microbatch_data received")  # Check if data is received
>>         print(microbatch_data)
>>         processed_rows_per_second = microbatch_data.get("processed_rows_per_second")
>>         if processed_rows_per_second is not None:  # Check if value exists
>>            print("processed_rows_per_second retrieved")
>>            print(f"Processed rows per second: {processed_rows_per_second}")
>>         else:
>>            print("processed_rows_per_second not retrieved!")
>>     def onQueryTerminated(self, event):
>>         print("onQueryTerminated")
>>         if event.exception:
>>             print(f"Query terminated with exception: {event.exception}")
>>         else:
>>             print("Query successfully terminated.")
>> # Add my listener.
>>
>> listener_instance = MyListener()
>> spark.streams.addListener(listener_instance)
>>
>>
>> # Create a streaming DataFrame with the rate source
>> streaming_df = (
>>     spark.readStream
>>     .format("rate")
>>     .option("rowsPerSecond", 1)
>>     .load()
>> )
>>
>> # Apply processing function to the streaming DataFrame
>> processed_streaming_df = process_data(streaming_df)
>>
>> # Define the output sink (for example, console sink)
>> query = (
>>     processed_streaming_df
>>     .select(
>>         col("key").alias("key"),
>>         col("doubled_value").alias("doubled_value"),
>>         col("op_type").alias("op_type"),
>>         col("op_time").alias("op_time"),
>>     )
>>     .writeStream
>>     .outputMode("append")
>>     .format("console")
>>     .start()
>> )
>>
>> # Wait for the streaming query to terminate
>> query.awaitTermination()
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, "one test result is worth one-thousand expert
>> opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>
