You are right that the best solution would probably be to use different state backends for different operators. I hope it gets implemented at some point. Meanwhile, I'll take a look at the methods in org.apache.flink.runtime.state.KeyGroupRangeAssignment; maybe I can find a workaround that is good enough for me.
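A minimal sketch of the arithmetic such a workaround would rely on. It assumes the formulas KeyGroupRangeAssignment uses, as far as I know them: the key group is a hash of `key.hashCode()` modulo the max parallelism, and the owning subtask is `keyGroup * parallelism / maxParallelism`. The `mix` function is a hypothetical stand-in for Flink's internal murmur hash, so the key groups it yields will not match a real job. In an actual operator you would call `KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)` and `computeKeyGroupRangeForOperatorIndex(maxParallelism, parallelism, operatorIndex)` instead.

```java
/**
 * Self-contained sketch of key-group arithmetic, modeled on Flink's
 * KeyGroupRangeAssignment. The hash mixing is an assumption (a stand-in for
 * Flink's murmur hash) and does NOT reproduce Flink's actual key groups.
 */
final class KeyGroupSketch {

    /** Hypothetical stand-in bit mixer; not Flink's exact hash. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Maps a key to a key group in [0, maxParallelism). */
    static int keyGroupForKey(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    /** Maps a key group to the index of the parallel subtask that owns it. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Inclusive range {start, end} of key groups owned by one subtask. */
    static int[] keyGroupRangeForOperator(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 3;
        // Print the contiguous key-group range each subtask owns.
        for (int i = 0; i < parallelism; i++) {
            int[] r = keyGroupRangeForOperator(maxParallelism, parallelism, i);
            System.out.println("subtask " + i + " owns key groups " + r[0] + ".." + r[1]);
        }
        // Trace one (hypothetical) key through key group to owning subtask.
        String key = "flow-42";
        int kg = keyGroupForKey(key, maxParallelism);
        System.out.println(key + " -> key group " + kg + " -> subtask "
                + operatorIndexForKeyGroup(maxParallelism, parallelism, kg));
    }
}
```

Because key groups, not individual keys, are the unit of redistribution, the two mappings are consistent by construction: every key group in a subtask's range maps back to that subtask, so after rescaling a subtask only needs its own range to decide which keys it owns.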
Thanks,
Gerard

On Tue, Feb 20, 2018 at 3:56 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi,
>
> OK, now I understand your goal a bit better. I would still like to point out that it may take a bit more work than it looks. To name just one example, you probably also want to support asynchronous snapshots, which is most likely difficult when using just a hashmap. I think the proper solution for you (and also something that we are considering supporting in the future) is that different backends could be used for different operators in a job, but that is currently not possible. I still want to answer your other question: you can currently compute everything about key groups and their assignment to operators by using the methods in org.apache.flink.runtime.state.KeyGroupRangeAssignment.
>
> Best,
> Stefan
>
> Am 20.02.2018 um 14:52 schrieb Gerard Garcia <ger...@talaia.io>:
>
> Hi Stefan, thanks
>
> Yes, we are also using keyed state in other operators. The problem is that serialization is quite expensive, and in some of them we would prefer to avoid it by storing the state in memory (for our use case, one specific operator with in-memory state gives at least a 30% throughput improvement). When we are not operating on a keyed stream it is easy: basically, all the operators have the same in-memory state. What we would like to do is the same, but when we are operating on a keyed stream. Does it make more sense now?
>
> We are using RocksDB as the state backend, and as far as I know elements always get serialized when stored in the state. I'm also not sure whether there is some disk access (maybe not synchronous) that could hurt performance.
>
> Gerard
>
> On Tue, Feb 20, 2018 at 2:42 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> from what I read, I get the impression that you are attempting to implement your own "keyed state" with a hashmap? Why not use the keyed state that is already provided by Flink, which gives you efficient rescaling etc. out of the box? Please see [1] for the details.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state
>>
>> Am 20.02.2018 um 13:44 schrieb gerardg <ger...@talaia.io>:
>>
>> Hello,
>>
>> To improve performance, we keep "keyed state" in the operator's memory: basically, we keep a Map that contains the state for each key. The problem comes when we want to restore the state after a failure or after rescaling the operator. What we are doing is sending the concatenation of all the state to every operator using a union redistribution, and then we restore the "in-memory state" every time we see a new key. After a while, we just clear the redistributed state. This is somewhat complex and error-prone, so we would like to find an alternative way of doing this.
>>
>> As far as I know, Flink knows which keys belong to each operator (by distributing key groups), so I guess it would be possible to somehow calculate the key group of each of the stored keys and restore the in-memory state at once if we could access the key-group mapping. Is that possible? We could patch Flink if necessary to access that information.
>>
>> Thanks,
>>
>> Gerard
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
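Applied to the restore problem in the original question, the idea could look like the following sketch: every subtask receives the full union list state (for example via `getUnionListState` in `CheckpointedFunction#initializeState`) and keeps only the entries whose key group falls inside its own range, rebuilding the in-memory map in one pass instead of lazily per key. `restoreOwnedEntries` and the toy `k -> k % 8` key-group function are hypothetical; in a real job the key-group function would be `KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)`, and the range would be derived from the subtask index, parallelism and max parallelism available from the `RuntimeContext`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

/** Sketch: rebuild a subtask's in-memory map from union-redistributed state. */
final class InMemoryStateRestore {

    /**
     * Keeps only the entries whose key group lies in [startKeyGroup, endKeyGroup].
     * keyGroupOf is injected so the sketch stays independent of Flink classes.
     */
    static <K, V> Map<K, V> restoreOwnedEntries(
            Iterable<Map.Entry<K, V>> unionEntries,
            int startKeyGroup,
            int endKeyGroup,
            ToIntFunction<K> keyGroupOf) {
        Map<K, V> local = new HashMap<>();
        for (Map.Entry<K, V> e : unionEntries) {
            int kg = keyGroupOf.applyAsInt(e.getKey());
            if (kg >= startKeyGroup && kg <= endKeyGroup) {
                local.put(e.getKey(), e.getValue());
            }
        }
        return local;
    }

    public static void main(String[] args) {
        // Toy union state, as every subtask would see it after a restore.
        List<Map.Entry<Integer, String>> union =
                List.of(Map.entry(1, "a"), Map.entry(5, "b"), Map.entry(6, "c"));
        // Hypothetical key-group function and range (key groups 4..7 of 8).
        Map<Integer, String> local = restoreOwnedEntries(union, 4, 7, k -> k % 8);
        System.out.println(local);
    }
}
```

Once the map is rebuilt, the subtask can drop its copy of the union state right away rather than clearing it "after a while". As Stefan points out, this still leaves checkpointing the map (including asynchronous snapshots) to the operator itself.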