You are right that probably the best solution would be to be able to use
different state backends for different operators, I hope it gets
implemented at some point. Meanwhile I'll take a look at the methods in
org.apache.flink.runtime.state.KeyGroupRangeAssignment, maybe I can find a
workaround good enough for me.

Thanks,

Gerard

On Tue, Feb 20, 2018 at 3:56 PM, Stefan Richter <s.rich...@data-artisans.com
> wrote:

> Hi,
>
> ok, now I understand your goal a bit better. If would still like to point
> out that it may take a bit more than it looks like. Just to name one
> example, you probably also want to support asynchronous snapshots which is
> most likely difficult when using just a hashmap. I think the proper
> solution for you (and also something that we are considering to support in
> the future) is that different backends could be supported for different
> operators in a job. But that is currently not possible. I still want to
> answer your other question: you could currently compute all things about
> key-groups and their assignment to operators by using the methods
> from org.apache.flink.runtime.state.KeyGroupRangeAssignment.
>
> Best,
> Stefan
>
>
> Am 20.02.2018 um 14:52 schrieb Gerard Garcia <ger...@talaia.io>:
>
> Hi Stefan, thanks
>
> Yes, we are also using keyed state in other operators the problem is that
> serialization is quite expensive and in some of them we would prefer to
> avoid it by storing the state in memory (for our use case one specific
> operator with in memory state gives at least a 30% throughput improvement).
> When we are not operating in a keyed stream is easy, basically all the
> operators have the same in memory state, what we would like to do is the
> same but when we are operating in a keyed stream. Does it make more sense
> now?
>
> We are using rocksdb as state backend and as far as I know elements get
> always serialized when stored in the state and I'm not sure if there is
> even some disk access (maybe not synchronously) that could hurt performance.
>
> Gerard
>
> On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> from what I read, I get the impression that you attempt to implement you
>> own "keyed state" with a hashmap? Why not using the keyed state that is
>> already provided by Flink and gives you efficient rescaling etc. out of the
>> box? Please see [1] for the details.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>> dev/stream/state/state.html#using-managed-keyed-state
>>
>> Am 20.02.2018 um 13:44 schrieb gerardg <ger...@talaia.io>:
>>
>> Hello,
>>
>> To improve performance we have " keyed state" in the operator's memory,
>> basically we keep a Map which contains the state per each of the keys. The
>> problem comes when we want to restore the state after a failure or after
>> rescaling the operator. What we are doing is sending the concatenation of
>> all the state to every operator using an union redistribution and then we
>> restore the "in memory state" every time we see a new key. Then, after a
>> while, we just clear the redistributed state. This is somewhat complex and
>> prone to errors so we would like to find an alternative way of doing this.
>>
>> As far as I know Flink knows which keys belong to each operator
>> (distributing key groups) so I guess it would be possible to somehow
>> calculate the key id from each of the stored keys and restore the in
>> memory
>> state at once if we could access to the key groups mapping. Is that
>> possible? We could patch Flink if necessary to access that information.
>>
>> Thanks,
>>
>> Gerard
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>>
>>
>
>

Reply via email to