Hi Yassine,

Have you thought about using a ListState? As far as I know, it at least preserves insertion order, so you could sort its contents once your trigger event has arrived. If you use RocksDB as the state backend, 100+ GB of state should not be a problem. Have you also considered Flink's CEP library? It might fit your needs without requiring a custom process function.
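
To illustrate the append-then-sort idea, here is a minimal plain-Java sketch. It does not use any Flink API: an ArrayList stands in for the ListState, and the class, method, and field names are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java stand-in for the idea; NOT Flink API. All names are hypothetical.
final class AppendThenSortBuffer {

    // Hypothetical event type: only the event-time timestamp matters here.
    static final class EventA {
        final long timestamp;
        final String id;

        EventA(long timestamp, String id) {
            this.timestamp = timestamp;
            this.id = id;
        }
    }

    // Stand-in for a ListState<EventA>: append-only, keeps insertion order.
    private final List<EventA> buffer = new ArrayList<>();

    // Cheap append on every incoming event of type A, no sorting here.
    void add(EventA e) {
        buffer.add(e);
    }

    // Called when the trigger fires: sort once, return every event strictly
    // older than the timer timestamp, and keep the rest buffered.
    List<EventA> drainOlderThan(long timerTimestamp) {
        buffer.sort(Comparator.comparingLong((EventA e) -> e.timestamp));
        List<EventA> fired = new ArrayList<>();
        List<EventA> kept = new ArrayList<>();
        for (EventA e : buffer) {
            if (e.timestamp < timerTimestamp) {
                fired.add(e);
            } else {
                kept.add(e);
            }
        }
        buffer.clear();
        buffer.addAll(kept);
        return fired;
    }

    int size() {
        return buffer.size();
    }
}
```

The point of this layout is that each incoming event costs only an append, and the sorting cost is paid once per trigger rather than on every insert.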

I hope that helps.

Timo


On 07/03/17 at 19:23, Yassine MARZOUGUI wrote:
Hi all,

I want to label events in a stream based on a condition on some future events. For example, my stream contains events of types A and B, and I would like to assign the label 1 to an event E of type A if an event of type B occurs within a duration x of E. I am using event time, and my events can arrive out of order. For this I'm using a ProcessFunction, which looks suitable for my use case. To handle out-of-order events, I keep events of type A in state, and once an event of type B is received, I register an event-time timer in which I loop through the events of type A in the state that have a timestamp < timer.timestamp, label them, and remove them from the state. Currently the state is simply a value state containing a TreeMap<Timestamp, EventA>. I keep the events sorted so that I can efficiently retrieve those older than the timer timestamp. I wonder whether that is the appropriate data structure to use in the value state to buffer events and handle out-of-orderness, or whether there is a more efficient implementation, especially since the state may sometimes grow to ~100 GB.
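
A minimal plain-Java sketch of the buffering described above, for illustration only. It leaves out the Flink state and timer calls entirely, and it is not the actual job code: the class, method, and field names are placeholders, timestamps are plain long values, and a list is kept per timestamp on the assumption that several events may share one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java illustration of a sorted buffer; NOT Flink API. Names are hypothetical.
final class SortedEventBuffer {

    // Hypothetical event of type A with a mutable label (0 = unlabeled).
    static final class EventA {
        final long timestamp;
        final String id;
        int label = 0;

        EventA(long timestamp, String id) {
            this.timestamp = timestamp;
            this.id = id;
        }
    }

    // Events keyed by event-time timestamp, kept sorted by the TreeMap.
    private final TreeMap<Long, List<EventA>> byTimestamp = new TreeMap<>();

    // Buffer an event of type A; out-of-order arrivals land in sorted position.
    void add(EventA e) {
        byTimestamp.computeIfAbsent(e.timestamp, k -> new ArrayList<>()).add(e);
    }

    // What the timer callback would do: label every buffered event with
    // timestamp < timerTimestamp and remove it from the buffer.
    List<EventA> labelAndRemoveOlderThan(long timerTimestamp) {
        // headMap returns a live view of the keys strictly below the bound.
        NavigableMap<Long, List<EventA>> older = byTimestamp.headMap(timerTimestamp, false);
        List<EventA> labeled = new ArrayList<>();
        for (List<EventA> events : older.values()) {
            for (EventA e : events) {
                e.label = 1;
                labeled.add(e);
            }
        }
        // Clearing the view removes those entries from the backing TreeMap.
        older.clear();
        return labeled;
    }

    // Number of events still buffered.
    int size() {
        int n = 0;
        for (List<EventA> events : byTimestamp.values()) {
            n += events.size();
        }
        return n;
    }
}
```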

Any insight is appreciated.

Thanks,
Yassine



