Thanks David, this is very helpful. I'm glad that it's not just that I had
missed something obvious from the (generally very clear) documentation. I found
various features that felt almost right (e.g. the priority queue behind Timers)
but nothing that did the job. The temporal state idea does sound a very handy
feature to have.
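
A minimal sketch of the bookkeeping described in the quoted reply below, in
plain Java so that it runs without a Flink cluster. The TreeMap is only a
stand-in for a MapState<Long, Integer> on the RocksDB state backend (timestamp
to count, iterated in ascending key order); in a real job this logic would sit
inside a KeyedProcessFunction. The names SlidingEventCounter, windowMillis and
threshold are hypothetical, and the sketch assumes events for a key arrive in
timestamp order with non-negative timestamps.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Per-key count of events inside a sliding event-time window.
 * Assumption: the TreeMap models sorted MapState<Long, Integer>; it is not Flink API.
 */
final class SlidingEventCounter {
    private final long windowMillis; // length of the look-back window
    private final int threshold;     // number of events that counts as a match
    private final TreeMap<Long, Integer> countsByTimestamp = new TreeMap<>();
    private int total = 0;           // events currently inside the window

    SlidingEventCounter(long windowMillis, int threshold) {
        this.windowMillis = windowMillis;
        this.threshold = threshold;
    }

    /**
     * Records one event and reports whether at least `threshold` events lie in
     * the interval (timestamp - windowMillis, timestamp].
     */
    boolean add(long timestamp) {
        // Events sharing a timestamp share one entry, so none of them is lost.
        countsByTimestamp.merge(timestamp, 1, Integer::sum);
        total++;

        // Evict from the oldest end; sorted order means we can stop early.
        long cutoff = timestamp - windowMillis;
        Map.Entry<Long, Integer> oldest;
        while ((oldest = countsByTimestamp.firstEntry()) != null
                && oldest.getKey() <= cutoff) {
            total -= oldest.getValue();
            countsByTimestamp.pollFirstEntry();
        }
        return total >= threshold;
    }

    int eventsInWindow() {
        return total;
    }
}
```

Each call touches only the new entry and the expired ones, which is the
property that makes the per-entry ser/de of RocksDB attractive here. Keeping a
count per timestamp also sidesteps the timer-coalescing problem, since two
events with the same timestamp increment the same entry.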

On Thu, 24 Sep 2020, at 08:50, David Anderson wrote:
> Steven,
>
> I'm pretty sure this is a scenario that doesn't have an obvious good
> solution. As you have discovered, the window API isn't much help; using a
> process function does make sense. The challenge is finding a data structure
> to use in keyed state that can be efficiently accessed and updated.
>
> One option would be to use MapState, where the keys are timestamps (longs)
> and the values are lists of the events with the given timestamps (or just a
> count of those events, if that's sufficient). If you then use the RocksDB
> state backend, you can leverage an implementation detail of that backend:
> iterating over the map returns the entries sorted by the serialized, binary
> key, which for keys that are longs will do the right thing. Also, with the
> RocksDB state backend, you only have to do ser/de to access and update
> individual entries -- and not the entire map.
>
> It's not exactly pretty to rely on this, and some of us have been giving some
> thought to adding a temporal state type to Flink that would make these
> scenarios feasible to implement efficiently on all of the state backends, but
> for now, this may be the best solution.
>
> Regards,
> David
>
> On Wed, Sep 23, 2020 at 12:42 PM Steven Murdoch wrote:
>> Hello,
>>
>> I am trying to do something that seems like it should be quite simple but I
>> haven’t found an efficient way to do this with Flink and I expect I’m
>> missing something obvious here.
>>
>> The task is that I would like to process a sequence of events when a certain
>> number appear within a keyed event-time window. There will be many keys but
>> events within each keyed window will normally be quite sparse.
>>
>> My first guess was to use Flink’s sliding windowing functionality. However
>> my concern is that events are duplicated for each window. I would like to be
>> precise about timing, so every event would be copied into hundreds of
>> windows, most of which are then discarded because they contain too few
>> events.
>>
>> My next guess was to use a process function, and maintain a queue of events
>> as the state. When an event occurred I would add it to the queue and then
>> remove any events which fell off the end of my window. I thought ListState
>> would help here, but that appears to not allow items to be removed.
>>
>> I then thought about using a ValueState with some queue data structure.
>> However my understanding is that changes to a ValueState result in the
>> entire object being copied and so would be quite inefficient and best
>> avoided.
>>
>> Finally I thought about trying to just maintain a series of timers –
>> incrementing on an event and decrementing on its expiry. However I then hit
>> the problem of timer coalescing. If an event occurs at the same time as its
>> predecessor, the timer will not get set so the counter will get incremented
>> but never decremented.
>>
>> What I’m doing seems like it would be a common task but none of the options
>> look good, so I feel I’m missing something. Could anyone offer some advice
>> on how to handle this case?
>>
>> Thanks in advance.
>>
>> Best wishes,
>> Steven