Appreciate all your comments! Replying below.
@Luke:

> Having cache tokens per key would be very expensive indeed and I
> believe we should go with a single cache token "per" bundle.

Thanks for your comments on the PR. I was thinking to propose something
along the lines of having cache tokens valid for a particular
checkpointing "epoch". That would require even less token renewal than
the per-bundle approach.

@Thomas, thanks for the input. Some remarks:

> Wouldn't it be simpler to have the runner just track a unique ID for
> each worker and use that to communicate if the cache is valid or not?

We do not need a unique id per worker. If a cache token is valid for a
particular worker, it is also valid for any other worker, under the
assumption that key ranges are always disjoint between the workers.

> * When the bundle is started, the runner tells the worker if the cache
>   has become invalid (since it knows if another worker has mutated state)

This is simply done by not transferring the particular cache token. No
need to declare it invalid explicitly.

> * When the worker sends mutation requests to the runner, it includes
>   its own ID (or the runner already has it as contextual information).
>   No need to wait for a response.

Mutations of cached values can be done freely as long as the cache token
associated with the state is valid for a particular bundle. Only for the
first mutation does the Runner need to wait on the response, to store
the cache token. This can also be done asynchronously.

> * When the bundle is finished, the runner records the last writer (only
>   if a change occurred)

I believe this is not necessary, because there will only be one writer
at a time for a particular bundle and key range; hence only one writer
holds a valid cache token for a particular state and key range.

@Reuven:

> Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range.

State is always processed partitioned by the Flink workers (hash-based,
not lexicographical). I don't think that matters though, because the key
ranges do not overlap between the workers. Flink does not support
dynamically repartitioning the key ranges. Even in the case of
fine-grained recovery of workers and their key ranges, we would simply
generate new cache tokens for a particular worker.

Thanks,
Max
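PS: To make the flow described above more concrete, here is a rough
sketch of how an SDK worker could interpret the per-state-id tokens it
receives with a bundle. This is not the actual Fn API; all class and
method names below are made up for illustration:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: an SDK-side cache that treats an absent or changed
// token as "cache invalid" for that state id, as described above.
class SdkStateCache {
  // Token under which the cached entries for a state id were populated.
  private final Map<String, String> tokenByStateId = new HashMap<>();
  // Cached key/value pairs per state id.
  private final Map<String, Map<String, byte[]>> cacheByStateId = new HashMap<>();

  // Called at bundle start with the tokens the Runner attached to the bundle.
  void onBundleStart(Map<String, String> bundleTokens) {
    // Evict every state id whose token was not re-sent or has changed;
    // the Runner invalidates simply by not transferring the token.
    tokenByStateId.entrySet().removeIf(entry -> {
      boolean stillValid = entry.getValue().equals(bundleTokens.get(entry.getKey()));
      if (!stillValid) {
        cacheByStateId.remove(entry.getKey());
      }
      return !stillValid;
    });
    tokenByStateId.putAll(bundleTokens);
  }

  // Cached writes are fine while the worker holds a valid token; the
  // mutation would also be sent to the Runner without waiting for a response.
  void put(String stateId, String key, byte[] value) {
    cacheByStateId.computeIfAbsent(stateId, id -> new HashMap<>()).put(key, value);
  }
}

The same check would generalize to the "epoch" idea: instead of renewing
the token every bundle, the Runner would only send a new token when a
new checkpointing epoch starts.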
On 21.08.19 09:33, Reuven Lax wrote:
> Dataflow does something like this, however since work is load balanced
> across workers a per-worker id doesn't work very well. Dataflow divides
> the keyspace up into lexicographic ranges, and creates a cache token
> per range.
>
> On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org> wrote:
>
>     Commenting here vs. on the PR since related to the overall approach.
>
>     Wouldn't it be simpler to have the runner just track a unique ID
>     for each worker and use that to communicate if the cache is valid
>     or not?
>
>     * When the bundle is started, the runner tells the worker if the
>       cache has become invalid (since it knows if another worker has
>       mutated state)
>     * When the worker sends mutation requests to the runner, it
>       includes its own ID (or the runner already has it as contextual
>       information). No need to wait for a response.
>     * When the bundle is finished, the runner records the last writer
>       (only if a change occurred)
>
>     Whenever current worker ID and last writer ID doesn't match, cache
>     is invalid.
>
>     Thomas
>
>     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:
>
>         Having cache tokens per key would be very expensive indeed and
>         I believe we should go with a single cache token "per" bundle.
>
>         On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels
>         <m...@apache.org> wrote:
>
>             Maybe a Beam Python expert can chime in for Rakesh's
>             question?
>
>             Luke, I was assuming cache tokens to be per key and state
>             id. During implementing an initial support on the Runner
>             side, I realized that we probably want cache tokens to only
>             be per state id. Note that if we had per-key cache tokens,
>             the number of cache tokens would approach the total number
>             of keys in an application.
>
>             If anyone wants to have a look, here is a first version of
>             the Runner side for cache tokens. Note that I only
>             implemented cache tokens for BagUserState for now, but it
>             can be easily added for side inputs as well.
>
>             https://github.com/apache/beam/pull/9374
>
>             -Max