Hello All -

I have a Structured Streaming job with a 10-minute trigger, and I'm using
a watermark to account for late-arriving data. However, the watermark is
not working: instead of a single record with the total aggregated value,
I see 2 records.

Here is the code:

1) Structured Streaming - reading from Kafka every 10 minutes:

```
        df_stream = self.spark.readStream.format('kafka') \
            .option("kafka.security.protocol", "SSL") \
            .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
            .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
            .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
            .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
            .option("kafka.bootstrap.servers", self.kafkaBrokers) \
            .option("subscribe", topic) \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .option("kafka.metadata.max.age.ms", "1000") \
            .option("kafka.ssl.keystore.type", "PKCS12") \
            .option("kafka.ssl.truststore.type", "PKCS12") \
            .load()
```

2) Calling foreachBatch(self.process). Note - outputMode is set to "update" (I tried outputMode "append" as well):

```
        # 03/09 ::: outputMode - update instead of append
        query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
            .outputMode("update") \
            .trigger(processingTime='10 minutes') \
            .option("truncate", "false") \
            .option("checkpointLocation", self.checkpoint) \
            .foreachBatch(self.process) \
            .start()
```


self.process is where I do the bulk of the processing; it calls the function aggIntfLogs.
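
For context, the call chain looks roughly like this - a simplified sketch, where parse_intf_logs and write_to_kafka are placeholder names for steps of my code that are not shown here:

```
        def process(self, df, batch_id):
            # sketch only: parse the Kafka value column into typed columns,
            # aggregate, then publish the result
            ldf = parse_intf_logs(df)   # placeholder for the parsing step
            ldf = aggIntfLogs(ldf)      # the watermark + groupBy function below
            write_to_kafka(ldf)         # placeholder; the write is described below
```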

In the function aggIntfLogs I'm using a watermark of 15 minutes and doing a groupBy to calculate the sum of sentOctets & recvdOctets:

```
        # needs: from pyspark.sql.functions import col, window
        #        from pyspark.sql.types import LongType
        def aggIntfLogs(ldf):
            if ldf and ldf.count() > 0:
                ldf = ldf.select('applianceName', 'timeslot', 'sentOctets', 'recvdOctets', 'ts', 'customer') \
                    .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
                    .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
                    .withWatermark("ts", "15 minutes")

                ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                                  window(col("ts"), "15 minutes")) \
                    .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
                    .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
                    .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
                    .fillna(0)
                return ldf
            return ldf
```
The DataFrame 'ldf' returned from aggIntfLogs is then written to a Kafka topic.
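
The write itself is the standard batch write to Kafka, roughly like this sketch (the to_json serialization and the output topic name here are stand-ins for what the job actually does):

```
        ldf.selectExpr("to_json(struct(*)) AS value") \
            .write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", self.kafkaBrokers) \
            .option("topic", "intf-logs-agg") \
            .save()
```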

I was expecting the watermark to account for late-arriving data, i.e. that
sentOctets & recvdOctets would be calculated over the consolidated data
(including the late records, since they arrive within 15 minutes). However,
for some keys (applianceName/timeslot/customer) the aggregates are
calculated separately for the early and the late records, and I see 2
records instead of a single record that accounts for the late data within
the watermark.
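
In other words, I expected the watermark to fold late rows into the same windowed aggregate, the way a plain streaming aggregation would - along the lines of this sketch, where parsed_stream stands for the stream after the JSON payload has been parsed into columns:

```
        agg = parsed_stream \
            .withWatermark("ts", "15 minutes") \
            .groupBy("applianceName", "timeslot", "customer",
                     window(col("ts"), "15 minutes")) \
            .agg({'sentOctets': "sum", 'recvdOctets': "sum"})
```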

What needs to be done to fix this and make it work as desired?

tia!


Here is the Stack Overflow link as well -

https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
