Re: Efficiently processing sparse events in a time window

2020-09-29 Thread Steven Murdoch
Thanks David, this is very helpful. I'm glad it's not just that I had missed 
something obvious in the (generally very clear) documentation. I found 
various features that felt almost right (e.g. the priority queue behind 
Timers) but nothing that did the job. The temporal state idea does sound 
like a very handy feature to have.

On Thu, 24 Sep 2020, at 08:50, David Anderson wrote:
> Steven,
> 
> I'm pretty sure this is a scenario that doesn't have an obvious good 
> solution. As you have discovered, the window API isn't much help; using a 
> process function does make sense. The challenge is finding a data structure 
> to use in keyed state that can be efficiently accessed and updated.
> 
> One option would be to use MapState, where the keys are timestamps (longs) 
> and the values are lists of the events with those timestamps (or just a 
> count of those events, if that's sufficient). If you then use the RocksDB 
> state backend, you can leverage an implementation detail of that backend: 
> you can iterate over the entries in order, sorted by the serialized, binary 
> key, which for long keys does the right thing. Also, with the RocksDB state 
> backend, you only have to do ser/de to access and update individual 
> entries, not the entire map.
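> 
> In outline it might look something like this (a rough sketch only: the class, 
> field, window length, and threshold are placeholders, and it assumes event-time 
> timestamps are assigned and the RocksDB backend is in use):
> 
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> 
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> 
> // Sketch: counts events per key in a trailing event-time window and emits a
> // message once a threshold is reached. With the RocksDB state backend only
> // the entries that are touched get deserialized, not the whole map.
> public class SparseWindowCounter<E> extends KeyedProcessFunction<String, E, String> {
> 
>     private static final long WINDOW_MS = 60_000L; // placeholder window length
>     private static final int THRESHOLD = 5;        // placeholder trigger count
> 
>     private transient MapState<Long, Integer> countsByTimestamp;
> 
>     @Override
>     public void open(Configuration parameters) {
>         countsByTimestamp = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("countsByTimestamp", Long.class, Integer.class));
>     }
> 
>     @Override
>     public void processElement(E event, Context ctx, Collector<String> out) throws Exception {
>         long ts = ctx.timestamp();
>         Integer current = countsByTimestamp.get(ts);
>         countsByTimestamp.put(ts, current == null ? 1 : current + 1);
> 
>         // Drop per-timestamp entries that have fallen out of the window and
>         // total up the ones that remain.
>         long cutoff = ts - WINDOW_MS;
>         int total = 0;
>         List<Long> expired = new ArrayList<>();
>         for (Map.Entry<Long, Integer> entry : countsByTimestamp.entries()) {
>             if (entry.getKey() < cutoff) {
>                 expired.add(entry.getKey());
>             } else {
>                 total += entry.getValue();
>             }
>         }
>         for (Long key : expired) {
>             countsByTimestamp.remove(key);
>         }
> 
>         if (total >= THRESHOLD) {
>             out.collect("threshold of " + THRESHOLD + " events reached at " + ts);
>         }
>     }
> }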
> 
> It's not exactly pretty to rely on this, and some of us have been giving some 
> thought to adding a temporal state type to Flink that would make these 
> scenarios feasible to implement efficiently on all of the state backends, but 
> for now, this may be the best solution.
> 
> Regards,
> David
> 
> On Wed, Sep 23, 2020 at 12:42 PM Steven Murdoch wrote:
>> Hello,
>> 
>> I am trying to do something that seems like it should be quite simple, but 
>> I haven’t found an efficient way to do it with Flink, and I expect I’m 
>> missing something obvious here. 
>> 
>> The task is that I would like to process a sequence of events when a certain 
>> number appear within a keyed event-time window. There will be many keys but 
>> events within each keyed window will normally be quite sparse. 
>> 
>> My first guess was to use Flink’s sliding windowing functionality. However, 
>> my concern is that events are duplicated for each window. I would like to 
>> be precise about timing, so every event would be copied into hundreds of 
>> overlapping windows, most of which are then discarded because they contain 
>> too few events. 
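>> 
>> (For concreteness, the kind of setup I mean is sketched below; the ten-minute 
>> window, one-second slide, key selector, and CountAboveThreshold window 
>> function are all just illustrative names, and "events" stands for my keyed 
>> input stream. With these numbers every event lands in 600 windows.)
>> 
>> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> 
>> // Each event is assigned to windowSize / slide = 600 overlapping windows,
>> // so its contents are duplicated 600 times.
>> events
>>     .keyBy(e -> e.getKey())
>>     .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(1)))
>>     .process(new CountAboveThreshold());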
>> 
>> My next guess was to use a process function and maintain a queue of events 
>> as the state: when an event occurred I would add it to the queue and then 
>> remove any events that fell off the end of my window. I thought ListState 
>> would help here, but it does not appear to allow individual items to be 
>> removed.
>> 
>> I then thought about using a ValueState holding some queue data structure. 
>> However, my understanding is that changes to a ValueState result in the 
>> entire object being copied, so this would be quite inefficient and best 
>> avoided. 
>> 
>> Finally, I thought about just maintaining a series of timers: incrementing 
>> a counter on each event and decrementing it when that event expires. 
>> However, I then hit the problem of timer coalescing. If an event occurs at 
>> the same time as its predecessor, the second timer will not be registered, 
>> so the counter gets incremented but never decremented. 
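>> 
>> (Roughly the pattern below, sketched with placeholder names; the second 
>> registerEventTimeTimer() call for an identical expiry timestamp is coalesced 
>> into the first, so onTimer() only fires once.)
>> 
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.util.Collector;
>> 
>> // Sketch of the timer-per-event idea and where it breaks: two events with
>> // the same timestamp increment the counter twice, but their expiry timers
>> // coalesce, so the counter is only decremented once.
>> public class TimerPerEventCounter<E> extends KeyedProcessFunction<String, E, String> {
>> 
>>     private static final long WINDOW_MS = 60_000L; // placeholder window length
>> 
>>     private transient ValueState<Integer> count;
>> 
>>     @Override
>>     public void open(Configuration parameters) {
>>         count = getRuntimeContext().getState(
>>                 new ValueStateDescriptor<>("count", Integer.class));
>>     }
>> 
>>     @Override
>>     public void processElement(E event, Context ctx, Collector<String> out) throws Exception {
>>         Integer c = count.value();
>>         count.update(c == null ? 1 : c + 1);
>>         // If a timer already exists for this exact timestamp, this call is a no-op.
>>         ctx.timerService().registerEventTimeTimer(ctx.timestamp() + WINDOW_MS);
>>     }
>> 
>>     @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
>>         // Fires once per distinct expiry timestamp, not once per event.
>>         count.update(count.value() - 1);
>>     }
>> }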
>> 
>> What I’m doing seems like it would be a common task but none of the options 
>> look good, so I feel I’m missing something. Could anyone offer some advice 
>> on how to handle this case?
>> 
>> Thanks in advance. 
>> 
>> Best wishes,
>> Steven

