Hi Mich -

Here is the output of the ldf.printSchema() and ldf.show() commands.

ldf.printSchema()

root
 |-- applianceName: string (nullable = true)
 |-- timeslot: long (nullable = true)
 |-- customer: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sentOctets: long (nullable = true)
 |-- recvdOctets: long (nullable = true)

ldf.show():

+------------------+-----------+--------+------------------------------------------+----------+-----------+
|applianceName     |timeslot   |customer|window                                    |sentOctets|recvdOctets|
+------------------+-----------+--------+------------------------------------------+----------+-----------+
|abc1              |2797514    |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|21459264  |32211859   |
|pqrq              |2797513    |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|17775527  |31331093   |
|xyz               |2797514    |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|12808015  |24191707   |
+------------------+-----------+--------+------------------------------------------+----------+-----------+

Also, any comment on the outputMode? I've set it to 'update', since I'm
using aggregation.

thanks!

On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Just looking at the code
>
> in here
>
> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>                   window(col("ts"), "15 minutes")) \
>     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>     .fillna(0)
>
> What does ldf.printSchema() return?
>
> HTH
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
> On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> wrote:
>
>> Hello All -
>>
>> I have a Structured Streaming job with a trigger of 10 minutes, and
>> I'm using a watermark to account for late data coming in. However, the
>> watermark is not working - instead of a single record with the total
>> aggregated value, I see 2 records.
>>
>> Here is the code:
>>
>> ```
>>
>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>
>> df_stream = self.spark.readStream.format('kafka') \
>>     .option("kafka.security.protocol", "SSL") \
>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>     .option("subscribe", topic) \
>>     .option("startingOffsets", "latest") \
>>     .option("failOnDataLoss", "false") \
>>     .option("kafka.metadata.max.age.ms", "1000") \
>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>     .load()
>>
>> 2. calling foreachBatch(self.process)
>> # note - outputMode is set to "update" (tried setting outputMode = append as well)
>>
>> # 03/09 ::: outputMode - update instead of append
>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>     .outputMode("update") \
>>     .trigger(processingTime='10 minutes') \
>>     .option("truncate", "false") \
>>     .option("checkpointLocation", self.checkpoint) \
>>     .foreachBatch(self.process) \
>>     .start()
>>
>>
>> self.process - where I do the bulk of the processing, which calls the
>> function 'aggIntfLogs'
>>
>> In function aggIntfLogs - I'm using a watermark of 15 mins, and doing a
>> groupBy to calculate the sum of sentOctets & recvdOctets
>>
>>
>> def aggIntfLogs(ldf):
>>     if ldf and ldf.count() > 0:
>>
>>         ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
>>                          'recvdOctets', 'ts', 'customer') \
>>             .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
>>             .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
>>             .withWatermark("ts", "15 minutes")
>>
>>         ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>                           window(col("ts"), "15 minutes")) \
>>             .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>             .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>             .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>             .fillna(0)
>>         return ldf
>>     return ldf
>>
>>
>> Dataframe 'ldf' returned from the function aggIntfLogs - is written
>> to Kafka topic
>>
>> ```
>>
>> I was expecting that using the watermark would account for late-coming
>> data, i.e. that sentOctets & recvdOctets would be calculated for the
>> consolidated data (including late-coming data, since it arrives within
>> 15 mins). However, I'm seeing 2 records for some of the data (i.e. per
>> key - applianceName/timeslot/customer): the aggregate is calculated
>> separately for each set of records, and I see 2 records instead of a
>> single record that accounts for late-coming data within the watermark.
>>
>> What needs to be done to fix this & make this work as desired?
>>
>> tia!
>>
>>
>> Here is the Stackoverflow link as well -
>>
>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
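
To make the symptom concrete, here is a small pure-Python sketch (no Spark
involved; the rows, values and helper names are invented for illustration).
It assumes, as one possible explanation rather than a confirmed diagnosis,
that the aggregation inside foreachBatch runs on each micro-batch separately,
so no totals carry over between triggers. Under that assumption, a late row
for the same key and 15-minute window yields a second record instead of an
updated total:

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=15)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Floor a timestamp to the start of its 15-minute tumbling window."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


def aggregate(rows, totals):
    """Add sentOctets per (applianceName, window start) into `totals`."""
    for appliance, ts, sent in rows:
        key = (appliance, window_start(ts))
        totals[key] = totals.get(key, 0) + sent
    return totals


# Hypothetical data: two triggers, the second carrying a late row that
# belongs to the same 04:15-04:30 window as the first.
batch1 = [("abc1", datetime(2023, 3, 11, 4, 16), 100)]
batch2 = [("abc1", datetime(2023, 3, 11, 4, 21), 50)]

# Per-batch aggregation: a fresh dict for every batch, so nothing carries over.
per_batch = [aggregate(batch, {}) for batch in (batch1, batch2)]

# Stateful aggregation: one dict shared across batches.
stateful = {}
for batch in (batch1, batch2):
    aggregate(batch, stateful)

print(per_batch)  # two separate records for the same key and window
print(stateful)   # one record holding the combined total
```

The per-batch variant mirrors the 2 records described above, while the shared
dict stands in for the running state that a streaming aggregation would keep
across triggers.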