Hello All - I have a Structured Streaming job with a 10-minute trigger, and I'm using a watermark to account for late-arriving data. However, the watermark is not working: instead of a single record with the total aggregated value, I see 2 records.
Here is the code:

1) Structured Streaming - reading from Kafka every 10 minutes:

```python
df_stream = self.spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
    .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
    .option("kafka.bootstrap.servers", self.kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()
```

2) Calling foreachBatch(self.process). Note: outputMode is set to "update" (I tried outputMode = "append" as well):

```python
# 03/09: outputMode - update instead of append
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
    .outputMode("update") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", self.checkpoint) \
    .foreachBatch(self.process) \
    .start()
```

3) self.process does the bulk of the processing and calls the function aggIntfLogs. In aggIntfLogs I use a watermark of 15 minutes and a groupBy to calculate the sums of sentOctets and recvdOctets:

```python
def aggIntfLogs(ldf):
    if ldf and ldf.count() > 0:
        ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
                         'recvdOctets', 'ts', 'customer') \
            .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
            .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
            .withWatermark("ts", "15 minutes")
        ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                          window(col("ts"), "15 minutes")) \
            .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
            .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
            .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
            .fillna(0)
    return ldf
```

The DataFrame `ldf` returned from aggIntfLogs is written to a Kafka topic.

I was expecting the watermark to account for late-arriving data, i.e. that sentOctets and recvdOctets would be aggregated over the consolidated data (including the late records, since they arrive within the 15-minute watermark). Instead, for some keys (applianceName/timeslot/customer) the aggregates are calculated separately for the on-time and the late records, and I see 2 records instead of a single consolidated one. What needs to be done to fix this and make it work as desired? tia!

Here is the Stack Overflow link as well: https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
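To make the symptom concrete, here is a small Spark-free sketch (the data, the `batch1`/`batch2` names, and both helper functions are hypothetical, not from the job above) contrasting aggregating each micro-batch independently — which is effectively what a `groupBy` inside `foreachBatch` sees, since each batch arrives as a standalone DataFrame — with keeping running state across batches for the same key, which is what I expected the watermark to give me:

```python
from collections import defaultdict

# Hypothetical records: (applianceName, timeslot, customer, sentOctets, recvdOctets).
# The second micro-batch carries a late record for the same key as the first.
batch1 = [("app1", "10:00", "custA", 100, 50)]
batch2 = [("app1", "10:00", "custA", 30, 20)]   # late data, same key

def agg_per_batch(batches):
    """Aggregate each batch on its own: the same key can be
    emitted once per batch, producing duplicate output rows."""
    out = []
    for batch in batches:
        sums = defaultdict(lambda: [0, 0])
        for name, slot, cust, sent, recvd in batch:
            sums[(name, slot, cust)][0] += sent
            sums[(name, slot, cust)][1] += recvd
        out.extend((key, tuple(v)) for key, v in sums.items())
    return out

def agg_with_state(batches):
    """Carry running sums across batches, merging late records
    into the existing key: one consolidated row per key."""
    sums = defaultdict(lambda: [0, 0])
    for batch in batches:
        for name, slot, cust, sent, recvd in batch:
            sums[(name, slot, cust)][0] += sent
            sums[(name, slot, cust)][1] += recvd
    return [(key, tuple(v)) for key, v in sums.items()]

print(agg_per_batch([batch1, batch2]))   # two rows for the same key
print(agg_with_state([batch1, batch2]))  # one consolidated row (130, 70)
```

The two rows from `agg_per_batch` match what I observe; the single row from `agg_with_state` is what I was expecting the watermarked aggregation to produce.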