> Just to clarify, the repeated list of cache tokens in the process
> bundle request is used to validate reading *and* stored when writing?
> In that sense, should they just be called version identifiers or
> something like that?
We could call them version identifiers, though cache tokens have always
been a means to identify versions of a state.
On 28.08.19 11:10, Maximilian Michels wrote:
>> cachetools sounds like a fine choice to me.
>
> For the first version I've implemented a simple LRU cache. If you want
> to have a look:
>
> https://github.com/apache/beam/pull/9418/files#diff-ed2d70e99442b6e1668e30409d3383a6R60
>
>
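For reference, a minimal sketch of what such a cache could look like
(illustrative only, assuming cachetools; the actual implementation is in the
PR linked above):

    import cachetools

    # Shared across all bundle processors in the SDK harness; evicts the
    # least recently used entries once the size limit is reached.
    state_cache = cachetools.LRUCache(maxsize=10000)

    def cached_read(cache_token, state_key, fetch_from_runner):
        key = (cache_token, state_key)
        if key not in state_cache:
            state_cache[key] = fetch_from_runner(state_key)
        return state_cache[key]
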
>> Open up a PR for the proto changes and we can work through any minor
>> comments there.
>
> Proto changes: https://github.com/apache/beam/pull/9440
>
>
> Thanks,
> Max
>
> On 27.08.19 23:00, Robert Bradshaw wrote:
>> Just to clarify, the repeated list of cache tokens in the process
>> bundle request is used to validate reading *and* stored when writing?
>> In that sense, should they just be called version identifiers or
>> something like that?
>>
>> On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>>
>>> Thanks. Updated:
>>>
>>> message ProcessBundleRequest {
>>>   // (Required) A reference to the process bundle descriptor that must be
>>>   // instantiated and executed by the SDK harness.
>>>   string process_bundle_descriptor_reference = 1;
>>>
>>>   // A cache token which can be used by an SDK to check for the validity
>>>   // of cached elements which have a cache token associated.
>>>   message CacheToken {
>>>
>>>     // A flag to indicate a cache token is valid for user state.
>>>     message UserState {}
>>>
>>>     // A flag to indicate a cache token is valid for a side input.
>>>     message SideInput {
>>>       // The id of a side input.
>>>       string side_input = 1;
>>>     }
>>>
>>>     // The scope of a cache token.
>>>     oneof type {
>>>       UserState user_state = 1;
>>>       SideInput side_input = 2;
>>>     }
>>>
>>>     // The cache token identifier which should be globally unique.
>>>     bytes token = 10;
>>>   }
>>>
>>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>>   // cached data returned by the State API across multiple bundles.
>>>   repeated CacheToken cache_tokens = 2;
>>> }
>>>
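For illustration, a runner filling in these tokens at the start of a bundle
might look roughly like this (a sketch only; the generated Python classes
under beam_fn_api_pb2 and the uuid-based token values are assumptions, not
part of the proposal):

    import uuid

    from apache_beam.portability.api import beam_fn_api_pb2

    def make_cache_tokens(side_input_ids):
        # One token covering all user state, plus one token per side input.
        CacheToken = beam_fn_api_pb2.ProcessBundleRequest.CacheToken
        tokens = [CacheToken(user_state=CacheToken.UserState(),
                             token=uuid.uuid4().bytes)]
        for side_input_id in side_input_ids:
            tokens.append(CacheToken(
                side_input=CacheToken.SideInput(side_input=side_input_id),
                token=uuid.uuid4().bytes))
        return tokens
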
>>> On 27.08.19 19:22, Lukasz Cwik wrote:
>>>
>>> SideInputState -> SideInput (side_input_state -> side_input)
>>> + more comments around the messages and the fields.
>>>
>>>
>>> On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>>
>>>> We would have to differentiate cache tokens for user state and side
>>>> inputs. How about something like this?
>>>>
>>>> message ProcessBundleRequest {
>>>>   // (Required) A reference to the process bundle descriptor that must be
>>>>   // instantiated and executed by the SDK harness.
>>>>   string process_bundle_descriptor_reference = 1;
>>>>
>>>>   message CacheToken {
>>>>
>>>>     message UserState {
>>>>     }
>>>>
>>>>     message SideInputState {
>>>>       string side_input_id = 1;
>>>>     }
>>>>
>>>>     oneof type {
>>>>       UserState user_state = 1;
>>>>       SideInputState side_input_state = 2;
>>>>     }
>>>>
>>>>     bytes token = 10;
>>>>   }
>>>>
>>>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>>>   // cached data returned by the State API across multiple bundles.
>>>>   repeated CacheToken cache_tokens = 2;
>>>> }
>>>>
>>>> -Max
>>>>
>>>> On 27.08.19 18:43, Lukasz Cwik wrote:
>>>>
>>>> The bundle's view of side inputs should never change during
>>>> processing and should be a point-in-time snapshot.
>>>>
>>>> I was just trying to say that the cache token for side inputs being
>>>> deferred till side input request time simplified the runner's
>>>> implementation, since that is conclusively when the runner would
>>>> need to take a look at the side input. Putting them as part of the
>>>> ProcessBundleRequest complicates that but does make the SDK
>>>> implementation significantly simpler, which is a win.
>>>>
>>>> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>>
>>>>> Thanks for the quick response.
>>>>>
>>>>> Just to clarify, the issue with versioning side input is also present
>>>>> when supplying the cache tokens on a request basis instead of per
>>>>> bundle. The SDK never knows when the Runner receives a new version of
>>>>> the side input. Like you pointed out, it needs to mark side inputs as
>>>>> stale and generate new cache tokens for the stale side inputs.
>>>>>
>>>>> The difference between per-request tokens and per-bundle tokens
>>>>> would be
>>>>> that the side input can only change after a bundle completes vs.
>>>>> during
>>>>> the bundle. Side inputs are always fuzzy in that regard because
>>>>> there is
>>>>> no precise instant at which side inputs are atomically updated,
>>>>> other than
>>>>> the assumption that they eventually will be updated. In that regard
>>>>> per-bundle tokens for side input seem to be fine.
>>>>>
>>>>> All of the above is not an issue for user state, as its cache can
>>>>> remain
>>>>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>>>>> solution would be to not cache side input because there are many
>>>>> cases
>>>>> where the caching just adds additional overhead. However, I can also
>>>>> imagine cases where side input is valid forever and caching would be
>>>>> very beneficial.
>>>>>
>>>>> For the first version I want to focus on user state because that's
>>>>> where
>>>>> I see the most benefit for caching. I don't see a problem though
>>>>> for the
>>>>> Runner to detect new side input and reflect that in the cache tokens
>>>>> supplied for a new bundle.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 26.08.19 22:27, Lukasz Cwik wrote:
>>>>>> Your summary below makes sense to me. I can see that recovery from
>>>>>> rolling back doesn't need to be a priority and simplifies the
>>>>>> solution
>>>>>> for user state caching down to one token.
>>>>>>
>>>>>> Providing cache tokens upfront does require the Runner to know what
>>>>>> "version" of everything it may supply to the SDK upfront (instead
>>>>>> of on
>>>>>> request) which would mean that the Runner may need to have a mapping
>>>>>> from cache token to internal version identifier for things like side
>>>>>> inputs which are typically broadcast. The Runner would also need
>>>>>> to poll
>>>>>> to see if the side input has changed in the background to not block
>>>>>> processing bundles with "stale" side input data.
>>>>>>
>>>>>> Ping me once you have the Runner PR updated and I'll take a look
>>>>>> again.
>>>>>>
>>>>>> On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <m...@apache.org
>>>>>> <mailto:m...@apache.org>> wrote:
>>>>>>
>>>>>> Thank you for the summary Luke. I really appreciate the
>>>>>> effort you put
>>>>>> into this!
>>>>>>
>>>>>> > Based upon your discussion you seem to want option #1
>>>>>>
>>>>>> I'm actually for option #2. The option to cache/invalidate
>>>>>> side inputs
>>>>>> is important, and we should incorporate this in the design.
>>>>>> That's why
>>>>>> option #1 is not flexible enough. However, a first
>>>>>> implementation could
>>>>>> defer caching of side inputs.
>>>>>>
>>>>>> Option #3 was my initial thinking and the first version of
>>>>>> the PR, but I
>>>>>> think we agreed that there wouldn't be much gain from
>>>>>> keeping a cache
>>>>>> token per state id.
>>>>>>
>>>>>> Option #4 is what is specifically documented in the
>>>>>> reference doc and
>>>>>> already part of the Proto, where valid tokens are provided
>>>>>> for each new
>>>>>> bundle and also as part of the response of a get/put/clear.
>>>>>> We mentioned
>>>>>> that the reply does not have to be waited on synchronously
>>>>>> (I even mentioned
>>>>>> it), but it complicates the implementation. The idea
>>>>>> Thomas and I
>>>>>> expressed was that a response is not even necessary if we
>>>>>> assume
>>>>>> validity of the upfront provided cache tokens for the
>>>>>> lifetime of a
>>>>>> bundle and that cache tokens will be invalidated as soon as
>>>>>> the Runner
>>>>>> fails in any way. This is naturally the case for Flink
>>>>>> because it will
>>>>>> simply "forget" its current cache tokens.
>>>>>>
>>>>>> I currently envision the following schema:
>>>>>>
>>>>>> Runner
>>>>>> ======
>>>>>>
>>>>>> - Runner generates a globally unique cache token, one for
>>>>>> user state and
>>>>>> one for each side input
>>>>>>
>>>>>> - The token is supplied to the SDK Harness for each bundle
>>>>>> request
>>>>>>
>>>>>> - For the lifetime of a Runner<=>SDK Harness connection this
>>>>>> cache token
>>>>>> will not change
>>>>>> - Runner will generate a new token if the connection/key
>>>>>> space changes
>>>>>> between Runner and SDK Harness
>>>>>>
>>>>>>
>>>>>> SDK
>>>>>> ===
>>>>>>
>>>>>> - For each bundle the SDK worker stores the list of valid
>>>>>> cache tokens
>>>>>> - The SDK Harness keeps a global cache across all its (local)
>>>>>> workers
>>>>>> which is an LRU cache: state_key => (cache_token, value)
>>>>>> - get: Lookup cache using the valid cache token for the
>>>>>> state. If no
>>>>>> match, then fetch from Runner and use the already available
>>>>>> token for
>>>>>> caching
>>>>>> - put: Put value in cache with a valid cache token, put
>>>>>> value to pending
>>>>>> writes which will be flushed out at the latest when the bundle ends
>>>>>> - clear: same as put but clear cache
>>>>>>
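A rough sketch of that SDK-side flow (illustrative only; the class and method
names are assumptions, not the actual SDK code, and the cache_token passed in
is the one supplied with the current bundle):

    import cachetools

    class StateCache(object):
        """Global LRU cache shared by all SDK workers: state_key -> (token, value)."""

        def __init__(self, max_entries=10000):
            self._cache = cachetools.LRUCache(maxsize=max_entries)

        def get(self, state_key, cache_token, fetch_from_runner):
            entry = self._cache.get(state_key)
            if entry is not None and entry[0] == cache_token:
                return entry[1]
            # Miss or stale token: fetch from the Runner and cache under the
            # token that was supplied for this bundle.
            value = fetch_from_runner(state_key)
            self._cache[state_key] = (cache_token, value)
            return value

        def put(self, state_key, cache_token, value, pending_writes):
            self._cache[state_key] = (cache_token, value)
            # The write itself is flushed to the Runner at the latest when
            # the bundle finishes.
            pending_writes.append((state_key, value))

        def clear(self, state_key, cache_token, pending_writes):
            # Same as put, but the cached value becomes empty.
            self._cache[state_key] = (cache_token, None)
            pending_writes.append((state_key, None))
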
>>>>>> It does look like this is not too far off from what you were
>>>>>> describing.
>>>>>> The main difference is that we just work with a single cache
>>>>>> token. In
>>>>>> my opinion we do not need the second cache token for writes,
>>>>>> as long as
>>>>>> we ensure that we generate a new cache token if the
>>>>>> bundle/checkpoint fails.
>>>>>>
>>>>>> I have a draft PR
>>>>>> for the Runner: https://github.com/apache/beam/pull/9374
>>>>>> for the SDK: https://github.com/apache/beam/pull/9418
>>>>>>
>>>>>> Note that the Runner PR needs to be updated to fully
>>>>>> reflect the above
>>>>>> scheme. The SDK implementation is WIP. I want to make sure
>>>>>> that we
>>>>>> clarify the design before this gets finalized.
>>>>>>
>>>>>> Thanks again for all your comments. Much appreciated!
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On 26.08.19 19:58, Lukasz Cwik wrote:
>>>>>> > There were originally a couple of ideas around how
>>>>>> caching could
>>>>>> work:
>>>>>> > 1) One cache token for the entire bundle that is supplied up
>>>>>> front. The
>>>>>> > SDK caches everything using the given token. All
>>>>>> reads/clear/append for
>>>>>> > all types of state happen under this token. Anytime a
>>>>>> side input
>>>>>> > changes, key processing partition range changes or a
>>>>>> bundle fails to
>>>>>> > process, the runner chooses a new cache token effectively
>>>>>> invalidating
>>>>>> > everything in the past.
>>>>>> >
>>>>>> > 2) One cache token per type of state that is supplied up
>>>>>> front.
>>>>>> > The SDK caches all requests for a given type using the
>>>>>> given cache
>>>>>> > token. The runner can selectively choose which type to
>>>>>> keep and
>>>>>> which to
>>>>>> > invalidate. Bundle failure and key processing partition
>>>>>> changes
>>>>>> > invalidate all user state, side input change invalidates
>>>>>> all side
>>>>>> inputs.
>>>>>> >
>>>>>> > 3) One cache token per state id that is supplied up front.
>>>>>> > The SDK caches all requests for the given state id using the
>>>>>> given cache
>>>>>> > token. The runner can selectively choose which to
>>>>>> invalidate and
>>>>>> which
>>>>>> > to keep. Bundle failure and key processing partition changes
>>>>>> invalidate
>>>>>> > all user state, side input changes only invalidate the
>>>>>> side input
>>>>>> that
>>>>>> > changed.
>>>>>> >
>>>>>> > 4) A cache token on each read/clear/append that is
>>>>>> supplied on the
>>>>>> > response of the call with an initial valid set that is
>>>>>> supplied at
>>>>>> > start. The runner can selectively choose which to keep on
>>>>>> start.
>>>>>> Bundle
>>>>>> > failure allows runners to "roll back" to a known good
>>>>>> state by
>>>>>> selecting
>>>>>> > the previous valid cache token as part of the initial
>>>>>> set. Key
>>>>>> > processing partition changes allow runners to keep cached
>>>>>> state that
>>>>>> > hasn't changed since it can be tied to a version number
>>>>>> of the state
>>>>>> > itself as part of the initial set. Side input changes
>>>>>> only invalidate
>>>>>> > the side input that changed.
>>>>>> >
>>>>>> > Based upon your discussion you seem to want option #1 which
>>>>>> doesn't work
>>>>>> > well with side inputs clearing cached state. If we want
>>>>>> to have user
>>>>>> > state survive a changing side input, we would want one of
>>>>>> the other
>>>>>> > options. I do agree that supplying the cache token
>>>>>> upfront is
>>>>>> > significantly simpler. Currently the protos are setup for
>>>>>> #4 since it
>>>>>> > was the most flexible and at the time the pros outweighed
>>>>>> the cons.
>>>>>> >
>>>>>> > I don't understand why you think you need to wait for a
>>>>>> response
>>>>>> for the
>>>>>> > append/clear to get its cache token since the only reason
>>>>>> you
>>>>>> need the
>>>>>> > cache token is that you want to use that cached data when
>>>>>> processing a
>>>>>> > different bundle. I was thinking that the flow on the SDK
>>>>>> side
>>>>>> would be
>>>>>> > something like (assuming there is a global cache of cache
>>>>>> token
>>>>>> -> (map
>>>>>> > of state key -> data))
>>>>>> > 1) Create a local cache of (map of state key -> data)
>>>>>> using the
>>>>>> initial
>>>>>> > set of valid cache tokens
>>>>>> > 2) Make all mutations in place on local cache without
>>>>>> waiting for
>>>>>> response.
>>>>>> > 3) When response comes back, update global cache with new
>>>>>> cache
>>>>>> token ->
>>>>>> > (map of state key -> data)) (this is when the data
>>>>>> becomes visible to
>>>>>> > other bundles that start processing)
>>>>>> > 4) Before the bundle finishes processing, wait for all
>>>>>> outstanding state
>>>>>> > calls to finish.
>>>>>> >
>>>>>> > To implement caching on the runner side, you would keep
>>>>>> track of
>>>>>> at most
>>>>>> > 2 cache tokens per state key, one cache token represents
>>>>>> the initial
>>>>>> > value when the bundle started while the second represents
>>>>>> the
>>>>>> modified
>>>>>> > state. If the bundle succeeds the runner passes in the
>>>>>> set of tokens
>>>>>> > which represent the new state; if the bundle fails, you
>>>>>> process
>>>>>> using the
>>>>>> > original ones.
>>>>>> >
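A possible shape of that runner-side bookkeeping (a sketch under the
assumption that the runner tracks the tokens in memory, keyed by state key;
names are illustrative):

    class RunnerStateTokens(object):
        """Tracks at most two cache tokens per state key: committed and pending."""

        def __init__(self):
            self._committed = {}  # state_key -> token of the last known good state
            self._pending = {}    # state_key -> token of state modified by the bundle

        def tokens_for_new_bundle(self):
            # The initial set of valid tokens handed to the SDK with the bundle.
            return dict(self._committed)

        def on_state_write(self, state_key, new_token):
            self._pending[state_key] = new_token

        def on_bundle_success(self):
            # The modified state becomes the new known good state.
            self._committed.update(self._pending)
            self._pending.clear()

        def on_bundle_failure(self):
            # Roll back: subsequent bundles keep using the original tokens.
            self._pending.clear()
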
>>>>>> > After thinking through the implementation again, we could
>>>>>> supply two
>>>>>> > cache tokens for each state id, the first being the set
>>>>>> of initial
>>>>>> > tokens if no writes happen while the second represents
>>>>>> the token
>>>>>> to use
>>>>>> > if the SDK changes the state. This gives us the
>>>>>> simplification
>>>>>> where we
>>>>>> > don't need to wait for the response before we update the
>>>>>> global cache,
>>>>>> > making a typical blocking cache much easier to do. We
>>>>>> also get the
>>>>>> > benefit that runners can supply either the same cache
>>>>>> token for a
>>>>>> state
>>>>>> > id or different ones. If the runner supplies the same one
>>>>>> then it's
>>>>>> > telling the SDK to make modifications in place without
>>>>>> any rollback
>>>>>> > (which is good on memory since we are reducing copies of
>>>>>> stuff) or if
>>>>>> > the runner supplies two different ones then it's telling
>>>>>> the SDK
>>>>>> to keep
>>>>>> > the old data around. If we went through with this new
>>>>>> option the SDK
>>>>>> > side logic would be (assuming there is a global cache of
>>>>>> cache
>>>>>> token ->
>>>>>> > (map of state key -> data)):
>>>>>> >
>>>>>> > 1) Create an empty local set of state ids that are dirty
>>>>>> when
>>>>>> starting a
>>>>>> > new bundle (dirty set)
>>>>>> >
>>>>>> > For reads/gets:
>>>>>> > 2A) If the request is a read (get), use dirty set to
>>>>>> choose which
>>>>>> cache
>>>>>> > token to lookup and use in the global cache. If the
>>>>>> global cache is
>>>>>> > missing data issue the appropriate request providing the
>>>>>> result.
>>>>>> >
>>>>>> > For writes/appends/clear:
>>>>>> > 2B) if the cache tokens are different for the state id,
>>>>>> add the
>>>>>> state id
>>>>>> > to the dirty set if it isn't there and perform the
>>>>>> appropriate
>>>>>> > modification to convert the old cached state data to the new
>>>>>> state data
>>>>>> > 3B) modify the global caches data
>>>>>> > 4B) issue the request to the runner
>>>>>> > 5B*) add this request to the set of requests to block on
>>>>>> before
>>>>>> > completing the bundle.
>>>>>> >
>>>>>> > (* Note, there was another idea to update the process bundle
>>>>>> response to
>>>>>> > contain the id of the last state request which would
>>>>>> allow the
>>>>>> runner to
>>>>>> > know when it has seen the last state request allowing the
>>>>>> SDK to not
>>>>>> > block at all when finishing the bundle)
>>>>>> >
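A condensed sketch of that dirty-set flow on the SDK side (purely
illustrative; names and the copy-on-first-write detail are assumptions):

    class TwoTokenBundleCache(object):
        """Global cache: cache_token -> {state_key: value}, plus a per-bundle dirty set."""

        def __init__(self, global_cache, tokens):
            self._global = global_cache
            # tokens: state_id -> (initial_token, write_token), supplied at bundle start.
            self._tokens = tokens
            self._dirty = set()  # state ids modified during this bundle

        def _active_token(self, state_id):
            initial, write = self._tokens[state_id]
            return write if state_id in self._dirty else initial

        def read(self, state_id, state_key, fetch_from_runner):
            data = self._global.setdefault(self._active_token(state_id), {})
            if state_key not in data:
                data[state_key] = fetch_from_runner(state_key)
            return data[state_key]

        def write(self, state_id, state_key, value, issue_request):
            initial, write = self._tokens[state_id]
            if write != initial and state_id not in self._dirty:
                # First write with distinct tokens: keep the old data intact and
                # continue under the write token (supports rollback on failure).
                self._dirty.add(state_id)
                self._global[write] = dict(self._global.get(initial, {}))
            self._global.setdefault(write, {})[state_key] = value
            # Issue the request to the Runner; outstanding requests must complete
            # before the bundle can finish.
            issue_request(state_key, value)
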
>>>>>> > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels
>>>>>> <m...@apache.org <mailto:m...@apache.org>
>>>>>> > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
>>>>>> >
>>>>>> > Just to give a quick update here. Rakesh, Thomas, and
>>>>>> I had a
>>>>>> discussion
>>>>>> > about async writes from the Python SDK to the Runner.
>>>>>> Robert
>>>>>> was also
>>>>>> > present for some parts of the discussion.
>>>>>> >
>>>>>> > We concluded that blocking writes with the need to
>>>>>> refresh
>>>>>> the cache
>>>>>> > token each time are not going to provide enough
>>>>>> throughput/latency.
>>>>>> >
>>>>>> > We figured that it will be enough to use a single
>>>>>> cache token per
>>>>>> > Runner<=>SDK Harness connection. This cache token
>>>>>> will be
>>>>>> provided by
>>>>>> > the Runner in the ProcessBundleRequest. Writes will
>>>>>> not yield
>>>>>> a new
>>>>>> > cache token. The advantage is that we can use one
>>>>>> cache token
>>>>>> for the
>>>>>> > life time of the bundle and also across bundles,
>>>>>> unless the
>>>>>> Runner
>>>>>> > switches to a new Runner<=>SDK Harness connection;
>>>>>> then the
>>>>>> Runner would
>>>>>> > have to generate a new cache token.
>>>>>> >
>>>>>> > We might require additional cache tokens for the side
>>>>>> inputs.
>>>>>> For now,
>>>>>> > I'm planning to only tackle user state which seems to
>>>>>> be the
>>>>>> area where
>>>>>> > users have expressed the most need for caching.
>>>>>> >
>>>>>> > -Max
>>>>>> >
>>>>>> > On 21.08.19 20:05, Maximilian Michels wrote:
>>>>>> > >> There is probably a misunderstanding here: I'm
>>>>>> suggesting
>>>>>> to use
>>>>>> > a worker ID instead of cache tokens, not additionally.
>>>>>> > >
>>>>>> > > Ah! Misread that. We need a changing token to
>>>>>> indicate that the
>>>>>> > cache is
>>>>>> > > stale, e.g. checkpoint has failed / restoring from
>>>>>> an old
>>>>>> > checkpoint. If
>>>>>> > > the _Runner_ generates a new unique token/id for
>>>>>> workers
>>>>>> which outlast
>>>>>> > > the Runner, then this should work fine. I don't
>>>>>> think it is
>>>>>> safe
>>>>>> > for the
>>>>>> > > worker to supply the id. The Runner should be in
>>>>>> control of
>>>>>> cache
>>>>>> > tokens
>>>>>> > > to avoid invalid tokens.
>>>>>> > >
>>>>>> > >> In the PR the token is modified as part of
>>>>>> updating the state.
>>>>>> > Doesn't the SDK need the new token to update its
>>>>>> cache entry
>>>>>> also?
>>>>>> > That's where it would help the SDK to know the new
>>>>>> token upfront.
>>>>>> > >
>>>>>> > > If the state is updated in the Runner, a new token
>>>>>> has to be
>>>>>> > generated.
>>>>>> > > The old one is not valid anymore. The SDK will use
>>>>>> the updated
>>>>>> > token to
>>>>>> > > store the new value in the cache. I understand that
>>>>>> it would be
>>>>>> > nice to
>>>>>> > > know the token upfront. That could be possible with
>>>>>> some token
>>>>>> > > generation scheme. On the other hand, writes can be
>>>>>> asynchronous and
>>>>>> > > thus not block the UDF.
>>>>>> > >
>>>>>> > >> But I believe there is no need to change the token
>>>>>> in first
>>>>>> > place, unless bundles for the same key (ranges) can be
>>>>>> processed by
>>>>>> > different workers.
>>>>>> > >
>>>>>> > > That's certainly possible, e.g. two workers A and B
>>>>>> take turns
>>>>>> > processing
>>>>>> > > a certain key range, one bundle after another:
>>>>>> > >
>>>>>> > > You process a bundle with a token T with A, then
>>>>>> worker B
>>>>>> takes over.
>>>>>> > > Both have an entry with cache token T. So B goes on to
>>>>>> modify the
>>>>>> > state
>>>>>> > > and uses the same cache token T. Then A takes over
>>>>>> again. A
>>>>>> would
>>>>>> > have a
>>>>>> > > stale cache entry but T would still be a valid
>>>>>> cache token.
>>>>>> > >
>>>>>> > >> Indeed the fact that Dataflow can dynamically
>>>>>> split and merge
>>>>>> > these ranges is what makes it trickier. If Flink does
>>>>>> not
>>>>>> > repartition the ranges, then things are much easier.
>>>>>> > >
>>>>>> > > Flink does not dynamically repartition key ranges
>>>>>> (yet). If
>>>>>> it started
>>>>>> > > to support that, we would invalidate the cache
>>>>>> tokens for
>>>>>> the changed
>>>>>> > > partitions.
>>>>>> > >
>>>>>> > >
>>>>>> > > I'd suggest the following cache token generation
>>>>>> scheme:
>>>>>> > >
>>>>>> > > One cache token per key range for user state and
>>>>>> one cache
>>>>>> token for
>>>>>> > > each side input. On writes to user state or
>>>>>> changing side
>>>>>> input, the
>>>>>> > > associated cache token will be renewed.
>>>>>> > >
>>>>>> > > On the SDK side, it should be sufficient to let the
>>>>>> SDK
>>>>>> > re-associate all
>>>>>> > > its cached data belonging to a valid cache token
>>>>>> with a new
>>>>>> cache
>>>>>> > token
>>>>>> > > returned by a successful write. This has to happen
>>>>>> in the
>>>>>> active scope
>>>>>> > > (i.e. user state, or a particular side input).
>>>>>> > >
>>>>>> > > If the key range changes, new cache tokens have to
>>>>>> be generated. This
>>>>>> > > should happen automatically because the Runner does
>>>>>> not
>>>>>> checkpoint
>>>>>> > cache
>>>>>> > > tokens and will generate new ones when it restarts
>>>>>> from an
>>>>>> earlier
>>>>>> > > checkpoint.
>>>>>> > >
>>>>>> > > The current PR needs to be changed to (1) only keep a
>>>>>> single cache
>>>>>> > token
>>>>>> > > per user state and key range (2) add support for cache
>>>>>> tokens for each
>>>>>> > > side input.
>>>>>> > >
>>>>>> > > Hope that makes sense.
>>>>>> > >
>>>>>> > > -Max
>>>>>> > >
>>>>>> > > On 21.08.19 17:27, Reuven Lax wrote:
>>>>>> > >>
>>>>>> > >>
>>>>>> > >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels
>>>>>> > <m...@apache.org <mailto:m...@apache.org>
>>>>>> <mailto:m...@apache.org <mailto:m...@apache.org>>
>>>>>> > >> <mailto:m...@apache.org <mailto:m...@apache.org>
>>>>>> <mailto:m...@apache.org <mailto:m...@apache.org>>>> wrote:
>>>>>> > >>
>>>>>> > >> Appreciate all your comments! Replying below.
>>>>>> > >>
>>>>>> > >>
>>>>>> > >> @Luke:
>>>>>> > >>
>>>>>> > >> > Having cache tokens per key would be very
>>>>>> expensive
>>>>>> indeed
>>>>>> > and I
>>>>>> > >> believe we should go with a single cache token
>>>>>> "per"
>>>>>> bundle.
>>>>>> > >>
>>>>>> > >> Thanks for your comments on the PR. I was
>>>>>> thinking to
>>>>>> propose
>>>>>> > something
>>>>>> > >> along the lines of having cache tokens valid
>>>>>> for a
>>>>>> particular
>>>>>> > >> checkpointing "epoch". That would require even
>>>>>> less token
>>>>>> > renewal than
>>>>>> > >> the per-bundle approach.
>>>>>> > >>
>>>>>> > >>
>>>>>> > >> @Thomas, thanks for the input. Some remarks:
>>>>>> > >>
>>>>>> > >> > Wouldn't it be simpler to have the runner
>>>>>> just track a
>>>>>> > unique ID
>>>>>> > >> for each worker and use that to communicate if
>>>>>> the
>>>>>> cache is
>>>>>> > valid or
>>>>>> > >> not?
>>>>>> > >>
>>>>>> > >> We do not need a unique id per worker. If a
>>>>>> cache token is
>>>>>> > valid for a
>>>>>> > >> particular worker, it is also valid for another
>>>>>> worker. That
>>>>>> > is with the
>>>>>> > >> assumption that key ranges are always disjoint
>>>>>> between the
>>>>>> > workers.
>>>>>> > >>
>>>>>> > >> > * When the bundle is started, the runner
>>>>>> tells the
>>>>>> worker
>>>>>> > if the
>>>>>> > >> cache has become invalid (since it knows if
>>>>>> another
>>>>>> worker has
>>>>>> > >> mutated state)
>>>>>> > >>
>>>>>> > >> This is simply done by not transferring the
>>>>>> particular
>>>>>> cache
>>>>>> > token. No
>>>>>> > >> need to declare it invalid explicitly.
>>>>>> > >>
>>>>>> > >> > * When the worker sends mutation requests to
>>>>>> the
>>>>>> runner, it
>>>>>> > >> includes its own ID (or the runner already has
>>>>>> it as
>>>>>> contextual
>>>>>> > >> information). No need to wait for a response.
>>>>>> > >>
>>>>>> > >> Mutations of cached values can be freely done
>>>>>> as long
>>>>>> as the
>>>>>> > cache token
>>>>>> > >> associated with the state is valid for a
>>>>>> particular
>>>>>> bundle.
>>>>>> > Only the
>>>>>> > >> first time, the Runner needs to wait on the
>>>>>> response
>>>>>> to store
>>>>>> > the cache
>>>>>> > >> token. This can also be done asynchronously.
>>>>>> > >>
>>>>>> > >> > * When the bundle is finished, the runner
>>>>>> records
>>>>>> the last
>>>>>> > writer
>>>>>> > >> (only if a change occurred)
>>>>>> > >>
>>>>>> > >> I believe this is not necessary because there
>>>>>> will only be
>>>>>> > one writer at
>>>>>> > >> a time for a particular bundle and key range,
>>>>>> hence
>>>>>> only one
>>>>>> > writer
>>>>>> > >> holds a valid cache token for a particular
>>>>>> state and
>>>>>> key range.
>>>>>> > >>
>>>>>> > >>
>>>>>> > >> @Reuven:
>>>>>> > >>
>>>>>> > >> > Dataflow divides the keyspace up into
>>>>>> lexicographic
>>>>>> > ranges, and
>>>>>> > >> creates a cache token per range.
>>>>>> > >>
>>>>>> > >> State is always processed partitioned by the
>>>>>> Flink workers
>>>>>> > (hash-based,
>>>>>> > >> not lexicographical). I don't think that
>>>>>> matters though
>>>>>> > because the key
>>>>>> > >> ranges do not overlap between the workers.
>>>>>> Flink does
>>>>>> not support
>>>>>> > >> dynamically repartitioning the key ranges.
>>>>>> Even in case of
>>>>>> > fine-grained
>>>>>> > >> recovery of workers and their key ranges, we
>>>>>> would simply
>>>>>> > generate new
>>>>>> > >> cache tokens for a particular worker.
>>>>>> > >>
>>>>>> > >>
>>>>>> > >> Dataflow's ranges are also hash based. When I said
>>>>>> lexicographical, I
>>>>>> > >> meant lexicographical based on the hexadecimal
>>>>>> hash value.
>>>>>> > >>
>>>>>> > >> Indeed the fact that Dataflow can dynamically
>>>>>> split and
>>>>>> merge these
>>>>>> > >> ranges is what makes it trickier. If Flink does not
>>>>>> repartition the
>>>>>> > >> ranges, then things are much easier.
>>>>>> > >>
>>>>>> > >>
>>>>>> > >>
>>>>>> > >> Thanks,
>>>>>> > >> Max
>>>>>> > >>
>>>>>> > >> On 21.08.19 09:33, Reuven Lax wrote:
>>>>>> > >> > Dataflow does something like this, however
>>>>>> since work is
>>>>>> > >> > load balanced across workers a per-worker id
>>>>>> doesn't
>>>>>> work
>>>>>> > very well.
>>>>>> > >> > Dataflow divides the keyspace up into
>>>>>> lexicographic
>>>>>> ranges, and
>>>>>> > >> creates
>>>>>> > >> > a cache token per range.
>>>>>> > >> >
>>>>>> > >> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise
>>>>>> > <t...@apache.org <mailto:t...@apache.org>
>>>>>> <mailto:t...@apache.org <mailto:t...@apache.org>>
>>>>>> > >> <mailto:t...@apache.org <mailto:t...@apache.org>
>>>>>> <mailto:t...@apache.org <mailto:t...@apache.org>>>
>>>>>> > >> > <mailto:t...@apache.org <mailto:t...@apache.org>
>>>>>> <mailto:t...@apache.org <mailto:t...@apache.org>>
>>>>>> > <mailto:t...@apache.org <mailto:t...@apache.org>
>>>>>> <mailto:t...@apache.org <mailto:t...@apache.org>>>>> wrote:
>>>>>> > >> >
>>>>>> > >> > Commenting here vs. on the PR since it relates to
>>>>>> the overall
>>>>>> > >> approach.
>>>>>> > >> >
>>>>>> > >> > Wouldn't it be simpler to have the
>>>>>> runner just
>>>>>> track a
>>>>>> > unique
>>>>>> > >> ID for
>>>>>> > >> > each worker and use that to communicate
>>>>>> if the
>>>>>> cache is
>>>>>> > valid
>>>>>> > >> or not?
>>>>>> > >> >
>>>>>> > >> > * When the bundle is started, the runner
>>>>>> tells the
>>>>>> > worker if the
>>>>>> > >> > cache has become invalid (since it knows
>>>>>> if another
>>>>>> > worker has
>>>>>> > >> > mutated state)
>>>>>> > >> > * When the worker sends mutation
>>>>>> requests to the
>>>>>> runner, it
>>>>>> > >> includes
>>>>>> > >> > its own ID (or the runner already has it as
>>>>>> contextual
>>>>>> > >> information).
>>>>>> > >> > No need to wait for a response.
>>>>>> > >> > * When the bundle is finished, the runner
>>>>>> records the
>>>>>> > last writer
>>>>>> > >> > (only if a change occurred)
>>>>>> > >> >
>>>>>> > >> > Whenever current worker ID and last
>>>>>> writer ID
>>>>>> doesn't
>>>>>> > match, cache
>>>>>> > >> > is invalid.
>>>>>> > >> >
>>>>>> > >> > Thomas
>>>>>> > >> >
>>>>>> > >> >
>>>>>> > >> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz
>>>>>> Cwik
>>>>>> > <lc...@google.com <mailto:lc...@google.com>
>>>>>> <mailto:lc...@google.com <mailto:lc...@google.com>>
>>>>>> > >> <mailto:lc...@google.com
>>>>>> <mailto:lc...@google.com>
>>>>>> <mailto:lc...@google.com <mailto:lc...@google.com>>>
>>>>>> > >> > <mailto:lc...@google.com
>>>>>> <mailto:lc...@google.com> <mailto:lc...@google.com
>>>>>> <mailto:lc...@google.com>>
>>>>>> > <mailto:lc...@google.com <mailto:lc...@google.com>
>>>>>> <mailto:lc...@google.com <mailto:lc...@google.com>>>>> wrote:
>>>>>> > >> >
>>>>>> > >> > Having cache tokens per key would be
>>>>>> very
>>>>>> expensive
>>>>>> > indeed
>>>>>> > >> and I
>>>>>> > >> > believe we should go with a single
>>>>>> cache token
>>>>>> > "per" bundle.
>>>>>> > >> >
>>>>>> > >> > On Mon, Aug 19, 2019 at 11:36 AM
>>>>>> Maximilian
>>>>>> Michels
>>>>>> > >> > <m...@apache.org <mailto:m...@apache.org>
>>>>>> <mailto:m...@apache.org <mailto:m...@apache.org>>
>>>>>> > <mailto:m...@apache.org <mailto:m...@apache.org>
>>>>>> <mailto:m...@apache.org <mailto:m...@apache.org>>>
>>>>>> > >> <mailto:m...@apache.org <mailto:m...@apache.org>
>>>>>> <mailto:m...@apache.org <mailto:m...@apache.org>>
>>>>>> > <mailto:m...@apache.org <mailto:m...@apache.org>
>>>>>> <mailto:m...@apache.org <mailto:m...@apache.org>>>>> wrote:
>>>>>> > >> >
>>>>>> > >> > Maybe a Beam Python expert can
>>>>>> chime in for
>>>>>> > Rakesh's
>>>>>> > >> question?
>>>>>> > >> >
>>>>>> > >> > Luke, I was assuming cache
>>>>>> tokens to be
>>>>>> per key
>>>>>> > and state
>>>>>> > >> > id. While
>>>>>> > >> > implementing initial support
>>>>>> on the
>>>>>> Runner
>>>>>> > side, I
>>>>>> > >> > realized that we
>>>>>> > >> > probably want cache tokens to
>>>>>> only be
>>>>>> per state
>>>>>> > id. Note
>>>>>> > >> > that if we had
>>>>>> > >> > per-key cache tokens, the number
>>>>>> of cache
>>>>>> > tokens would
>>>>>> > >> > approach the
>>>>>> > >> > total number of keys in an
>>>>>> application.
>>>>>> > >> >
>>>>>> > >> > If anyone wants to have a look,
>>>>>> here is
>>>>>> a first
>>>>>> > version of
>>>>>> > >> > the Runner
>>>>>> > >> > side for cache tokens. Note that
>>>>>> I only
>>>>>> > implemented cache
>>>>>> > >> > tokens for
>>>>>> > >> > BagUserState for now, but it can
>>>>>> be easily
>>>>>> > added for side
>>>>>> > >> > inputs as well.
>>>>>> > >> >
>>>>>> > >> > https://github.com/apache/beam/pull/9374
>>>>>> > >> >
>>>>>> > >> > -Max
>>>>>> > >> >
>>>>>> > >> >
>>>>>> > >>
>>>>>> >
>>>>>>