Just looking at the code in here:

ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                  window(col("ts"), "15 minutes")) \
                    .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
                    .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
                    .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
                    .fillna(0)

What does ldf.printSchema() return?
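
One thing that may be worth checking alongside the schema: as far as I understand it, the DataFrame handed to a foreachBatch function is a plain (non-streaming) DataFrame for that micro-batch only, so a groupBy inside it sees just that batch's rows and withWatermark has no effect there. The plain-Python sketch below is only an illustration of that idea, not Spark code; the helper names and sample rows are hypothetical. It buckets rows into 15-minute tumbling windows and shows how aggregating each batch separately gives two records for one key, while aggregating across batches gives one.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=15)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Floor a timestamp to the start of its 15-minute tumbling window."""
    return ts - ((ts - EPOCH) % WINDOW)


def aggregate(rows):
    """Sum sentOctets per (applianceName, window start)."""
    totals = defaultdict(int)
    for appliance, ts, sent_octets in rows:
        totals[(appliance, window_start(ts))] += sent_octets
    return dict(totals)


# Hypothetical rows: both fall in the 07:00-07:15 window,
# but they arrive in two different 10-minute triggers.
batch_1 = [("app1", datetime(2023, 3, 10, 7, 1), 100)]
batch_2 = [("app1", datetime(2023, 3, 10, 7, 12), 50)]

# Aggregating inside each batch: one record per batch for the same key.
per_batch = [aggregate(batch_1), aggregate(batch_2)]

# Aggregating across batches: a single consolidated record.
combined = aggregate(batch_1 + batch_2)

print(per_batch)
print(combined)
```

If that is what is happening here, the usual direction would be to apply withWatermark and the windowed groupBy on the streaming DataFrame before writeStream, so that Spark keeps the aggregation state across triggers, but the schema output would help confirm that first.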


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh







On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> wrote:

>
> Hello All -
>
> I have a Structured Streaming job with a trigger of 10 minutes, and I'm
> using a watermark to account for late-arriving data. However, the watermark
> is not working: instead of a single record with the total aggregated
> value, I see 2 records.
>
> Here is the code :
>
> ```
>
> 1) StructuredStreaming - Reading from Kafka every 10 mins
>
>
>         df_stream = self.spark.readStream.format('kafka') \
>             .option("kafka.security.protocol", "SSL") \
>             .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>             .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>             .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
>             .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
>             .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>             .option("subscribe", topic) \
>             .option("startingOffsets", "latest") \
>             .option("failOnDataLoss", "false") \
>             .option("kafka.metadata.max.age.ms", "1000") \
>             .option("kafka.ssl.keystore.type", "PKCS12") \
>             .option("kafka.ssl.truststore.type", "PKCS12") \
>             .load()
>
> 2) Calling foreachBatch(self.process)
>         # note - outputMode is set to "update" (tried setting outputMode = append as well)
>
>         # 03/09 ::: outputMode - update instead of append
>         query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>             .outputMode("update") \
>             .trigger(processingTime='10 minutes') \
>             .option("truncate", "false") \
>             .option("checkpointLocation", self.checkpoint) \
>             .foreachBatch(self.process) \
>             .start()
>
>
> self.process is where I do the bulk of the processing; it calls the
> function 'aggIntfLogs'.
>
> In function aggIntfLogs, I'm using a watermark of 15 mins and doing a groupBy
> to calculate the sum of sentOctets & recvdOctets.
>
>
>         def aggIntfLogs(ldf):
>             if ldf and ldf.count() > 0:
>
>                 ldf = ldf.select('applianceName', 'timeslot', 'sentOctets', 'recvdOctets', 'ts', 'customer') \
>                     .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
>                     .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
>                     .withWatermark("ts", "15 minutes")
>
>                 ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>                                   window(col("ts"), "15 minutes")) \
>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>                     .fillna(0)
>                 return ldf
>             return ldf
>
>
>         The DataFrame 'ldf' returned from the function aggIntfLogs is written to a Kafka topic
>
> ```
>
> I was expecting that the watermark would account for late-arriving data,
> i.e. that sentOctets & recvdOctets would be calculated over the consolidated
> data (including the late data, since it arrives within 15 mins). However,
> I'm seeing 2 records for some keys (applianceName/timeslot/customer): the
> aggregate is calculated separately for each set of records, and I see 2
> records instead of a single record that accounts for late data arriving
> within the watermark.
>
> What needs to be done to fix this & make this work as desired?
>
> tia!
>
>
> Here is the Stackoverflow link as well -
>
>
> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>
>
>
>
