OK

ts is the timestamp right?

This is a similar code that works out the average temperature with time
frame of 5 minutes.

Note the comments and catch error with try:

        try:

            # construct a streaming dataframe streamingDataFrame that
subscribes to topic temperature
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
                .option("schema.registry.url",
config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms",
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms",
config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms",
config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms",
config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", "temperature") \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"),
schema).alias("parsed_value"))


            resultC = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.timestamp").alias("timestamp") \
                   , col("parsed_value.temperature").alias("temperature"))

            """
            We work out the window and the AVG(temperature) in the window's
timeframe below
            This should return back the following Dataframe as struct

             root
             |-- window: struct (nullable = false)
             |    |-- start: timestamp (nullable = true)
             |    |-- end: timestamp (nullable = true)
             |-- avg(temperature): double (nullable = true)

            """
            resultM = resultC. \
                     withWatermark("timestamp", "5 minutes"). \
                     groupBy(window(resultC.timestamp, "5 minutes", "5
minutes")). \
                     avg('temperature')

            # We take the above Dataframe and flatten it to get the columns
aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
            resultMF = resultM. \
                       select( \

F.col("window.start").alias("startOfWindowFrame") \
                          , F.col("window.end").alias("endOfWindowFrame") \
                          ,
F.col("avg(temperature)").alias("AVGTemperature"))

            resultMF.printSchema()

            result = resultMF. \
                     writeStream. \
                     outputMode('complete'). \
                     option("numRows", 1000). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("temperature"). \
                     start()

        except Exception as e:
                print(f"""{e}, quitting""")
                sys.exit(1)



HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 11 Mar 2023 at 04:33, karan alang <karan.al...@gmail.com> wrote:

> Hi Mich -
> Here is the output of the ldf.printSchema() & ldf.show() commands.
>
> ldf.printSchema()
>
> root
>  |-- applianceName: string (nullable = true)
>  |-- timeslot: long (nullable = true)
>  |-- customer: string (nullable = true)
>  |-- window: struct (nullable = false)
>  |    |-- start: timestamp (nullable = true)
>  |    |-- end: timestamp (nullable = true)
>  |-- sentOctets: long (nullable = true)
>  |-- recvdOctets: long (nullable = true)
>
>
>  ldf.show() :
>
>
>  
> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
> |applianceName     |timeslot    |customer|window
>                        |sentOctets|recvdOctets|
>
> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
> |abc1              |2797514    |cust1     |{2023-03-11 04:15:00,
> 2023-03-11 04:30:00}|21459264  |32211859   |
> |pqrq              |2797513    |cust1     |{2023-03-11 04:15:00,
> 2023-03-11 04:30:00}|17775527  |31331093   |
> |xyz                |2797514    |cust1     |{2023-03-11 04:15:00,
> 2023-03-11 04:30:00}|12808015  |24191707   |
>
> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>
> Also, any comment on the outputMode ? I've set it to 'update', since I'm
> using aggregation.
>
> thanks!
>
> On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>>
>> Just looking at the code
>>
>>
>> in here
>>
>>
>> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>                                                              
>> window(col("ts"), "15 minutes")) \
>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>                     .fillna(0)
>>
>> What does ldf.printSchema() returns
>>
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> wrote:
>>
>>>
>>> Hello All -
>>>
>>> I've a structured Streaming job which has a trigger of 10 minutes, and
>>> I'm using watermark to account for late data coming in. However, the
>>> watermark is not working - and instead of a single record with total
>>> aggregated value, I see 2 records.
>>>
>>> Here is the code :
>>>
>>> ```
>>>
>>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>>
>>>
>>>         df_stream = self.spark.readStream.format('kafka') \
>>>             .option("kafka.security.protocol", "SSL") \
>>>             .option("kafka.ssl.truststore.location", 
>>> self.ssl_truststore_location) \
>>>             .option("kafka.ssl.truststore.password", 
>>> self.ssl_truststore_password) \
>>>             .option("kafka.ssl.keystore.location", 
>>> self.ssl_keystore_location_bandwidth_intermediate) \
>>>             .option("kafka.ssl.keystore.password", 
>>> self.ssl_keystore_password_bandwidth_intermediate) \
>>>             .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>             .option("subscribe", topic) \
>>>             .option("startingOffsets", "latest") \
>>>             .option("failOnDataLoss", "false") \
>>>             .option("kafka.metadata.max.age.ms", "1000") \
>>>             .option("kafka.ssl.keystore.type", "PKCS12") \
>>>             .option("kafka.ssl.truststore.type", "PKCS12") \
>>>             .load()
>>>
>>> 2. calling foreachBatch(self.process)
>>>         # note - outputMode is set to "update" (tried setting outputMode = 
>>> append as well)
>>>
>>>         # 03/09 ::: outputMode - update instead of append
>>>         query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", 
>>> "topic").writeStream \
>>>             .outputMode("update") \
>>>             .trigger(processingTime='10 minutes') \
>>>             .option("truncate", "false") \
>>>             .option("checkpointLocation", self.checkpoint) \
>>>             .foreachBatch(self.process) \
>>>             .start()
>>>
>>>
>>> self.process - where i do the bulk of the  processing, which calls the 
>>> function  'aggIntfLogs'
>>>
>>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing  
>>> groupBy to calculate the sum of sentOctets & recvdOctets
>>>
>>>
>>>         def aggIntfLogs(ldf):
>>>             if ldf and ldf.count() > 0:
>>>
>>>                 ldf = ldf.select('applianceName', 'timeslot', 'sentOctets', 
>>> 'recvdOctets','ts', 'customer') \
>>>                     .withColumn('sentOctets', 
>>> ldf["sentOctets"].cast(LongType())) \
>>>                     .withColumn('recvdOctets', 
>>> ldf["recvdOctets"].cast(LongType())) \
>>>                     .withWatermark("ts", "15 minutes")
>>>
>>>                 ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>                                                              
>>> window(col("ts"), "15 minutes")) \
>>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>                     .fillna(0)
>>>                 return ldf
>>>             return ldf
>>>
>>>
>>>         Dataframe 'ldf' returned from the function aggIntfLogs - is written 
>>> to Kafka topic
>>>
>>> ```
>>>
>>> I was expecting that using the watermark will account for late coming
>>> data .. i.e. the sentOctets & recvdOctets are calculated for the
>>> consolidated data
>>> (including late-coming data, since the late coming data comes within 15
>>> mins), however, I'm seeing 2 records for some of the data (i.e. key -
>>> applianceName/timeslot/customer) i.e. the aggregated data is calculated
>>> individually for the records and I see 2 records instead of single record
>>> accounting for late coming data within watermark.
>>>
>>> What needs to be done to fix this & make this work as desired?
>>>
>>> tia!
>>>
>>>
>>> Here is the Stackoverflow link as well -
>>>
>>>
>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>>>
>>>
>>>
>>>

Reply via email to