Hi Xiaogang,

Indeed, MapState is what I was looking for to get efficient sorted state, as it would facilitate many use cases like this one, or joining streams, etc. I searched a bit and found your MapState contribution (<https://github.com/apache/flink/pull/3336>) for the upcoming 1.3 release; I'll see how it works for me. Thank you for pointing this out, very helpful!
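Why iterating a MapState with Long keys in the RocksDB backend yields timestamp order can be checked with a small standalone Java program. This is a sketch under two assumptions: that a Long map key is serialized as 8 big-endian bytes (the layout java.io.DataOutput.writeLong produces), and that RocksDB compares keys bytewise, as its default comparator does. Given Xiaogang's description that all entries of one MapState share the same keyGroup/key/namespace prefix, the order is then decided by the mapKey bytes alone. ByteOrderDemo and sortByEncodedBytes are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ByteOrderDemo {

    // Encode a long as 8 big-endian bytes (ByteBuffer's default byte order),
    // the same byte layout java.io.DataOutput.writeLong produces.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Sort values by the unsigned lexicographic order of their encodings,
    // i.e. the order a bytewise key comparator would iterate them in.
    static List<Long> sortByEncodedBytes(List<Long> values) {
        List<Long> sorted = new ArrayList<>(values);
        sorted.sort((a, b) -> Arrays.compareUnsigned(encode(a), encode(b)));
        return sorted;
    }

    public static void main(String[] args) {
        // Non-negative timestamps: byte order matches numeric order.
        System.out.println(sortByEncodedBytes(List.of(1489686600000L, 5L, 1489000000000L, 0L)));
        // prints [0, 5, 1489000000000, 1489686600000]

        // Negative values break the trick: the sign bit makes them sort last.
        System.out.println(sortByEncodedBytes(List.of(-1L, 3L, -100L)));
        // prints [3, -100, -1]
    }
}
```

The second call shows the caveat of this workaround: negative longs have their top bit set, so they sort after every non-negative value. Event-time timestamps in milliseconds since the epoch are non-negative in practice, which is why the trick holds for this use case. It requires Java 9 or later for List.of and Arrays.compareUnsigned.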
Best,
Yassine

2017-03-16 18:50 GMT+01:00 SHI Xiaogang <shixiaoga...@gmail.com>:

> Hi Yassine,
>
> If I understand correctly, you need sorted state, which unfortunately is
> not supported in Flink yet.
> We have some ideas for providing such sorted state to facilitate the
> development of user applications, but they are still under discussion due
> to concerns about backward compatibility.
>
> Currently, I think we can work around the problem with MapState in the
> RocksDB state backend.
> In the RocksDB state backend, each entry in a MapState corresponds to an
> entry in RocksDB. The key of a RocksDB entry is formatted as
> "keyGroup#key#keyLen#namespace#namespaceLen#mapKey".
>
> The entries in RocksDB are sorted in lexicographical order. In cases where
> the map keys are of type Timestamp/Long, the entries in the MapState will
> be iterated in the same order as in a sorted map. Thus, you can find all
> the events whose timestamps are smaller than a given one.
>
> This solution is quite tricky because it does not work when the heap
> state backend is used. But given that the state may grow up to ~100GB, the
> RocksDB state backend is strongly recommended.
>
> I hope this information helps.
>
> Regards,
> Xiaogang
>
> 2017-03-09 23:19 GMT+08:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi Timo,
>>
>> I thought about ListState but quickly discarded it, as it keeps the
>> insertion order and not the event order. On second thought, I will
>> reconsider it, since my events are only occasionally out of order. I
>> didn't know that the Flink CEP operators 'next' and 'within' can handle
>> event time, so I think I will give it a try! Thank you!
>>
>> Best,
>> Yassine
>>
>> 2017-03-08 9:55 GMT+01:00 Timo Walther <twal...@apache.org>:
>>
>>> Hi Yassine,
>>>
>>> have you thought about using a ListState? As far as I know, it at least
>>> keeps the insertion order. You could sort it once your trigger event has
>>> arrived.
>>>
>>> If you use RocksDB as the state backend, 100+ GB of state should not be a
>>> problem. Have you thought about using Flink's CEP library? It might fit
>>> your needs without implementing a custom process function.
>>>
>>> I hope that helps.
>>>
>>> Timo
>>>
>>>
>>> On 07/03/17 at 19:23, Yassine MARZOUGUI wrote:
>>>
>>>> Hi all,
>>>>
>>>> I want to label events in a stream based on a condition on some future
>>>> events.
>>>> For example, my stream contains events of type A and B, and I would
>>>> like to assign label 1 to an event E of type A if an event of type B
>>>> happens within a duration x of E. I am using event time, and my events
>>>> can be out of order.
>>>> For this I'm using a ProcessFunction, which looks suitable for my use
>>>> case. In order to handle out-of-order events, I'm keeping events of type
>>>> A in state, and once an event of type B is received, I register an
>>>> event-time timer in which I loop through the events of type A in the
>>>> state with timestamps < timer.timestamp, label them, and remove them
>>>> from the state.
>>>> Currently the state is simply a ValueState containing a
>>>> TreeMap<Timestamp, EventA>. I'm keeping events sorted in order to
>>>> efficiently get the events older than the timer timestamp.
>>>> I wonder if that's the appropriate data structure to use in the value
>>>> state to buffer events and handle out-of-orderness, or if there is a
>>>> more efficient implementation, especially since the state may sometimes
>>>> grow to ~100 GB.
>>>>
>>>> Any insight is appreciated.
>>>>
>>>> Thanks,
>>>> Yassine
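The TreeMap buffering from the original question can be modeled in plain Java, without Flink, to show the drain-on-timer step. This is a minimal sketch under stated assumptions, not Flink API: LabelBuffer, bufferA, onTimer and pendingCount are hypothetical names, onTimer stands in for ProcessFunction#onTimer with the timer set at the B event's timestamp, timers are assumed to fire in timestamp order with no late data, and the 0 label for A events older than x is an assumption (the question only defines label 1). It also keeps a list per timestamp, because a plain TreeMap<Timestamp, EventA> silently overwrites two A events that share a timestamp. Unlike the question's strict "<", it drains ties too, so an A event simultaneous with B is labeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical plain-Java model; in Flink the map would live in keyed state.
class LabelBuffer {
    private final long maxGap; // the duration x from the question
    // Buffered A events keyed by timestamp; one list per timestamp so that
    // A events with equal timestamps do not overwrite each other.
    private final TreeMap<Long, List<String>> pendingA = new TreeMap<>();

    LabelBuffer(long maxGap) {
        this.maxGap = maxGap;
    }

    void bufferA(String eventId, long timestamp) {
        pendingA.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(eventId);
    }

    // Models the timer for a B event at timerTs: drains every A event with
    // timestamp <= timerTs, labels it 1 if timerTs - timestamp <= maxGap and
    // 0 otherwise, and removes it from the buffer.
    Map<String, Integer> onTimer(long timerTs) {
        Map<String, Integer> labels = new LinkedHashMap<>();
        NavigableMap<Long, List<String>> due = pendingA.headMap(timerTs, true);
        for (Map.Entry<Long, List<String>> entry : due.entrySet()) {
            int label = timerTs - entry.getKey() <= maxGap ? 1 : 0;
            for (String id : entry.getValue()) {
                labels.put(id, label);
            }
        }
        due.clear(); // headMap is a view, so this also removes them from pendingA
        return labels;
    }

    int pendingCount() {
        return pendingA.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        LabelBuffer buffer = new LabelBuffer(10);
        buffer.bufferA("a1", 100);
        buffer.bufferA("a2", 95);
        buffer.bufferA("a3", 100); // same timestamp as a1, kept separately
        buffer.bufferA("a4", 120); // later than the B event, stays buffered
        buffer.bufferA("a0", 80);  // more than x before the B event
        System.out.println(buffer.onTimer(105)); // prints {a0=0, a2=1, a1=1, a3=1}
        System.out.println(buffer.pendingCount()); // prints 1
    }
}
```

A events that never see a later B event stay buffered forever in this sketch; a real job would need an additional cleanup timer for them. With a ValueState the whole TreeMap is deserialized and rewritten on every access, which is the cost the MapState-on-RocksDB workaround discussed in this thread avoids.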