I've tried to put the current design into code. Any feedback on these
changes to enable caching of user state is appreciated:

Proto: https://github.com/apache/beam/pull/9440
Runner: https://github.com/apache/beam/pull/9374
Python SDK: https://github.com/apache/beam/pull/9418

Thanks,
Max

On 28.08.19 11:48, Maximilian Michels wrote:
> Just to clarify, the repeated list of cache tokens in the process
> bundle request is used to validate reading *and* stored when writing?
> In that sense, should they just be called version identifiers or
> something like that?

We could call them version identifiers, though cache tokens have always been
a means to identify versions of a piece of state.

On 28.08.19 11:10, Maximilian Michels wrote:
>> cachetools sounds like a fine choice to me.
>
> For the first version I've implemented a simple LRU cache. If you want
> to have a look:
> 
> https://github.com/apache/beam/pull/9418/files#diff-ed2d70e99442b6e1668e30409d3383a6R60
>
>
>> Open up a PR for the proto changes and we can work through any minor
>> comments there.
>
> Proto changes: https://github.com/apache/beam/pull/9440
>
>
> Thanks,
> Max
>
> On 27.08.19 23:00, Robert Bradshaw wrote:
>> Just to clarify, the repeated list of cache tokens in the process
>> bundle request is used to validate reading *and* stored when writing?
>> In that sense, should they just be called version identifiers or
>> something like that?
>>
>> On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>>
>>> Thanks. Updated:
>>>
>>> message ProcessBundleRequest {
>>>    // (Required) A reference to the process bundle descriptor that
>>> must be
>>>    // instantiated and executed by the SDK harness.
>>>    string process_bundle_descriptor_reference = 1;
>>>
>>>    // A cache token which can be used by an SDK to check for the
>>> validity
>>>    // of cached elements which have a cache token associated.
>>>    message CacheToken {
>>>
>>>      // A flag to indicate a cache token is valid for user state.
>>>      message UserState {}
>>>
>>>      // A flag to indicate a cache token is valid for a side input.
>>>      message SideInput {
>>>        // The id of a side input.
>>>        string side_input = 1;
>>>      }
>>>
>>>      // The scope of a cache token.
>>>      oneof type {
>>>        UserState user_state = 1;
>>>        SideInput side_input = 2;
>>>      }
>>>
>>>      // The cache token identifier which should be globally unique.
>>>      bytes token = 10;
>>>    }
>>>
>>>    // (Optional) A list of cache tokens that can be used by an SDK
>>> to reuse
>>>    // cached data returned by the State API across multiple bundles.
>>>    repeated CacheToken cache_tokens = 2;
>>> }
>>>
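For illustration, here is roughly how a runner-side test in Python could
populate the new field once the proto above is generated (the module path and
the helper are assumptions on my side; only the message and field names come
from the proposal):

from apache_beam.portability.api import beam_fn_api_pb2

# Illustrative only: assumes the proposed CacheToken message is generated into
# beam_fn_api_pb2; the helper and its arguments are made up.
def make_process_bundle_request(descriptor_id, user_state_token, side_input_tokens):
    # side_input_tokens: side input id -> token bytes
    tokens = [
        beam_fn_api_pb2.ProcessBundleRequest.CacheToken(
            user_state=beam_fn_api_pb2.ProcessBundleRequest.CacheToken.UserState(),
            token=user_state_token)
    ]
    for side_input_id, token in side_input_tokens.items():
        tokens.append(
            beam_fn_api_pb2.ProcessBundleRequest.CacheToken(
                side_input=beam_fn_api_pb2.ProcessBundleRequest.CacheToken.SideInput(
                    side_input=side_input_id),
                token=token))
    return beam_fn_api_pb2.ProcessBundleRequest(
        process_bundle_descriptor_reference=descriptor_id,
        cache_tokens=tokens)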
>>> On 27.08.19 19:22, Lukasz Cwik wrote:
>>>
>>> SideInputState -> SideInput (side_input_state -> side_input)
>>> + more comments around the messages and the fields.
>>>
>>>
>>> On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>>
>>>> We would have to differentiate cache tokens for user state and side
>>>> inputs. How about something like this?
>>>>
>>>> message ProcessBundleRequest {
>>>>    // (Required) A reference to the process bundle descriptor that
>>>> must be
>>>>    // instantiated and executed by the SDK harness.
>>>>    string process_bundle_descriptor_reference = 1;
>>>>
>>>>    message CacheToken {
>>>>
>>>>      message UserState {
>>>>      }
>>>>
>>>>      message SideInputState {
>>>>        string side_input_id = 1;
>>>>      }
>>>>
>>>>      oneof type {
>>>>        UserState user_state = 1;
>>>>        SideInputState side_input_state = 2;
>>>>      }
>>>>
>>>>      bytes token = 10;
>>>>    }
>>>>
>>>>    // (Optional) A list of cache tokens that can be used by an SDK
>>>> to reuse
>>>>    // cached data returned by the State API across multiple bundles.
>>>>    repeated CacheToken cache_tokens = 2;
>>>> }
>>>>
>>>> -Max
>>>>
>>>> On 27.08.19 18:43, Lukasz Cwik wrote:
>>>>
>>>> The bundles view of side inputs should never change during
>>>> processing and should have a point in time snapshot.
>>>>
>>>> I was just trying to say that deferring the cache token for side inputs
>>>> until side input request time simplified the runner's implementation,
>>>> since that is conclusively when the runner would need to take a look at
>>>> the side input. Putting them as part of the ProcessBundleRequest
>>>> complicates that, but does make the SDK implementation significantly
>>>> simpler, which is a win.
>>>>
>>>> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>>
>>>>> Thanks for the quick response.
>>>>>
>>>>> Just to clarify, the issue with versioning side input is also present
>>>>> when supplying the cache tokens on a request basis instead of per
>>>>> bundle. The SDK never knows when the Runner receives a new version of
>>>>> the side input. Like you pointed out, it needs to mark side inputs as
>>>>> stale and generate new cache tokens for the stale side inputs.
>>>>>
>>>>> The difference between per-request tokens and per-bundle tokens
>>>>> would be
>>>>> that the side input can only change after a bundle completes vs.
>>>>> during
>>>>> the bundle. Side inputs are always fuzzy in that regard because there is
>>>>> no precise instant at which side inputs are atomically updated, other
>>>>> than the assumption that they eventually will be updated. In that regard
>>>>> per-bundle tokens for side input seem to be fine.
>>>>>
>>>>> All of the above is not an issue for user state, as its cache can
>>>>> remain
>>>>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>>>>> solution would be to not cache side input because there are many
>>>>> cases
>>>>> where the caching just adds additional overhead. However, I can also
>>>>> imagine cases where side input is valid forever and caching would be
>>>>> very beneficial.
>>>>>
>>>>> For the first version I want to focus on user state because that's
>>>>> where
>>>>> I see the most benefit for caching. I don't see a problem though
>>>>> for the
>>>>> Runner to detect new side input and reflect that in the cache tokens
>>>>> supplied for a new bundle.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 26.08.19 22:27, Lukasz Cwik wrote:
>>>>>> Your summary below makes sense to me. I can see that recovery from
>>>>>> rolling back doesn't need to be a priority and simplifies the
>>>>>> solution
>>>>>> for user state caching down to one token.
>>>>>>
>>>>>> Providing cache tokens upfront does require the Runner to know what
>>>>>> "version" of everything it may supply to the SDK upfront (instead
>>>>>> of on
>>>>>> request) which would mean that the Runner may need to have a mapping
>>>>>> from cache token to internal version identifier for things like side
>>>>>> inputs which are typically broadcast. The Runner would also need
>>>>>> to poll
>>>>>> to see if the side input has changed in the background to not block
>>>>>> processing bundles with "stale" side input data.
>>>>>>
>>>>>> Ping me once you have the Runner PR updated and I'll take a look
>>>>>> again.
>>>>>>
>>>>>> On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels
>>>>>> <m...@apache.org> wrote:
>>>>>>
>>>>>>      Thank you for the summary Luke. I really appreciate the
>>>>>> effort you put
>>>>>>      into this!
>>>>>>
>>>>>>       > Based upon your discussion you seem to want option #1
>>>>>>
>>>>>>      I'm actually for option #2. The option to cache/invalidate
>>>>>> side inputs
>>>>>>      is important, and we should incorporate this in the design.
>>>>>> That's why
>>>>>>      option #1 is not flexible enough. However, a first
>>>>>> implementation could
>>>>>>      defer caching of side inputs.
>>>>>>
>>>>>>      Option #3 was my initial thinking and the first version of
>>>>>> the PR, but I
>>>>>>      think we agreed that there wouldn't be much gain from
>>>>>> keeping a cache
>>>>>>      token per state id.
>>>>>>
>>>>>>      Option #4 is what is specifically documented in the
>>>>>> reference doc and
>>>>>>      already part of the Proto, where valid tokens are provided
>>>>>> for each new
>>>>>>      bundle and also as part of the response of a get/put/clear.
>>>>>> We mentioned
>>>>>>      that the reply does not have to be waited on synchronously
>>>>>> (I mentioned
>>>>>>      it even), but it complicates the implementation. The idea
>>>>>> Thomas and I
>>>>>>      expressed was that a response is not even necessary if we
>>>>>> assume
>>>>>>      validity of the upfront provided cache tokens for the
>>>>>> lifetime of a
>>>>>>      bundle and that cache tokens will be invalidated as soon as
>>>>>> the Runner
>>>>>>      fails in any way. This is naturally the case for Flink
>>>>>> because it will
>>>>>>      simply "forget" its current cache tokens.
>>>>>>
>>>>>>      I currently envision the following schema:
>>>>>>
>>>>>>      Runner
>>>>>>      ======
>>>>>>
>>>>>>      - Runner generates a globally unique cache token, one for
>>>>>> user state and
>>>>>>      one for each side input
>>>>>>
>>>>>>      - The token is supplied to the SDK Harness for each bundle
>>>>>> request
>>>>>>
>>>>>>      - For the lifetime of a Runner<=>SDK Harness connection this
>>>>>> cache token
>>>>>>      will not change
>>>>>>      - Runner will generate a new token if the connection/key
>>>>>> space changes
>>>>>>      between Runner and SDK Harness
>>>>>>
>>>>>>
>>>>>>      SDK
>>>>>>      ===
>>>>>>
>>>>>>      - For each bundle the SDK worker stores the list of valid
>>>>>> cache tokens
>>>>>>      - The SDK Harness keeps a global cache across all its (local)
>>>>>>      workers, which is an LRU cache: state_key => (cache_token, value)
>>>>>>      - get: Lookup cache using the valid cache token for the
>>>>>> state. If no
>>>>>>      match, then fetch from Runner and use the already available
>>>>>> token for
>>>>>>      caching
>>>>>>      - put: Put the value in the cache with a valid cache token and add
>>>>>>      the value to the pending writes, which will be flushed at the
>>>>>>      latest when the bundle ends
>>>>>>      - clear: same as put but clear cache
>>>>>>
>>>>>>      It does look like this is not too far off from what you were
>>>>>> describing.
>>>>>>      The main difference is that we just work with a single cache
>>>>>> token. In
>>>>>>      my opinion we do not need the second cache token for writes,
>>>>>> as long as
>>>>>>      we ensure that we generate a new cache token if the
>>>>>>      bundle/checkpoint fails.
>>>>>>
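To make the SDK side concrete, here is a minimal sketch of the get/put/clear
behaviour described above (a sketch only, with made-up names; the actual PR
wires this into the existing state handling and uses a slightly different
structure):

import collections

class StateCache(object):
    """Process-wide LRU cache shared by all SDK workers:
    state_key -> (cache_token, value)."""

    def __init__(self, max_entries=1000):
        self._cache = collections.OrderedDict()
        self._max_entries = max_entries

    def get(self, cache_token, state_key, fetch_from_runner):
        entry = self._cache.get(state_key)
        if entry is not None and entry[0] == cache_token:
            self._cache.move_to_end(state_key)      # valid hit, LRU bookkeeping
            return entry[1]
        value = fetch_from_runner(state_key)        # miss or stale token: use the State API
        self.put(cache_token, state_key, value)     # cache under the supplied token
        return value

    def put(self, cache_token, state_key, value):
        # The caller also adds the value to the pending writes, which are
        # flushed to the Runner at the latest when the bundle ends.
        self._cache[state_key] = (cache_token, value)
        self._cache.move_to_end(state_key)
        if len(self._cache) > self._max_entries:
            self._cache.popitem(last=False)         # evict least recently used

    def clear(self, cache_token, state_key):
        self.put(cache_token, state_key, [])        # same as put, but with an empty bag

If the Runner fails or the connection changes, it simply hands out a new token
and the stale entries age out of the LRU cache.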
>>>>>>      I have a draft PR
>>>>>>         for the Runner: https://github.com/apache/beam/pull/9374
>>>>>>         for the SDK: https://github.com/apache/beam/pull/9418
>>>>>>
>>>>>>      Note that the Runner PR needs to be updated to fully reflect the
>>>>>>      above scheme. The SDK implementation is WIP. I want to make sure
>>>>>> that we
>>>>>>      clarify the design before this gets finalized.
>>>>>>
>>>>>>      Thanks again for all your comments. Much appreciated!
>>>>>>
>>>>>>      Cheers,
>>>>>>      Max
>>>>>>
>>>>>>      On 26.08.19 19:58, Lukasz Cwik wrote:
>>>>>>       > There were originally a couple of ideas around how
>>>>>> caching could
>>>>>>      work:
>>>>>>       > 1) One cache token for the entire bundle that is supplied up
>>>>>>      front. The
>>>>>>       > SDK caches everything using the given token. All
>>>>>>      reads/clear/append for
>>>>>>       > all types of state happen under this token. Anytime a
>>>>>> side input
>>>>>>       > changes, key processing partition range changes or a
>>>>>> bundle fails to
>>>>>>       > process, the runner chooses a new cache token, effectively
>>>>>>       > invalidating everything in the past.
>>>>>>       > 2) One cache token per type of state that is supplied up
>>>>>> front.
>>>>>>       > The SDK caches all requests for a given type using the
>>>>>> given cache
>>>>>>       > token. The runner can selectively choose which type to
>>>>>> keep and
>>>>>>      which to
>>>>>>       > invalidate. Bundle failure and key processing partition
>>>>>> changes
>>>>>>       > invalidate all user state, side input change invalidates
>>>>>> all side
>>>>>>      inputs.
>>>>>>       >
>>>>>>       > 3) One cache token per state id that is supplied up front.
>>>>>>       > The SDK caches all requests for the given state id using the
>>>>>>      given cache
>>>>>>       > token. The runner can selectively choose which to
>>>>>> invalidate and
>>>>>>      which
>>>>>>       > to keep. Bundle failure and key processing partition changes
>>>>>>      invalidate
>>>>>>       > all user state, side input changes only invalidate the
>>>>>> side input
>>>>>>      that
>>>>>>       > changed.
>>>>>>       >
>>>>>>       > 4) A cache token on each read/clear/append that is
>>>>>> supplied on the
>>>>>>       > response of the call with an initial valid set that is
>>>>>> supplied at
>>>>>>       > start. The runner can selectively choose which to keep on
>>>>>> start.
>>>>>>      Bundle
>>>>>>       > failure allows runners to "roll back" to a known good
>>>>>> state by
>>>>>>      selecting
>>>>>>       > the previous valid cache token as part of the initial
>>>>>> set. Key
>>>>>>       > processing partition changes allow runners to keep cached
>>>>>> state that
>>>>>>       > hasn't changed since it can be tied to a version number
>>>>>> of the state
>>>>>>       > itself as part of the initial set. Side input changes
>>>>>> only invalidate
>>>>>>       > the side input that changed.
>>>>>>       >
>>>>>>       > Based upon your discussion you seem to want option #1 which
>>>>>>      doesn't work
>>>>>>       > well with side inputs clearing cached state. If we want
>>>>>> to have user
>>>>>>       > state survive a changing side input, we would want one of
>>>>>> the other
>>>>>>       > options. I do agree that supplying the cache token
>>>>>> upfront is
>>>>>>       > significantly simpler. Currently the protos are setup for
>>>>>> #4 since it
>>>>>>       > was the most flexible and at the time the pros outweighed
>>>>>> the cons.
>>>>>>       >
>>>>>>       > I don't understand why you think you need to wait for a
>>>>>> response
>>>>>>      for the
>>>>>>       > append/clear to get its cache token since the only reason
>>>>>> you
>>>>>>      need the
>>>>>>       > cache token is that you want to use that cached data when
>>>>>>      processing a
>>>>>>       > different bundle. I was thinking that the flow on the SDK
>>>>>> side
>>>>>>      would be
>>>>>>       > something like (assuming there is a global cache of cache
>>>>>> token
>>>>>>      -> (map
>>>>>>       > of state key -> data))
>>>>>>       > 1) Create a local cache of (map of state key -> data)
>>>>>> using the
>>>>>>      initial
>>>>>>       > set of valid cache tokens
>>>>>>       > 2) Make all mutations in place on local cache without
>>>>>> waiting for
>>>>>>      response.
>>>>>>       > 3) When response comes back, update global cache with new
>>>>>> cache
>>>>>>      token ->
>>>>>>       > (map of state key -> data)) (this is when the data
>>>>>> becomes visible to
>>>>>>       > other bundles that start processing)
>>>>>>       > 4) Before the bundle finishes processing, wait for all
>>>>>>      outstanding state
>>>>>>       > calls to finish.
>>>>>>       >
>>>>>>       > To implement caching on the runner side, you would keep
>>>>>> track of
>>>>>>      at most
>>>>>>       > 2 cache tokens per state key, one cache token represents
>>>>>> the initial
>>>>>>       > value when the bundle started while the second represents
>>>>>> the
>>>>>>      modified
>>>>>>       > state. If the bundle succeeds the runner passes in the
>>>>>> set of tokens
>>>>>>       > which represent the new state, if the bundle fails you
>>>>>> process
>>>>>>      using the
>>>>>>       > original ones.
>>>>>>       >
>>>>>>       > After thinking through the implementation again, we could
>>>>>> supply two
>>>>>>       > cache tokens for each state id: the first being the initial
>>>>>>       > token if no writes happen, while the second represents the
>>>>>>       > token to use
>>>>>>       > if the SDK changes the state. This gives us the
>>>>>> simplification
>>>>>>      where we
>>>>>>       > don't need to wait for the response before we update the
>>>>>> global cache
>>>>>>       > making a typical blocking cache much easier to do. We
>>>>>> also get the
>>>>>>       > benefit that runners can supply either the same cache
>>>>>> token for a
>>>>>>      state
>>>>>>       > id or different ones. If the runner supplies the same one, then
>>>>>>       > it's telling the SDK to make modifications in place without any
>>>>>>       > rollback (which is good on memory since we are reducing copies
>>>>>>       > of stuff); if the runner supplies two different ones, then it's
>>>>>>       > telling the SDK to keep
>>>>>>       > the old data around. If we went through with this new
>>>>>> option the SDK
>>>>>>       > side logic would be (assuming there is a global cache of
>>>>>> cache
>>>>>>      token ->
>>>>>>       > (map of state key -> data)):
>>>>>>       >
>>>>>>       > 1) Create an empty local set of state ids that are dirty
>>>>>> when
>>>>>>      starting a
>>>>>>       > new bundle (dirty set)
>>>>>>       >
>>>>>>       > For reads/gets:
>>>>>>       > 2A) If the request is a read (get), use dirty set to
>>>>>> choose which
>>>>>>      cache
>>>>>>       > token to look up and use in the global cache. If the global
>>>>>>       > cache is missing data, issue the appropriate request and provide
>>>>>>       > the result.
>>>>>>       >
>>>>>>       > For writes/appends/clear:
>>>>>>       > 2B) if the cache tokens are different for the state id,
>>>>>> add the
>>>>>>      state id
>>>>>>       > to the dirty set if it isn't there and perform the
>>>>>> appropriate
>>>>>>       > modification to convert the old cached state data to the new
>>>>>>      state data
>>>>>>       > 3B) modify the global caches data
>>>>>>       > 4B) issue the request to the runner
>>>>>>       > 5B*) add this request to the set of requests to block on
>>>>>> before
>>>>>>       > completing the bundle.
>>>>>>       >
>>>>>>       > (* Note: there was another idea to update the process bundle
>>>>>>       > response to contain the id of the last state request, which
>>>>>>       > would let the runner know when it has seen the last state
>>>>>>       > request, allowing the SDK to not block at all when finishing
>>>>>>       > the bundle.)
>>>>>>       >
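(For comparison with the single-token schema further up, here is a rough
sketch of this two-token / dirty-set flow; all names are made up and it
glosses over the concrete state request types:)

class TwoTokenCache(object):
    """Illustrative sketch of the two-token flow above, not real Beam code."""

    def __init__(self, global_cache, tokens):
        # global_cache: cache_token -> {state_key: data}, shared across bundles
        # tokens: state_id -> (initial_token, write_token), supplied by the runner
        self._global_cache = global_cache
        self._tokens = tokens
        self._dirty = set()   # 1) state ids written to during this bundle

    def _active_token(self, state_id):
        initial_token, write_token = self._tokens[state_id]
        return write_token if state_id in self._dirty else initial_token

    def get(self, state_id, state_key, fetch_from_runner):
        # 2A) use the dirty set to choose which token to look up in the global cache
        data = self._global_cache.setdefault(self._active_token(state_id), {})
        if state_key not in data:
            data[state_key] = fetch_from_runner(state_key)   # cache miss
        return data[state_key]

    def write(self, state_id, state_key, new_data, send_to_runner):
        initial_token, write_token = self._tokens[state_id]
        if initial_token != write_token and state_id not in self._dirty:
            # 2B) different tokens: keep the old data around by copying it
            # under the write token before the first modification
            self._global_cache[write_token] = dict(
                self._global_cache.get(initial_token, {}))
            self._dirty.add(state_id)
        # 3B) modify the global cache's data, 4B) issue the request to the runner;
        # 5B) the caller blocks on all such requests before finishing the bundle.
        self._global_cache.setdefault(self._active_token(state_id), {})[state_key] = new_data
        return send_to_runner(state_id, state_key, new_data)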
>>>>>>       > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels
>>>>>>       > <m...@apache.org> wrote:
>>>>>>       >
>>>>>>       >     Just to give a quick update here. Rakesh, Thomas, and
>>>>>> I had a
>>>>>>      discussion
>>>>>>       >     about async writes from the Python SDK to the Runner.
>>>>>> Robert
>>>>>>      was also
>>>>>>       >     present for some parts of the discussion.
>>>>>>       >
>>>>>>       >     We concluded that blocking writes with the need to
>>>>>> refresh
>>>>>>      the cache
>>>>>>       >     token each time are not going to provide sufficient
>>>>>>       >     throughput or low enough latency.
>>>>>>       >
>>>>>>       >     We figured that it will be enough to use a single
>>>>>> cache token per
>>>>>>       >     Runner<=>SDK Harness connection. This cache token
>>>>>> will be
>>>>>>      provided by
>>>>>>       >     the Runner in the ProcessBundleRequest. Writes will
>>>>>> not yield
>>>>>>      a new
>>>>>>       >     cache token. The advantage is that we can use one
>>>>>> cache token
>>>>>>      for the
>>>>>>       >     lifetime of the bundle and also across bundles,
>>>>>> unless the
>>>>>>      Runner
>>>>>>       >     switches to a new Runner<=>SDK Harness connection;
>>>>>> then the
>>>>>>      Runner would
>>>>>>       >     have to generate a new cache token.
>>>>>>       >
>>>>>>       >     We might require additional cache tokens for the side
>>>>>> inputs.
>>>>>>      For now,
>>>>>>       >     I'm planning to only tackle user state which seems to
>>>>>> be the
>>>>>>      area where
>>>>>>       >     users have expressed the most need for caching.
>>>>>>       >
>>>>>>       >     -Max
>>>>>>       >
>>>>>>       >     On 21.08.19 20:05, Maximilian Michels wrote:
>>>>>>       >     >> There is probably a misunderstanding here: I'm suggesting
>>>>>>       >     >> using a worker ID instead of cache tokens, not in
>>>>>>       >     >> addition to them.
>>>>>>       >     >
>>>>>>       >     > Ah! Misread that. We need a changing token to
>>>>>> indicate that the
>>>>>>       >     cache is
>>>>>>       >     > stale, e.g. checkpoint has failed / restoring from
>>>>>> an old
>>>>>>       >     checkpoint. If
>>>>>>       >     > the _Runner_ generates a new unique token/id for
>>>>>> workers
>>>>>>      which outlast
>>>>>>       >     > the Runner, then this should work fine. I don't
>>>>>> think it is
>>>>>>      safe
>>>>>>       >     for the
>>>>>>       >     > worker to supply the id. The Runner should be in
>>>>>> control of
>>>>>>      cache
>>>>>>       >     tokens
>>>>>>       >     > to avoid invalid tokens.
>>>>>>       >     >
>>>>>>       >     >> In the PR the token is modified as part of
>>>>>> updating the state.
>>>>>>       >     Doesn't the SDK need the new token to update its cache
>>>>>>       >     entry also?
>>>>>>       >     That's where it would help the SDK to know the new
>>>>>> token upfront.
>>>>>>       >     >
>>>>>>       >     > If the state is updated in the Runner, a new token
>>>>>> has to be
>>>>>>       >     generated.
>>>>>>       >     > The old one is not valid anymore. The SDK will use
>>>>>> the updated
>>>>>>       >     token to
>>>>>>       >     > store the new value in the cache. I understand that
>>>>>> it would be
>>>>>>       >     nice to
>>>>>>       >     > know the token upfront. That could be possible with
>>>>>> some token
>>>>>>       >     > generation scheme. On the other hand, writes can be
>>>>>>      asynchronous and
>>>>>>       >     > thus not block the UDF.
>>>>>>       >     >
>>>>>>       >     >> But I believe there is no need to change the token
>>>>>> in first
>>>>>>       >     place, unless bundles for the same key (ranges) can be
>>>>>>      processed by
>>>>>>       >     different workers.
>>>>>>       >     >
>>>>>>       >     > That's certainly possible, e.g. two workers A and B
>>>>>> take turns
>>>>>>       >     processing
>>>>>>       >     > a certain key range, one bundle after another:
>>>>>>       >     >
>>>>>>       >     > You process a bundle with a token T with A, then
>>>>>> worker B
>>>>>>      takes over.
>>>>>>       >     > Both have an entry with cache token T. So B goes on to
>>>>>>      modify the
>>>>>>       >     state
>>>>>>       >     > and uses the same cache token T. Then A takes over
>>>>>> again. A
>>>>>>      would
>>>>>>       >     have a
>>>>>>       >     > stale cache entry but T would still be a valid
>>>>>> cache token.
>>>>>>       >     >
>>>>>>       >     >> Indeed the fact that Dataflow can dynamically
>>>>>> split and merge
>>>>>>       >     these ranges is what makes it trickier. If Flink does
>>>>>> not
>>>>>>       >     repartition the ranges, then things are much easier.
>>>>>>       >     >
>>>>>>       >     > Flink does not dynamically repartition key ranges
>>>>>> (yet). If
>>>>>>      it started
>>>>>>       >     > to support that, we would invalidate the cache
>>>>>> tokens for
>>>>>>      the changed
>>>>>>       >     > partitions.
>>>>>>       >     >
>>>>>>       >     >
>>>>>>       >     > I'd suggest the following cache token generation
>>>>>> scheme:
>>>>>>       >     >
>>>>>>       >     > One cache token per key range for user state and
>>>>>> one cache
>>>>>>      token for
>>>>>>       >     > each side input. On writes to user state or
>>>>>> changing side
>>>>>>      input, the
>>>>>>       >     > associated cache token will be renewed.
>>>>>>       >     >
>>>>>>       >     > On the SDK side, it should be sufficient to let the
>>>>>> SDK
>>>>>>       >     re-associate all
>>>>>>       >     > its cached data belonging to a valid cache token
>>>>>> with a new
>>>>>>      cache
>>>>>>       >     token
>>>>>>       >     > returned by a successful write. This has to happen
>>>>>> in the
>>>>>>      active scope
>>>>>>       >     > (i.e. user state, or a particular side input).
>>>>>>       >     >
>>>>>>       >     > If the key range changes, new cache tokens have to be
>>>>>>       >     > generated. This
>>>>>>       >     > should happen automatically because the Runner does
>>>>>> not
>>>>>>      checkpoint
>>>>>>       >     cache
>>>>>>       >     > tokens and will generate new ones when it restarts
>>>>>> from an
>>>>>>      earlier
>>>>>>       >     > checkpoint.
>>>>>>       >     >
>>>>>>       >     > The current PR needs to be changed to (1) only keep a
>>>>>>      single cache
>>>>>>       >     token
>>>>>>       >     > per user state and key range (2) add support for cache
>>>>>>      tokens for each
>>>>>>       >     > side input.
>>>>>>       >     >
>>>>>>       >     > Hope that makes sense.
>>>>>>       >     >
>>>>>>       >     > -Max
>>>>>>       >     >
>>>>>>       >     > On 21.08.19 17:27, Reuven Lax wrote:
>>>>>>       >     >>
>>>>>>       >     >>
>>>>>>       >     >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels
>>>>>>       >     >> <m...@apache.org> wrote:
>>>>>>       >     >>
>>>>>>       >     >>     Appreciate all your comments! Replying below.
>>>>>>       >     >>
>>>>>>       >     >>
>>>>>>       >     >>     @Luke:
>>>>>>       >     >>
>>>>>>       >     >>     > Having cache tokens per key would be very
>>>>>> expensive
>>>>>>      indeed
>>>>>>       >     and I
>>>>>>       >     >>     believe we should go with a single cache token
>>>>>> "per"
>>>>>>      bundle.
>>>>>>       >     >>
>>>>>>       >     >>     Thanks for your comments on the PR. I was
>>>>>> thinking to
>>>>>>      propose
>>>>>>       >     something
>>>>>>       >     >>     along this lines of having cache tokens valid
>>>>>> for a
>>>>>>      particular
>>>>>>       >     >>     checkpointing "epoch". That would require even
>>>>>> less token
>>>>>>       >     renewal than
>>>>>>       >     >>     the per-bundle approach.
>>>>>>       >     >>
>>>>>>       >     >>
>>>>>>       >     >>     @Thomas, thanks for the input. Some remarks:
>>>>>>       >     >>
>>>>>>       >     >>     > Wouldn't it be simpler to have the runner
>>>>>> just track a
>>>>>>       >     unique ID
>>>>>>       >     >>     for each worker and use that to communicate if
>>>>>> the
>>>>>>      cache is
>>>>>>       >     valid or
>>>>>>       >     >>     not?
>>>>>>       >     >>
>>>>>>       >     >>     We do not need a unique id per worker. If a
>>>>>> cache token is
>>>>>>       >     valid for a
>>>>>>       >     >>     particular worker, it is also valid for another
>>>>>>      worker. That
>>>>>>       >     is with the
>>>>>>       >     >>     assumption that key ranges are always disjoint
>>>>>> between the
>>>>>>       >     workers.
>>>>>>       >     >>
>>>>>>       >     >>     > * When the bundle is started, the runner
>>>>>> tells the
>>>>>>      worker
>>>>>>       >     if the
>>>>>>       >     >>     cache has become invalid (since it knows if
>>>>>> another
>>>>>>      worker has
>>>>>>       >     >>     mutated state)
>>>>>>       >     >>
>>>>>>       >     >>     This is simply done by not transferring the
>>>>>> particular
>>>>>>      cache
>>>>>>       >     token. No
>>>>>>       >     >>     need to declare it invalid explicitly.
>>>>>>       >     >>
>>>>>>       >     >>     > * When the worker sends mutation requests to
>>>>>> the
>>>>>>      runner, it
>>>>>>       >     >>     includes its own ID (or the runner already has
>>>>>> it as
>>>>>>      contextual
>>>>>>       >     >>     information). No need to wait for a response.
>>>>>>       >     >>
>>>>>>       >     >>     Mutations of cached values can be freely done
>>>>>> as long
>>>>>>      as the
>>>>>>       >     cache token
>>>>>>       >     >>     associated with the state is valid for a
>>>>>> particular
>>>>>>      bundle.
>>>>>>       >     Only the
>>>>>>       >     >>     first time, the Runner needs to wait on the
>>>>>> response
>>>>>>      to store
>>>>>>       >     the cache
>>>>>>       >     >>     token. This can also be done asynchronously.
>>>>>>       >     >>
>>>>>>       >     >>     > * When the bundle is finished, the runner
>>>>>> records
>>>>>>      the last
>>>>>>       >     writer
>>>>>>       >     >>     (only if a change occurred)
>>>>>>       >     >>
>>>>>>       >     >>     I believe this is not necessary because there
>>>>>> will only be
>>>>>>       >     one writer at
>>>>>>       >     >>     a time for a particular bundle and key range,
>>>>>> hence
>>>>>>      only one
>>>>>>       >     writer
>>>>>>       >     >>     holds a valid cache token for a particular
>>>>>> state and
>>>>>>      key range.
>>>>>>       >     >>
>>>>>>       >     >>
>>>>>>       >     >>     @Reuven:
>>>>>>       >     >>
>>>>>>       >     >>     >  Dataflow divides the keyspace up into
>>>>>> lexicographic
>>>>>>       >     ranges, and
>>>>>>       >     >>     creates a cache token per range.
>>>>>>       >     >>
>>>>>>       >     >>     State is always processed partitioned by the
>>>>>> Flink workers
>>>>>>       >     (hash-based,
>>>>>>       >     >>     not lexicographical). I don't think that
>>>>>> matters though
>>>>>>       >     because the key
>>>>>>       >     >>     ranges do not overlap between the workers.
>>>>>> Flink does
>>>>>>      not support
>>>>>>       >     >>     dynamically repartitioning the key ranges.
>>>>>> Even in case of
>>>>>>       >     fine-grained
>>>>>>       >     >>     recovery of workers and their key ranges, we
>>>>>> would simply
>>>>>>       >     generate new
>>>>>>       >     >>     cache tokens for a particular worker.
>>>>>>       >     >>
>>>>>>       >     >>
>>>>>>       >     >> Dataflow's ranges are also hash based. When I said
>>>>>>      lexicographical, I
>>>>>>       >     >> meant lexicographical based on the hexadecimal
>>>>>> hash value.
>>>>>>       >     >>
>>>>>>       >     >> Indeed the fact that Dataflow can dynamically
>>>>>> split and
>>>>>>      merge these
>>>>>>       >     >> ranges is what makes it trickier. If Flink does not
>>>>>>      repartition the
>>>>>>       >     >> ranges, then things are much easier.
>>>>>>       >     >>
>>>>>>       >     >>
>>>>>>       >     >>
>>>>>>       >     >>     Thanks,
>>>>>>       >     >>     Max
>>>>>>       >     >>
>>>>>>       >     >>     On 21.08.19 09:33, Reuven Lax wrote:
>>>>>>       >     >>     > Dataflow does something like this, however
>>>>>> since work is
>>>>>>       >     >>     > load balanced across workers a per-worker id
>>>>>> doesn't
>>>>>>      work
>>>>>>       >     very well.
>>>>>>       >     >>     > Dataflow divides the keyspace up into
>>>>>> lexicographic
>>>>>>      ranges, and
>>>>>>       >     >>     creates
>>>>>>       >     >>     > a cache token per range.
>>>>>>       >     >>     >
>>>>>>       >     >>     > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise
>>>>>>       >     >>     > <t...@apache.org> wrote:
>>>>>>       >     >>     >
>>>>>>       >     >>     >     Commenting here vs. on the PR since
>>>>>> related to
>>>>>>      the overall
>>>>>>       >     >>     approach.
>>>>>>       >     >>     >
>>>>>>       >     >>     >     Wouldn't it be simpler to have the
>>>>>> runner just
>>>>>>      track a
>>>>>>       >     unique
>>>>>>       >     >>     ID for
>>>>>>       >     >>     >     each worker and use that to communicate
>>>>>> if the
>>>>>>      cache is
>>>>>>       >     valid
>>>>>>       >     >>     or not?
>>>>>>       >     >>     >
>>>>>>       >     >>     >     * When the bundle is started, the runner
>>>>>> tells the
>>>>>>       >     worker if the
>>>>>>       >     >>     >     cache has become invalid (since it knows
>>>>>> if another
>>>>>>       >     worker has
>>>>>>       >     >>     >     mutated state)
>>>>>>       >     >>     >     * When the worker sends mutation
>>>>>> requests to the
>>>>>>      runner, it
>>>>>>       >     >>     includes
>>>>>>       >     >>     >     its own ID (or the runner already has it as
>>>>>>      contextual
>>>>>>       >     >>     information).
>>>>>>       >     >>     >     No need to wait for a response.
>>>>>>       >     >>     >     * When the bundle is finished, the runner
>>>>>>      records the
>>>>>>       >     last writer
>>>>>>       >     >>     >     (only if a change occurred)
>>>>>>       >     >>     >
>>>>>>       >     >>     >     Whenever current worker ID and last
>>>>>> writer ID
>>>>>>      doesn't
>>>>>>       >     match, cache
>>>>>>       >     >>     >     is invalid.
>>>>>>       >     >>     >
>>>>>>       >     >>     >     Thomas
>>>>>>       >     >>     >
>>>>>>       >     >>     >
>>>>>>       >     >>     >     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik
>>>>>>       >     >>     >     <lc...@google.com> wrote:
>>>>>>       >     >>     >
>>>>>>       >     >>     >         Having cache tokens per key would be
>>>>>> very
>>>>>>      expensive
>>>>>>       >     indeed
>>>>>>       >     >>     and I
>>>>>>       >     >>     >         believe we should go with a single
>>>>>> cache token
>>>>>>       >     "per" bundle.
>>>>>>       >     >>     >
>>>>>>       >     >>     >         On Mon, Aug 19, 2019 at 11:36 AM Maximilian
>>>>>>       >     >>     >         Michels <m...@apache.org> wrote:
>>>>>>       >     >>     >
>>>>>>       >     >>     >             Maybe a Beam Python expert can
>>>>>> chime in for
>>>>>>       >     Rakesh's
>>>>>>       >     >>     question?
>>>>>>       >     >>     >
>>>>>>       >     >>     >             Luke, I was assuming cache
>>>>>> tokens to be
>>>>>>      per key
>>>>>>       >     and state
>>>>>>       >     >>     >             id. While
>>>>>>       >     >>     >             implementing initial support
>>>>>> on the
>>>>>>      Runner
>>>>>>       >     side, I
>>>>>>       >     >>     >             realized that we
>>>>>>       >     >>     >             probably want cache tokens to
>>>>>> only be
>>>>>>      per state
>>>>>>       >     id. Note
>>>>>>       >     >>     >             that if we had
>>>>>>       >     >>     >             per-key cache tokens, the number
>>>>>> of cache
>>>>>>       >     tokens would
>>>>>>       >     >>     >             approach the
>>>>>>       >     >>     >             total number of keys in an
>>>>>> application.
>>>>>>       >     >>     >
>>>>>>       >     >>     >             If anyone wants to have a look,
>>>>>> here is
>>>>>>      a first
>>>>>>       >     version of
>>>>>>       >     >>     >             the Runner
>>>>>>       >     >>     >             side for cache tokens. Note that
>>>>>> I only
>>>>>>       >     implemented cache
>>>>>>       >     >>     >             tokens for
>>>>>>       >     >>     >             BagUserState for now, but it can
>>>>>> be easily
>>>>>>       >     added for side
>>>>>>       >     >>     >             inputs as well.
>>>>>>       >     >>     >
>>>>>>       >     >>     > https://github.com/apache/beam/pull/9374
>>>>>>       >     >>     >
>>>>>>       >     >>     >             -Max
>>>>>>       >     >>     >
>>>>>>       >     >>     >
>>>>>>       >     >>
>>>>>>       >
>>>>>>
