Hi Ken,

So you have a queue where elements are sorted by timestamp and score, and when 
the time (event time, I suppose) passes the timestamp of an element, you want 
to fetch the element and:
 if the score is too low, you archive it;
 if the score is OK, you emit it.
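
To make sure we are talking about the same thing, here is a rough sketch of that logic in plain Java. It does not use any Flink API, and every name in it (ScoredQueue, Entry, minScore, advanceTo) is made up for illustration; in particular, the ordering "earliest timestamp first, then highest score" and the fixed score cut-off are assumptions about what you want.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: none of these names come from Flink or from Ken's code.
class ScoredQueue {

    /** An entry carrying the two attributes discussed in this thread. */
    static final class Entry {
        final String id;
        final long timestamp;   // earliest time at which the entry may be processed
        final double score;

        Entry(String id, long timestamp, double score) {
            this.id = id;
            this.timestamp = timestamp;
            this.score = score;
        }
    }

    // Earliest timestamp first; for equal timestamps, highest score first.
    private static final Comparator<Entry> ORDER = (a, b) -> {
        int byTime = Long.compare(a.timestamp, b.timestamp);
        return byTime != 0 ? byTime : Double.compare(b.score, a.score);
    };

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(ORDER);
    private final double minScore;   // assumed cut-off below which a score is "too low"
    final List<Entry> emitted = new ArrayList<>();
    final List<Entry> archived = new ArrayList<>();

    ScoredQueue(double minScore) {
        this.minScore = minScore;
    }

    void add(Entry e) {
        queue.add(e);
    }

    /** Fetches every entry whose timestamp the given time has passed. */
    void advanceTo(long now) {
        while (!queue.isEmpty() && queue.peek().timestamp < now) {
            Entry e = queue.poll();
            if (e.score < minScore) {
                archived.add(e);   // score too low: archive
            } else {
                emitted.add(e);    // score OK: emit
            }
        }
    }

    int pending() {
        return queue.size();
    }
}
```

For example, with a cut-off of 0.5, adding entries with (timestamp, score) of (10, 0.8), (10, 0.1) and (30, 0.9) and then calling advanceTo(20) emits the first, archives the second, and leaves the third in the queue.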

If I understand correctly: if your stream is keyed, you have a queue and an 
“archive” state per key; 
if not, you have a global queue for all elements, which can be seen as a keyed 
stream on a dummy key, right?
By the way, timers in Flink have to be associated with a key, so I suppose that 
if you are using timers you are in the first case (keyed stream).

In this case, why do you need access to the state of all the keys?

Also it may be worth having a look at the CEP operator in the Flink codebase.
There you also have a queue per key, where events are sorted by timestamp, and 
at each watermark, 
elements with timestamps smaller than the watermark are processed.
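
The per-key buffering described above could be sketched roughly as follows. This is plain Java written for illustration, not the CEP operator's actual code; KeyedEventBuffer, Event, and onWatermark are hypothetical names, and the strict "smaller than the watermark" comparison simply mirrors the wording in this message.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch in plain Java; this is not the CEP operator's code.
class KeyedEventBuffer {

    static final class Event {
        final String key;
        final long timestamp;
        final String payload;

        Event(String key, long timestamp, String payload) {
            this.key = key;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private static final Comparator<Event> BY_TIME =
            (a, b) -> Long.compare(a.timestamp, b.timestamp);

    // One queue per key, each ordered by event timestamp.
    private final Map<String, PriorityQueue<Event>> queues = new HashMap<>();

    void add(Event e) {
        queues.computeIfAbsent(e.key, k -> new PriorityQueue<>(BY_TIME)).add(e);
    }

    /** Returns, per key, the buffered events with timestamps smaller than the watermark. */
    Map<String, List<Event>> onWatermark(long watermark) {
        Map<String, List<Event>> ready = new HashMap<>();
        for (Map.Entry<String, PriorityQueue<Event>> entry : queues.entrySet()) {
            PriorityQueue<Event> queue = entry.getValue();
            while (!queue.isEmpty() && queue.peek().timestamp < watermark) {
                // Events leave the queue in timestamp order for this key.
                ready.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                     .add(queue.poll());
            }
        }
        return ready;
    }
}
```

Each key is handled independently here, which is the point of the question above: draining one key's queue never needs to look at the state of another key.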

Hope this helps,
Kostas

> On Apr 28, 2017, at 4:08 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Kostas,
> 
> Thanks for responding. Details in-line below.
> 
>> On Apr 27, 2017, at 1:19am, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> 
>> Hi Ken,
>> 
>> Unfortunately, iterating over all keys is not currently supported.
>> 
>> Do you have your own custom operator (because you mention “from within the 
>> operator…”) or
>> you have a process function (because you mention the “onTimer” method)?
> 
> Currently it’s a process function, but I might be able to just use a regular 
> operator.
> 
>> Also, could you describe your use case a bit more?  You have a periodic 
>> timer per key and when
>> a timer for a given key fires you want to have access to the state of all 
>> the keys?
> 
> The timer bit is because I’m filling an async queue, and thus need to trigger 
> emitting tuples to the operator’s output stream independent of inbound tuples.
> The main problems I’m trying to solve (without requiring a separate scalable 
> DB infrastructure) are:
> 
>  - entries have an associated “earliest processing time”. I don’t want to 
> send these through the system until that time trigger has passed.
>  - entries have an associated “score”. I want to favor processing high 
> scoring entries over low scoring entries.
>  - if an entry’s score is too low, I want to archive it, versus constantly 
> re-evaluate it using the above two factors.
> 
> I’ve got my own custom DB that is working for the above, and scales to target 
> sizes of 1B+ entries per server by using a mixture of RAM and disk.
> But having to checkpoint it isn’t trivial.
> 
> So I thought that if there was a way to (occasionally) iterate over the keys 
> in the state backend, I could get what I needed with the minimum effort.
> 
> But sounds like that’s not possible currently.
> 
> Thanks,
> 
> — Ken
> 
> 
> 
>>> On Apr 27, 2017, at 3:02 AM, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>> 
>>> Is there a way to iterate over all of the key/value entries in the state 
>>> backend, from within the operator that’s making use of the same?
>>> 
>>> E.g. I’ve got a ReducingState, and on a timed interval (inside of the 
>>> onTimer method) I need to iterate over all KV state and emit the N “best” 
>>> entries.
>>> 
>>> What’s the recommended approach?
>>> 
>>> Thanks,
>>> 
>>> — Ken
>>> 
>> 
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
