Hi, what about using "Top1 + Agg" or a "UDAF" for your scenario?
The main idea, I think, is that when an event changes from status A to status C, Flink needs to send a `DELETE` record downstream to retract the old row and then send the new one, and `TOP1` will keep only the newest row per `Id`. Please tell me if this plan works; a minimal Flink SQL sketch of the idea follows the quoted message below.

--
Best!
Xuyang

At 2022-06-14 01:55:14, "Dheeraj Taneja" <dheerajtaneja...@gmail.com> wrote:

Hello,

I have a stream of events coming over Kafka. Each event progresses through a series of statuses, and I want to display an aggregated count of how many events are currently in each status. If an event has progressed from status A to status C, it should be counted only for the last status it was in. Below are sample data and the expected output. What is the most effective way of doing this in Flink?

Sample Data
Event1: Event(Id1, statusA, 2022-06-09T16:15:08Z)
Event2: Event(Id2, statusA, 2022-06-09T16:20:08Z)
Event3: Event(Id1, statusB, 2022-06-09T16:25:08Z)
Event4: Event(Id1, statusC, 2022-06-09T16:26:08Z)
Event5: Event(Id3, statusC, 2022-06-09T16:30:08Z)

Outcome
Status A - 1 (Id2)
Status B - 0 (none)
Status C - 2 (Id1 & Id3)
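Here is a minimal sketch of the "Top1 + Agg" idea in Flink SQL, assuming a Kafka-backed table named `events` with columns `id`, `status`, and `event_time`. All table/column names, connector options, and the watermark interval are placeholders, not from the original thread; adjust them to your actual schema.

```sql
-- Hypothetical source table; adjust names, types, and connector options.
CREATE TABLE events (
  id         STRING,
  status     STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',                      -- placeholder topic name
  'properties.bootstrap.servers' = '...',  -- fill in your brokers
  'format' = 'json'                        -- assumed message format
);

-- Top1: ROW_NUMBER() partitioned by id and ordered by event_time DESC is
-- Flink's deduplication pattern; it keeps only the latest row per id and
-- emits a DELETE for the previous row when a newer event arrives.
-- Agg: the outer COUNT consumes that changelog, so an id moving from
-- statusA to statusC decrements statusA's count and increments statusC's.
SELECT status, COUNT(*) AS cnt
FROM (
  SELECT id, status,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY event_time DESC) AS row_num
  FROM events
)
WHERE row_num = 1
GROUP BY status;
```

With the sample data above, this should converge to statusA = 1 (Id2) and statusC = 2 (Id1, Id3). One caveat: a status that no longer has any events (statusB here) simply disappears from the grouped result rather than showing a 0 row; if an explicit zero is required, that would need extra handling, e.g. the UDAF route.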