Steven,

I'm pretty sure this is a scenario that doesn't have an obvious good
solution. As you have discovered, the window API isn't much help; using a
process function does make sense. The challenge is finding a data structure
to use in keyed state that can be efficiently accessed and updated.
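For a sense of scale on the window-duplication problem: a sliding event-time window assigner puts each event into every window that covers its timestamp, which is roughly size/slide windows. The sketch below imitates that assignment loop in plain Java, assuming a zero offset and non-negative timestamps. `SlidingWindowFanOut` and `windowStarts` are hypothetical names, and this is an illustration rather than Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowFanOut {

    // Returns the start times of every sliding window [start, start + size)
    // that contains the given timestamp. Assumes a non-negative timestamp,
    // a zero offset, and size, slide > 0 (simplifications for this sketch).
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60 * 60 * 1000L; // hypothetical 1-hour window, in ms
        long slide = 10 * 1000L;     // hypothetical 10-second slide, in ms
        int copies = windowStarts(1_600_000_000_000L, size, slide).size();
        System.out.println(copies + " windows per event"); // prints "360 windows per event"
    }
}
```

With a 1 ms slide, the fan-out equals the window size in milliseconds, which is why wanting precise timing makes the sliding-window approach so costly.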

One option would be to use MapState, where the keys are timestamps (longs)
and the values are lists of the events with the given timestamps (or just a
count of those events, if that's sufficient). If you then use the RocksDB
state backend, you can leverage an implementation detail of that backend:
iterating over a MapState returns the entries in order, sorted by the
serialized (binary) key. For non-negative long keys such as timestamps, that
byte order matches numeric order. Also, with the RocksDB state backend,
serialization and deserialization happen only for the individual entries
you access or update, not for the entire map.
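A minimal sketch of that bookkeeping, with a `java.util.TreeMap` standing in for a per-key `MapState<Long, Long>` (timestamp to event count) and the sorted iteration RocksDB provides. `TimestampCountWindow` and `add` are hypothetical names, and the sketch assumes events arrive in non-decreasing timestamp order; out-of-order events would need eviction driven by the watermark or timers instead. The running total is kept in a plain field here; in a Flink job it would presumably live in a separate `ValueState<Long>`, since `MapState` does not expose a size.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

final class TimestampCountWindow {
    private final long windowSize;
    // Stands in for MapState<Long, Long>: event timestamp -> number of events at that timestamp.
    private final TreeMap<Long, Long> countsByTimestamp = new TreeMap<>();
    // Stands in for a separate ValueState<Long> holding the total count in the window.
    private long total = 0;

    TimestampCountWindow(long windowSize) {
        this.windowSize = windowSize;
    }

    // Records one event and returns how many events fall in (timestamp - windowSize, timestamp].
    // Assumes timestamps arrive in non-decreasing order (a simplification for this sketch).
    long add(long timestamp) {
        // Events with the same timestamp share one entry; the count is simply bumped.
        countsByTimestamp.merge(timestamp, 1L, Long::sum);
        total++;

        // Walk entries in ascending timestamp order and drop those that have left the window.
        Iterator<Map.Entry<Long, Long>> it = countsByTimestamp.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> oldest = it.next();
            if (oldest.getKey() > timestamp - windowSize) {
                break; // all remaining entries are newer, so they are still in the window
            }
            total -= oldest.getValue();
            it.remove();
        }
        return total;
    }
}
```

A threshold check would then be something like `if (window.add(ts) >= threshold)`. Storing a count per timestamp also means two events with the same timestamp share one entry rather than relying on separate timers.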

It's not exactly pretty to rely on this, and some of us have been giving
some thought to adding a temporal state type to Flink that would make these
scenarios feasible to implement efficiently on all of the state backends,
but for now, this may be the best solution.
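Because this relies on RocksDB comparing serialized keys byte by byte (unsigned, lexicographic), it helps to see what that implies for long keys. The sketch below assumes an 8-byte big-endian encoding (the default for `java.nio.ByteBuffer`, and to my understanding what Flink's `LongSerializer` writes) and compares encodings with `Arrays.compareUnsigned` (Java 9+). `LongKeyOrdering` is a hypothetical name. Non-negative timestamps keep their numeric order; a negative value would sort after every positive one, so the trick is only safe for non-negative keys.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class LongKeyOrdering {

    // 8-byte big-endian encoding; ByteBuffer is big-endian by default.
    static byte[] encode(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Compares two keys the way a bytewise (unsigned, lexicographic) comparator would.
    static int compareEncoded(long a, long b) {
        return Arrays.compareUnsigned(encode(a), encode(b));
    }

    public static void main(String[] args) {
        System.out.println(compareEncoded(1_000L, 1_600_000_000_000L) < 0); // prints "true": numeric order kept
        System.out.println(compareEncoded(-1L, 1L) > 0);                    // prints "true": -1 sorts after 1
    }
}
```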

Regards,
David

On Wed, Sep 23, 2020 at 12:42 PM Steven Murdoch <ste...@lists.murdoch.is>
wrote:

> Hello,
>
> I am trying to do something that seems like it should be quite simple but
> I haven’t found an efficient way to do this with Flink and I expect I’m
> missing something obvious here.
>
> The task is that I would like to process a sequence of events when a
> certain number appear within a keyed event-time window. There will be many
> keys but events within each keyed window will normally be quite sparse.
>
> My first guess was to use Flink’s sliding windowing functionality. However,
> my concern is that events are duplicated for each window. I would like to
> be precise about timing, so every event would trigger hundreds of copies of
> the event in hundreds of windows, most of which are then discarded because
> there are insufficient events.
>
> My next guess was to use a process function and maintain a queue of
> events as the state. When an event occurred, I would add it to the queue and
> then remove any events which fell off the end of my window. I thought
> ListState would help here, but it does not appear to allow individual items
> to be removed.
>
> I then thought about using a ValueState with some queue data structure.
> However my understanding is that changes to a ValueState result in the
> entire object being copied and so would be quite inefficient and best
> avoided.
>
> Finally, I thought about trying to just maintain a series of timers –
> incrementing on an event and decrementing on its expiry. However, I then hit
> the problem of timer coalescing. If an event occurs at the same time as its
> predecessor, no new timer gets set, so the counter will get incremented
> but never decremented.
>
> What I’m doing seems like it would be a common task but none of the
> options look good, so I feel I’m missing something. Could anyone offer some
> advice on how to handle this case?
>
> Thanks in advance.
>
> Best wishes,
> Steven
>
