Hi

I want to write something in Structured Streaming:

1. I have a dataset with 3 columns: id, last_updated (a timestamp), and
attribute
2. I am receiving the data through Kinesis

I want to deduplicate records per id, keeping the one with the latest
last_updated. In batch, it looks like:

spark.sql("select * from (Select *, row_number() OVER(Partition by id order
by last_updated desc) rank  from table1) tmp where rank =1")
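
For reference, the same batch dedup expressed with the DataFrame API
(assuming the column and table names above) would be roughly:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank rows within each id by last_updated, newest first, and keep only the top row
w = Window.partitionBy("id").orderBy(F.col("last_updated").desc())
latestDF = (spark.table("table1")
            .withColumn("rank", F.row_number().over(w))
            .filter("rank = 1")
            .drop("rank"))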

But now I would like to do it in Structured Streaming. I need to maintain
state per id, keeping the record with the highest last_updated, across
triggers, for a certain period (24 hours).
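
To make it concrete, below is a rough sketch of the logic I have in mind,
written against applyInPandasWithState from newer PySpark releases (I am
not sure whether that API, or mapGroupsWithState itself, is usable from
Python in my setup, so please treat the function, the schemas, and the
24-hour processing-time timeout as assumptions rather than a working
solution):

import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

STATE_TTL_MS = 24 * 60 * 60 * 1000  # keep per-id state for 24 hours

def keep_latest(key, pdf_iter, state: GroupState):
    # On timeout, drop the state for this id and emit nothing
    if state.hasTimedOut:
        state.remove()
        return

    # Current "best" record for this id, if one was seen in an earlier trigger
    best_ts, best_json = state.get if state.exists else (None, None)

    for pdf in pdf_iter:
        for row in pdf.itertuples(index=False):
            if best_ts is None or row.last_updated > best_ts:
                best_ts, best_json = row.last_updated, row.jsonData

    state.update((best_ts, best_json))
    state.setTimeoutDuration(STATE_TTL_MS)

    # Emit the current winner for this id (Update output mode)
    yield pd.DataFrame({"id": [key[0]],
                        "last_updated": [best_ts],
                        "jsonData": [best_json]})

dedupDF = (dataDF
           .groupBy("id")
           .applyInPandasWithState(
               keep_latest,
               outputStructType="id string, last_updated timestamp, jsonData string",
               stateStructType="last_updated timestamp, jsonData string",
               outputMode="Update",
               timeoutConf=GroupStateTimeout.ProcessingTimeTimeout))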

Questions:

1. Should I use mapGroupsWithState, or is there another (SQL?) solution?
Can anyone help me write it?
2. Is mapGroupsWithState supported in Python?

Just to cover all the bases: I have already tried dropDuplicates, but it
keeps the first record encountered for an id instead of updating the state
with later records:

from pyspark.sql.functions import get_json_object

# Pull the raw Kinesis payload out as a JSON string
unpackedDF = kinesisDF.selectExpr("cast(data as STRING) jsonData")

# Extract id and last_updated from the JSON header, keeping the raw payload
dataDF = unpackedDF.select(
    get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
    get_json_object(unpackedDF.jsonData, '$.header.last_updated').cast('timestamp').alias('last_updated'),
    unpackedDF.jsonData)

dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated', '24 hours')
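
If I read the docs correctly, the watermark has to come before
dropDuplicates for the 24-hour bound to actually limit the dedup state,
i.e.:

# Watermark applied upstream of dropDuplicates so the 24h bound limits its state
dedupDF = (dataDF
           .withWatermark('last_updated', '24 hours')
           .dropDuplicates(subset=['id']))

but even with that ordering, my understanding is that dropDuplicates would
still keep the first record seen per id, not the one with the latest
last_updated.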


So it is not working. Any help is appreciated.

-- 
Best Regards,
Ayan Guha
