Hi, I want to write something in Structured Streaming:
1. I have a dataset with 3 columns: id, last_update_timestamp, attribute
2. I am receiving the data through Kinesis.

I want to deduplicate records based on last_updated. In batch, it looks like:

spark.sql("""
    SELECT * FROM (
        SELECT *, row_number() OVER (PARTITION BY id ORDER BY last_updated DESC) AS rank
        FROM table1
    ) tmp
    WHERE rank = 1
""")

Now I would like to do the same in Structured Streaming. I need to maintain, across triggers, the state of each id with its highest last_updated, for a certain period (24 hours).

Questions:
1. Should I use mapGroupsWithState, or is there another (SQL?) solution? Can anyone help me write it?
2. Is mapGroupsWithState supported in Python?

Just to ensure we cover the bases: I have already tried dropDuplicates, but it keeps the first record encountered for an id rather than updating the state:

unpackedDF = kinesisDF.selectExpr("cast(data as STRING) jsonData")
dataDF = unpackedDF.select(
    get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
    get_json_object(unpackedDF.jsonData, '$.header.last_updated').cast('timestamp').alias('last_updated'),
    unpackedDF.jsonData)
dedupDF = dataDF.withWatermark('last_updated', '24 hours').dropDuplicates(subset=['id'])

So it is not working. Any help is appreciated.

--
Best Regards,
Ayan Guha
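PS: To make the desired semantics concrete, here is a plain-Python sketch (no Spark; the function and variable names are mine, just for illustration) of the per-id state I would want a mapGroupsWithState-style operator to maintain: keep only the record with the highest last_updated per id, and expire state older than 24 hours, mimicking a state timeout.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(hours=24)  # how long to keep state for an id

def dedup_latest(records, state=None, now=None):
    """Keep, per id, only the record with the highest last_updated.

    `state` maps id -> (last_updated, record). If `now` is given,
    entries older than `now - RETENTION` are expired, mimicking a
    24-hour state timeout across triggers.
    """
    state = {} if state is None else dict(state)
    for rec in records:
        rid, ts = rec['id'], rec['last_updated']
        held = state.get(rid)
        if held is None or ts > held[0]:
            state[rid] = (ts, rec)  # newer record wins; older duplicates dropped
    if now is not None:
        cutoff = now - RETENTION
        state = {k: v for k, v in state.items() if v[0] >= cutoff}
    return state

# Two "micro-batches": the later timestamp for id=1 should replace the earlier one.
t0 = datetime(2024, 1, 1, 12, 0)
batch1 = [{'id': 1, 'last_updated': t0, 'attribute': 'a'}]
batch2 = [{'id': 1, 'last_updated': t0 + timedelta(hours=1), 'attribute': 'b'}]
s = dedup_latest(batch1)
s = dedup_latest(batch2, state=s)
# state now holds attribute 'b' for id 1
```

This is only the state logic I am after, not streaming code; the question remains how to express it with mapGroupsWithState (or SQL) given that, as far as I can tell, mapGroupsWithState is not exposed in PySpark.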