Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/747#issuecomment-113881056

I like this very much. Kudos, Gyula and Paris, this is good work. This text should definitely go into the Wiki, for reference.

I have a few thoughts and ideas on how we may be able to push this even one step further. Let's figure out how many of those actually mean changes, and which ones can simply be added after this is merged.

### Current State Interface

I would like to keep the current interface in parallel to the new one. People seem to like it quite a bit (judging from the demos in the past days), and I think the additional code paths to maintain for it are not too many. The current interface also comes in handy for states that cannot be maintained per key, for example the probabilistic counters and aggregates that explicitly do not maintain an entry per key, but only bitmaps across all keys. I think it would be an acceptable initial restriction that those are non-re-partitionable.

### State Backing / Checkpointing & Restoring

This design assumes that the state is held in the Flink operators and is checkpointed to outside storage via the StateCheckpointer. How about we generalize this a bit, to make it pluggable where the state is actually stored? From recent feedback, I heard that people find it useful to store the state in an external key/value store (HBase, Cassandra, ...) and have Flink's checkpointing algorithm coordinate the group commits. This would mean:

- Having the state in Flink is one implementation of the state store. The current state backend becomes something that is used when the state is held in Flink.
- Having the state in a key/value store would mean that we group-commit changes into that key/value store (upon snapshots) and that we keep inside Flink only caches of recent keys (for faster read access).
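The pluggable-store idea could look roughly like the sketch below. All names here (`KeyedStateStore`, `InMemoryStateStore`, `snapshot`) are hypothetical, chosen for illustration; this is not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a pluggable per-key state store.
// A key/value-store-backed implementation would turn snapshot()
// into a group commit and keep only a cache of recent keys locally.
interface KeyedStateStore<K, V> {
    V get(K key);
    void put(K key, V value);
    // Called on a checkpoint barrier: make pending changes durable atomically.
    void snapshot(long checkpointId);
}

// "State in Flink" variant: everything lives in operator memory.
class InMemoryStateStore<K, V> implements KeyedStateStore<K, V> {
    private final Map<K, V> state = new HashMap<>();

    @Override
    public V get(K key) { return state.get(key); }

    @Override
    public void put(K key, V value) { state.put(key, value); }

    @Override
    public void snapshot(long checkpointId) {
        // Here the map contents would be written to checkpoint storage.
    }
}
```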
  That way, we solve out-of-core state, and this also subsumes lazy state fetching, as an operator would start with an empty local cache and pull key state as needed. No special code required.
- In the long run, we will probably want to have a lightweight key/value store that we start with the system in streaming mode.

### API integration

My feeling here is that we should not have special constructs like "setStatePartitioner()", but introduce something like a "KeyedDataStream", with key-state operators working only on that type of data stream. GroupBy data streams are a special (stronger) case of the KeyedDataStream, so groupBy/window and mapWindow, or reduce, always have the ability to hold key state. For map/flatMap, one would need to write `stream.keyBy("userid").map(...)`.

### Asynchronous state checkpoints

This interface is a good basis for asynchronous checkpoints, because we can do a copy-on-write per key if it is changed while the checkpoint writing is still in progress. Since we (possibly) have multiple concurrent checkpoints, we need to design this carefully, so that we can have a sequence of deltas against other deltas, not only against the full state. This is only relevant when the state is in Flink. When the state is backed externally (key/value store), the checkpoint conceptually only group-commits and starts a new bulk transaction, which does not involve any data movement.

### Incremental state checkpoints

Again, this is only relevant when the state is in Flink. Here, this interface is also a good basis, since we can persist the state of only the keys that changed. We cannot go incremental within a single key's state that way; it remains to be seen whether that would be relevant. In the case of long windows, we certainly want incremental snapshots of the individual window buffers, but it is unclear whether we need to expose such a thing to the user.
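The per-key copy-on-write idea can be sketched in a few lines. This is illustrative only (`CowKeyedState` and its methods are made-up names), and a real implementation would additionally have to handle multiple concurrent snapshots with deltas against deltas:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative copy-on-write sketch (hypothetical names, not Flink code):
// while an asynchronous snapshot is being written, the first update to a
// key preserves that key's old value for the snapshot writer, so normal
// processing continues without blocking on the checkpoint.
class CowKeyedState<K, V> {
    private final Map<K, V> live = new HashMap<>();
    private Map<K, V> preserved = null; // old values of keys changed mid-snapshot

    void startSnapshot() {
        preserved = new HashMap<>();
    }

    V get(K key) {
        return live.get(key);
    }

    void put(K key, V value) {
        if (preserved != null && !preserved.containsKey(key)) {
            // First write to this key since the snapshot started:
            // copy the old value aside for the checkpoint writer.
            preserved.put(key, live.get(key));
        }
        live.put(key, value);
    }

    // Value of a key as of the snapshot point: the preserved copy if the
    // key has changed since, otherwise the current live value.
    V snapshotValue(K key) {
        if (preserved != null && preserved.containsKey(key)) {
            return preserved.get(key);
        }
        return live.get(key);
    }
}
```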
### (Very vague so far) Shared replicated state (via CRDTs)

I have been thinking a bit about whether it would make sense to allow parallel instances to have shared state that is updated asynchronously. This would not make sense for general state/data types, but where the types can be implemented as CRDTs (conflict-free replicated data types), this may be very feasible and quite useful. Parallel operators could maintain shared replicated counters or histograms that way.
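For concreteness, the classic minimal example of such a type is a grow-only counter (G-Counter): each parallel instance increments only its own slot, and replicas merge by taking the per-instance maximum, so asynchronous updates converge without coordination. The class below is an illustrative sketch, not Flink code:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal G-Counter CRDT sketch: merge is commutative, associative,
// and idempotent (per-replica max), so replicas can exchange state
// asynchronously and still converge to the same total.
class GCounter {
    private final String replicaId;
    private final Map<String, Long> counts = new HashMap<>();

    GCounter(String replicaId) { this.replicaId = replicaId; }

    // Each parallel instance only ever increments its own slot.
    void increment() {
        counts.merge(replicaId, 1L, Long::sum);
    }

    long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    // Merge another replica's state by taking the per-replica maximum.
    void merge(GCounter other) {
        other.counts.forEach((id, c) -> counts.merge(id, c, Math::max));
    }
}
```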