Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/747#issuecomment-113881056
  
    I like this very much. Kudos, Gyula and Paris, this is good work. This text
should definitely go into the Wiki for reference.
    
    I have a few thoughts and ideas on how we may be able to push this even one
step further. Let's figure out which of those actually require changes now, and
which ones can simply be added after this is merged.
    
    ### Current State Interface
    
    I would like us to keep the current interface in parallel to the new one.
People seem to like it quite a bit (judging from the demos in the past days),
and I think the additional code paths to maintain for it are not too many. The
current interface also comes in handy for state that cannot be maintained per
key, for example the probabilistic counters and aggregates that explicitly do
not maintain an entry per key, but only bitmaps across all keys.
    
    I think it would be an acceptable initial restriction that such non-keyed
states are not re-partitionable.
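
    To make the non-keyed case concrete, here is a rough sketch of the kind of
state I mean (the class is purely illustrative, and the checkpointing hook is
intentionally left out): a linear-counting style distinct counter that keeps a
single bitmap across all keys and therefore cannot be split up by key when
re-partitioning.

    ```java
    import java.util.BitSet;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    /**
     * Illustrative non-keyed state: one bitmap for the whole operator instance,
     * not one state entry per key. How the bitmap is checkpointed is omitted.
     */
    public class ApproxDistinctCounter extends RichFlatMapFunction<String, Long> {

        private static final int NUM_BUCKETS = 1 << 16;

        private BitSet bitmap;   // shared across all keys seen by this instance

        @Override
        public void open(Configuration parameters) {
            bitmap = new BitSet(NUM_BUCKETS);
        }

        @Override
        public void flatMap(String value, Collector<Long> out) {
            // hash the record into a bucket and mark it
            int bucket = (value.hashCode() & 0x7fffffff) % NUM_BUCKETS;
            bitmap.set(bucket);

            // linear counting estimate: n ≈ -m * ln(fraction of empty buckets)
            double zeroFraction = Math.max(
                    (NUM_BUCKETS - bitmap.cardinality()) / (double) NUM_BUCKETS,
                    1.0 / NUM_BUCKETS);
            out.collect(Math.round(-NUM_BUCKETS * Math.log(zeroFraction)));
        }
    }
    ```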
    
    ### State Backing / Checkpointing & Restoring
    
    This design assumes that the state is held in the Flink operators and is
checkpointed to outside storage via the StateCheckpointer.
    
    How about we generalize this a bit, to make it pluggable where the state is
actually stored? From recent feedback, I heard that people find it useful to
store the state in an external key/value store (HBase, Cassandra, ...) and have
Flink's checkpointing algorithm coordinate the group commits.
    
    This would mean:
    
      - Having the state in Flink is one implementation of the state store. The 
current state backend becomes something that is used when the state is held in 
Flink.
      
      - Having the state in a key/value store would mean that we group-commit
changes into that key/value store (upon snapshots) and that we keep inside
Flink only caches of recent keys (for faster read access). That way, we solve
out-of-core state, and this also subsumes the lazy state fetching, as an
operator would start with an empty local cache and pull key state as needed. No
special code required. (A rough interface sketch follows this list.)
    
      - In the long run, we will probably want to have a lightweight key/value
store that is started together with the system in streaming mode.
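
    As a very rough illustration of the pluggable store (the interface and
method names here are hypothetical, not an existing Flink API), the "state in
Flink" backend and an external key/value store would simply be two
implementations of something like this:

    ```java
    import java.io.IOException;

    /**
     * Hypothetical sketch of a pluggable key/value state store. Whether the
     * state lives in Flink or in an external store (HBase, Cassandra, ...) is
     * an implementation detail; a checkpoint is a group commit of the changes
     * buffered since the last one, coordinated by Flink's checkpoint barriers.
     */
    public interface KeyValueStateStore<K, V> {

        /** Read the state for a key; may be served from a cache of recent keys. */
        V get(K key) throws IOException;

        /** Buffer an update; it becomes durable only at the next group commit. */
        void put(K key, V value) throws IOException;

        /** Group-commit all buffered changes for this checkpoint and start a
         *  new bulk transaction. */
        void commitCheckpoint(long checkpointId) throws IOException;

        /** Reset to the state of the last successfully committed checkpoint. */
        void rollbackToCheckpoint(long checkpointId) throws IOException;
    }
    ```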
    
      
    ### API integration
    
    My feeling here is that we should not have special constructs like
`setStatePartitioner()`, but instead introduce something like a
`KeyedDataStream`, and have keyed-state operators work only on that type of
data stream. Grouped data streams are a special (stronger) case of the
`KeyedDataStream`, so groupBy/window with mapWindow or reduce always have the
ability to hold per-key state. For map/flatMap, one would need to write
`stream.keyBy("userid").map(...)`.
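
    To illustrate how that could read (this is only the proposed surface, not
existing API; `ClickSource` and the mapper classes are placeholders):

    ```java
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Long>> clicks = env.addSource(new ClickSource());

    // keyBy() would return a KeyedDataStream; only on that stream may a
    // map/flatMap hold per-key state, because partitioning by key is guaranteed
    clicks.keyBy("f0")
          .map(new StatefulClickCounter());

    // a plain, non-keyed map would get no access to keyed state
    clicks.map(new StatelessEnricher());
    ```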
    
    
    ### Asynchronous state checkpoints
    
    This interface is a good basis for asynchronous checkpoints, because we can
do a copy-on-write per key if the key is changed while the checkpoint writing is
still in progress. Since we possibly have multiple concurrent checkpoints, we
need to design this carefully, so that we can have a sequence of deltas against
other deltas, not only against the full state.
    
    This is only relevant when the state is in Flink. When the state is backed 
externally (key/value store), the checkpoint conceptually only group commits 
and starts a new bulk transaction, which does not involve any data movement.
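
    One way to realize the per-key copy-on-write (just a sketch, not Flink
code): freeze the key-to-value reference map when the barrier arrives and
replace values instead of mutating them in place, so the frozen view stays
consistent while the asynchronous writer runs. Concurrent checkpoints would
additionally need a chain of such frozen views or deltas.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    /** Sketch: per-key copy-on-write for one in-flight asynchronous checkpoint.
     *  Values must be treated as immutable (replaced, never mutated in place). */
    public class CopyOnWriteKeyedState<K, V> {

        private final Map<K, V> current = new HashMap<>();

        /** Frozen view handed to the asynchronous checkpoint writer: copying
         *  the map copies only the references, the values are shared. */
        public synchronized Map<K, V> freezeForSnapshot() {
            return new HashMap<>(current);
        }

        /** Operator update: the new value object replaces the old reference in
         *  {@code current}, while a frozen snapshot map keeps the old version. */
        public synchronized void update(K key, V newValue) {
            current.put(key, newValue);
        }

        public synchronized V get(K key) {
            return current.get(key);
        }
    }
    ```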
    
    
    ### Incremental state checkpoints
    
    Again, this is only relevant when the state is in Flink. Here, this
interface is also a good basis, since we can persist only the state of the keys
that changed. We cannot go incremental within a single key's state with that; it
remains to be seen whether that would be relevant. In the case of long windows,
we certainly want incremental snapshots of the individual window buffers, but it
is unclear whether we need to expose such a thing to the user.
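
    A minimal sketch of that idea (illustrative only; deletions are ignored and
would additionally need tombstones): remember the keys touched since the last
checkpoint and persist only those as a delta.

    ```java
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    /** Sketch: incremental snapshots at key granularity. Restoring means
     *  replaying the deltas on top of the last full snapshot. */
    public class IncrementalKeyedState<K, V> {

        private final Map<K, V> state = new HashMap<>();
        private final Set<K> dirtyKeys = new HashSet<>();

        public void update(K key, V value) {
            state.put(key, value);
            dirtyKeys.add(key);              // key changed since last checkpoint
        }

        /** Returns only the entries for keys changed since the last checkpoint. */
        public Map<K, V> snapshotDelta() {
            Map<K, V> delta = new HashMap<>();
            for (K key : dirtyKeys) {
                delta.put(key, state.get(key));
            }
            dirtyKeys.clear();
            return delta;
        }
    }
    ```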
    
    
    ### (Very vague so far) Shared replicated state (via CRDTs)
    
    I have been thinking a bit about whether it would make sense to allow
parallel instances to have shared state that is updated asynchronously. This
would not make sense for general state/data types, but where the types can be
implemented as CRDTs (conflict-free replicated data types), this may be very
feasible and quite useful. Parallel operators could maintain shared replicated
counters or histograms that way.
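
    As a concrete example of the kind of type this works for, a grow-only
counter (G-Counter) sketch: each parallel instance increments only its own
slot, and replicas merge by taking the per-slot maximum, so updates can be
exchanged asynchronously and in any order and still converge.

    ```java
    /** Sketch of a G-Counter CRDT: merge is commutative, associative and
     *  idempotent, so replicas can sync asynchronously and still converge. */
    public class GCounter {

        private final long[] counts;   // one slot per parallel instance
        private final int myIndex;     // index of this parallel instance

        public GCounter(int parallelism, int myIndex) {
            this.counts = new long[parallelism];
            this.myIndex = myIndex;
        }

        public void increment(long delta) {
            counts[myIndex] += delta;          // only ever touch our own slot
        }

        public long value() {
            long sum = 0;
            for (long c : counts) {
                sum += c;
            }
            return sum;
        }

        public void merge(GCounter other) {
            for (int i = 0; i < counts.length; i++) {
                counts[i] = Math.max(counts[i], other.counts[i]);
            }
        }
    }
    ```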


