Hello All -
I have a Structured Streaming job with a 10-minute trigger, and I'm using a watermark to account for late-arriving data. However, the watermark is not working: instead of a single record with the total aggregated value, I see 2 records.
Here is the code:
1) Structured Streaming - reading from Kafka every 10 minutes:

```
df_stream = self.spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
    .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
    .option("kafka.bootstrap.servers", self.kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()
```
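For reference, the Kafka source produces a fixed schema; the columns used downstream are value, timestamp, and topic:

```
# Schema of df_stream as produced by the Kafka source:
#   key (binary), value (binary), topic (string), partition (int),
#   offset (long), timestamp (timestamp), timestampType (int)
```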
2) Calling foreachBatch(self.process). Note: outputMode is set to "update" (I tried "append" as well):

```
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
    .writeStream \
    .outputMode("update") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", self.checkpoint) \
    .foreachBatch(self.process) \
    .start()
```
3) self.process is where I do the bulk of the processing; it calls the function aggIntfLogs. In aggIntfLogs I'm using a watermark of 15 minutes and doing a groupBy to calculate the sums of sentOctets and recvdOctets:
```
from pyspark.sql.functions import col, window
from pyspark.sql.types import LongType

def aggIntfLogs(ldf):
    if ldf and ldf.count() > 0:
        ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
                         'recvdOctets', 'ts', 'customer') \
            .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
            .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
            .withWatermark("ts", "15 minutes")
        ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                          window(col("ts"), "15 minutes")) \
            .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
            .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
            .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
            .fillna(0)
    return ldf
```
The DataFrame 'ldf' returned from aggIntfLogs is written to a Kafka topic.
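self.process itself isn't shown in full; roughly, it parses the JSON value, calls aggIntfLogs, and writes the result back to Kafka. A simplified sketch (value_schema and self.output_topic are hypothetical placeholders; the field names mirror the aggregation above):

```
from pyspark.sql.functions import from_json, to_json, struct, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Hypothetical payload schema - field names taken from aggIntfLogs above
value_schema = StructType([
    StructField("applianceName", StringType()),
    StructField("timeslot", StringType()),
    StructField("sentOctets", StringType()),
    StructField("recvdOctets", StringType()),
    StructField("ts", TimestampType()),
    StructField("customer", StringType()),
])

def process(self, df, batch_id):
    # df here is a static DataFrame holding just this micro-batch
    parsed = df.select(from_json(col("value"), value_schema).alias("rec")) \
        .select("rec.*")
    ldf = aggIntfLogs(parsed)
    # write the aggregated rows out as JSON to the output topic
    ldf.select(to_json(struct(*ldf.columns)).alias("value")) \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", self.kafkaBrokers) \
        .option("topic", self.output_topic) \
        .save()
```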
I was expecting the watermark to account for late-arriving data, i.e. that sentOctets and recvdOctets would be summed over the consolidated data (including the late data, since it arrives within 15 minutes). Instead, for some keys (applianceName/timeslot/customer) the aggregation is computed separately per batch of records, and I see 2 records instead of a single record that folds in the late data arriving within the watermark.
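To make the symptom concrete, here is a hypothetical illustration (made-up numbers):

```
# Expected: one consolidated row per key, with late data folded in:
#   (appliance1, 2023-03-09-10:00, custA)  sentOctets=140  recvdOctets=90
#
# Observed: two rows for the same applianceName/timeslot/customer key,
# apparently one per micro-batch that contained events for that key:
#   (appliance1, 2023-03-09-10:00, custA)  sentOctets=100  recvdOctets=60
#   (appliance1, 2023-03-09-10:00, custA)  sentOctets=40   recvdOctets=30
```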
What needs to be done to fix this and make it work as desired?
Thanks in advance!
Here is the Stack Overflow link as well:
https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected