Appreciate all your comments! Replying below.

@Luke:

> Having cache tokens per key would be very expensive indeed and I believe we 
> should go with a single cache token "per" bundle.

Thanks for your comments on the PR. I was thinking of proposing
something along these lines: cache tokens that stay valid for a
particular checkpointing "epoch". That would require even less token
renewal than the per-bundle approach.
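
To illustrate, here is a minimal Java sketch of epoch-scoped tokens
(the class and method names are made up for illustration, not the
PR's actual API):

    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical sketch: one cache token per checkpointing epoch.
    // The token stays valid across bundles and is only rotated when
    // a new checkpoint begins.
    public class EpochCacheTokenProvider {
      private final AtomicReference<String> currentToken =
          new AtomicReference<>(UUID.randomUUID().toString());

      /** Token valid for the current checkpointing epoch. */
      public String getToken() {
        return currentToken.get();
      }

      /** Called when a new checkpoint starts; prior tokens become stale. */
      public void onCheckpoint(long checkpointId) {
        currentToken.set(UUID.randomUUID().toString());
      }
    }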


@Thomas, thanks for the input. Some remarks:

> Wouldn't it be simpler to have the runner just track a unique ID for each 
> worker and use that to communicate if the cache is valid or not?

We do not need a unique id per worker. If a cache token is valid for
a particular worker, it is also valid for any other worker. That holds
under the assumption that key ranges are always disjoint between the
workers.

> * When the bundle is started, the runner tells the worker if the cache has 
> become invalid (since it knows if another worker has mutated state)

This is achieved simply by not transferring the particular cache
token; there is no need to declare it invalid explicitly.
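
For illustration, the selection on the Runner side could look roughly
like this (hypothetical sketch, names are mine):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of invalidation by omission: only tokens
    // the Runner still considers valid are attached to the next
    // bundle; a token the worker does not receive is implicitly
    // invalid, so the worker drops the corresponding cache entries.
    public class BundleTokenSelector {
      public static List<String> tokensForBundle(Map<String, Boolean> tokenValidity) {
        List<String> valid = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : tokenValidity.entrySet()) {
          if (e.getValue()) {
            valid.add(e.getKey()); // still valid: transfer it
          }
          // invalid tokens are simply omitted -- no explicit message
        }
        return valid;
      }
    }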

> * When the worker sends mutation requests to the runner, it includes its own 
> ID (or the runner already has it as contextual information). No need to wait 
> for a response.

Mutations of cached values can be performed freely as long as the
cache token associated with the state is valid for a particular
bundle. Only for the first request does the Runner need to wait on the
response to store the cache token, and even that can be done
asynchronously.
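
Something like the following could record the token without blocking
the bundle (again a hypothetical sketch, not the actual Beam classes):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical sketch: the Runner registers a callback on the
    // first state response for a state id and stores the token once
    // it arrives, instead of blocking on the response.
    public class AsyncTokenStore {
      private final ConcurrentMap<String, String> tokensByStateId =
          new ConcurrentHashMap<>();

      /** Records the token from a state response asynchronously. */
      public void recordAsync(String stateId, CompletableFuture<String> tokenResponse) {
        tokenResponse.thenAccept(
            token -> tokensByStateId.putIfAbsent(stateId, token));
      }

      /** Returns the stored token, or null if none has arrived yet. */
      public String get(String stateId) {
        return tokensByStateId.get(stateId);
      }
    }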

> * When the bundle is finished, the runner records the last writer (only if a 
> change occurred)

I believe this is not necessary because there will only be one writer at
a time for a particular bundle and key range, hence only one writer
holds a valid cache token for a particular state and key range.


@Reuven:

>  Dataflow divides the keyspace up into lexicographic ranges, and creates a 
> cache token per range. 

State is always partitioned across the Flink workers (hash-based, not
lexicographic). I don't think that matters though, because the key
ranges do not overlap between the workers. Flink does not support
dynamically repartitioning the key ranges. Even in the case of
fine-grained recovery of workers and their key ranges, we would simply
generate new cache tokens for the affected worker.
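
For illustration, token regeneration on recovery could be as simple
as this (hypothetical sketch; key ranges are identified by an index
here purely for the example):

    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical sketch: since key ranges are disjoint and never
    // repartitioned dynamically, a recovered worker just mints a
    // fresh token for its own range; other workers are unaffected.
    public class RecoveryTokenManager {
      private final ConcurrentMap<Integer, String> tokenByKeyRange =
          new ConcurrentHashMap<>();

      /** Called when a worker and its key range are restored. */
      public void onWorkerRecovered(int keyRangeIndex) {
        tokenByKeyRange.put(keyRangeIndex, UUID.randomUUID().toString());
      }

      public String tokenFor(int keyRangeIndex) {
        return tokenByKeyRange.computeIfAbsent(
            keyRangeIndex, i -> UUID.randomUUID().toString());
      }
    }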


Thanks,
Max

On 21.08.19 09:33, Reuven Lax wrote:
> Dataflow does something like this, however since work is
> load balanced across workers a per-worker id doesn't work very well.
> Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range. 
> 
> On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org> wrote:
> 
>     Commenting here vs. on the PR since related to the overall approach.
> 
>     Wouldn't it be simpler to have the runner just track a unique ID for
>     each worker and use that to communicate if the cache is valid or not?
> 
>     * When the bundle is started, the runner tells the worker if the
>     cache has become invalid (since it knows if another worker has
>     mutated state)
>     * When the worker sends mutation requests to the runner, it includes
>     its own ID (or the runner already has it as contextual information).
>     No need to wait for a response.
>     * When the bundle is finished, the runner records the last writer
>     (only if a change occurred)
> 
>     Whenever current worker ID and last writer ID doesn't match, cache
>     is invalid.
> 
>     Thomas
> 
> 
>     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:
> 
>         Having cache tokens per key would be very expensive indeed and I
>         believe we should go with a single cache token "per" bundle.
> 
>         On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels
>         <m...@apache.org> wrote:
> 
>             Maybe a Beam Python expert can chime in for Rakesh's question?
> 
>             Luke, I was assuming cache tokens to be per key and
>             state id. While implementing initial support on the
>             Runner side, I realized that we probably want cache
>             tokens to be only per state id. Note that if we had
>             per-key cache tokens, the number of cache tokens would
>             approach the total number of keys in an application.
> 
>             If anyone wants to have a look, here is a first version
>             of the Runner side for cache tokens. Note that I only
>             implemented cache tokens for BagUserState for now, but
>             it can be easily added for side inputs as well.
> 
>             https://github.com/apache/beam/pull/9374
> 
>             -Max
> 
> 
