I'm looking to implement a state-sharing mechanism between subtasks (of one or multiple tasks). Our use case is to align watermarks between subtasks of one or multiple sources, to prevent some data fetchers from racing ahead of others and causing massive state buffering in Flink.
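To make the alignment idea concrete, here is a minimal, self-contained Java sketch. It assumes each subtask publishes its current watermark under its own key into a shared map, and that a subtask pauses fetching while its watermark is more than a configurable drift ahead of the slowest subtask. The class `WatermarkAlignment`, its methods and the `maxDriftMs` threshold are hypothetical, and the `ConcurrentHashMap` is only an in-process stand-in for whatever distributed mechanism (ZooKeeper, JGroups, Akka Distributed Data) would actually share the state across TaskManagers.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of watermark alignment over a shared key/value table.
 * ASSUMPTION: the ConcurrentHashMap stands in for a distributed, eventually
 * consistent map; it only works within a single JVM as written.
 */
public class WatermarkAlignment {
    // One entry per subtask: key = subtask id, value = latest published watermark (ms).
    private final Map<String, Long> watermarks = new ConcurrentHashMap<>();

    /** Each subtask periodically (e.g. every 30s) publishes its own watermark under its own key. */
    public void publish(String subtaskId, long watermark) {
        watermarks.put(subtaskId, watermark);
    }

    /** Minimum watermark across all subtasks that have published so far, if any. */
    public OptionalLong globalMin() {
        return watermarks.values().stream().mapToLong(Long::longValue).min();
    }

    /** True while this subtask is more than maxDriftMs ahead of the slowest subtask. */
    public boolean shouldPause(String subtaskId, long maxDriftMs) {
        Long own = watermarks.get(subtaskId);
        OptionalLong min = globalMin();
        if (own == null || !min.isPresent()) {
            return false; // nothing known yet: do not throttle
        }
        return own - min.getAsLong() > maxDriftMs;
    }

    public static void main(String[] args) {
        WatermarkAlignment shared = new WatermarkAlignment();
        shared.publish("source-a-0", 1_000L);
        shared.publish("source-a-1", 50_000L);
        shared.publish("source-b-0", 20_000L);
        long maxDriftMs = 30_000L;
        for (String id : new String[] {"source-a-0", "source-a-1", "source-b-0"}) {
            System.out.println(id + " pause=" + shared.shouldPause(id, maxDriftMs));
        }
    }
}
```

In this example only `source-a-1` is told to pause, because it is 49s ahead of the slowest subtask while the allowed drift is 30s. Each subtask reads the whole table in one step here, which is exactly the access pattern a real shared-state mechanism would need to make cheap.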
Each subtask would share a small piece of state (probably just a key and a couple of longs). The state would be updated periodically (perhaps every 30s), and other subtasks should see these changes with similar latency. It is essentially a hash table to which every node contributes a distinct key.

An initial idea was to implement this using ZooKeeper ephemeral nodes. But since there is no way to read all child nodes in one sweep, state access becomes very chatty. With, let's say, 512 subtasks, each subtask needs 1 read to list the children plus N-1 = 511 reads to fetch their data, i.e. 512 reads per subtask and 512 * 512 = 262,144 reads per interval.

My next stop will be a group communication mechanism like JGroups or Akka (the following looks like a potentially good fit: https://doc.akka.io/docs/akka/2.5/distributed-data.html?language=java). But before that, I wanted to check whether others already had a similar need and possibly have experience or an implementation to share. There are probably more use cases related to discovery etc. Perhaps Flink could provide a state primitive, if there is broader interest in the community?

Thanks,
Thomas