I think the biggest challenge with using an external store is being able to (periodically) scan through keys, even if their corresponding values are not updated, and check whether their timers should be fired/GCed.
The implementation I'm working on in the Spark runner does this by using Spark's *updateStateByKey*, which scans through all keys each microbatch. This is far from optimal, but much like with an external store, Spark's execution model works in microbatches, where each microbatch is processed in a new execution context (so you can't, for instance, hold a heap of timers and periodically check them, and stateful operators are all keyed).

I am actually considering an external-store solution for internal use at PayPal for cases such as Jingsong mentioned. One way I thought of doing this is to use a store that is either ordered or fast at ordered queries (top-N), indexed by the timer while pointing to the key, and to periodically query the top candidates for firing: if a key should fire, you retrieve it from the actual data store and fire/GC it. Of course, if a key is updated you can update its timers while you're at it. HBase provides lexical ordering on keys and could be a good fit for this (with some salting and manipulation of the keys), or even MySQL (if your key cardinality is not too big).

I wonder if we want to abstract such a thing as part of the Runner API, since it sounds like it might be relevant to most runners.

+Kenneth Knowles <[email protected]> WDYT?

On Wed, Jan 25, 2017 at 12:38 PM Aljoscha Krettek <[email protected]> wrote:

> Hi Jingsong,
> you're right, it is indeed somewhat tricky to find a good data structure
> for out-of-core timers. That's why we have them in memory in Flink for now,
> and that's also why I'm afraid I don't have any good advice for you right
> now. We're aware of the problem in Flink but we're not yet working on a
> concrete solution.
>
> Cheers,
> Aljoscha
>
> On Tue, 24 Jan 2017 at 21:42 Dan Halperin <[email protected]> wrote:
>
> > Hi Jingsong,
> >
> > Sorry for the delayed response; this email ended up being misclassified
> > by my mail server and I missed it.
> > Maybe Kenn or Aljoscha has suggestions on how runners can best
> > implement timers?
> >
> > Dan
> >
> > On Thu, Jan 19, 2017 at 9:55 PM, lzljs3620320 <[email protected]> wrote:
> >
> > > Hi there,
> > > I'm working on the Beam integration for an internal system at Alibaba.
> > > Most of the runners currently keep timers in memory, such as Flink,
> > > Apex, etc. (I do not know the implementation of Google Dataflow). But
> > > in our scenario, unbounded data has a large number of keys, which will
> > > lead to OOM (timers in memory). So we want to store timers in state
> > > (RocksDB on disk). The problem is how to extract fired event-time
> > > timers when the input watermark is refreshed. Do we have to scan all
> > > keys and timers (a timer is currently composed of key, id, namespace,
> > > timestamp, domain)? Is there a better implementation? I'm wondering if
> > > you could give me some advice on how to implement timers in state
> > > efficiently. Thank you!
> > >
> > > Best,
> > > Jingsong Lee
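For illustration, the ordered timer index described above can be sketched in memory with a sorted map. This is only a minimal sketch: a real runner would back the sorted index with an ordered external store (e.g. HBase row keys or a RocksDB column family keyed by timestamp+key), and all class and method names here are hypothetical, not part of any Beam API.

```java
import java.util.*;

// Hypothetical sketch of a timer index that is ordered by timestamp,
// so advancing the watermark only touches the timers that are due,
// instead of scanning every key.
class TimerIndex {
  // Sorted index: timestamp -> keys whose timers fire at that time.
  private final NavigableMap<Long, Set<String>> byTimestamp = new TreeMap<>();
  // Reverse index so a key's timer can be updated in place.
  private final Map<String, Long> byKey = new HashMap<>();

  public void setTimer(String key, long timestamp) {
    // If the key already had a timer, drop the old index entry first.
    Long old = byKey.put(key, timestamp);
    if (old != null) {
      Set<String> keys = byTimestamp.get(old);
      keys.remove(key);
      if (keys.isEmpty()) {
        byTimestamp.remove(old);
      }
    }
    byTimestamp.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
  }

  // Called when the input watermark advances: pop only the timers at or
  // before the watermark, in timestamp order.
  public List<String> extractFiredKeys(long watermark) {
    List<String> fired = new ArrayList<>();
    Iterator<Map.Entry<Long, Set<String>>> it =
        byTimestamp.headMap(watermark, true).entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Set<String>> entry = it.next();
      fired.addAll(entry.getValue());
      for (String key : entry.getValue()) {
        byKey.remove(key);
      }
      it.remove(); // removes from the backing map via the headMap view
    }
    return fired;
  }
}
```

The same shape maps onto an ordered store: the (timestamp, key) pair becomes the row key, so "extract fired timers" becomes a range scan from the smallest row key up to the watermark.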
