We would have to differentiate cache tokens for user state and side inputs. How about something like this?

message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  message CacheToken {

    message UserState {
    }

    message SideInputState {
      string side_input_id = 1;
    }

    oneof type {
      UserState user_state = 1;
      SideInputState side_input_state = 2;
    }

    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}
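
To make the intended use a bit more concrete, here is a rough sketch (Python,
purely illustrative; the class and helper names below are made up and not part
of the PRs) of how an SDK harness could consume these tokens, caching under
(token, state_key) so that leaving a token out of the request naturally
invalidates the corresponding entries:

# Illustrative sketch only. Assumes the proto above is available as
# beam_fn_api_pb2.ProcessBundleRequest.
class GlobalStateCache(object):
  """Cache shared by all bundle processors in the SDK harness (a real one
  would be bounded, e.g. LRU)."""

  def __init__(self):
    self._entries = {}  # (cache_token, state_key) -> value

  def get(self, cache_token, state_key, fetch_fn):
    key = (cache_token, state_key)
    if key not in self._entries:
      # Cache miss: fetch via the State API and cache it under the token
      # supplied in the ProcessBundleRequest.
      self._entries[key] = fetch_fn(state_key)
    return self._entries[key]

def user_state_token(process_bundle_request):
  """Returns the token covering user state for this bundle, or None if the
  runner did not supply one (i.e. caching is disabled)."""
  for cache_token in process_bundle_request.cache_tokens:
    if cache_token.WhichOneof('type') == 'user_state':
      return cache_token.token
  return None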

-Max

On 27.08.19 18:43, Lukasz Cwik wrote:
The bundle's view of side inputs should never change during processing and should be a point-in-time snapshot.

I was just trying to say that deferring the cache token for side inputs until side input request time simplified the runner's implementation, since that is precisely when the runner would need to take a look at the side input. Putting them as part of the ProcessBundleRequest complicates that, but does make the SDK implementation significantly simpler, which is a win.

On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <[email protected]> wrote:

    Thanks for the quick response.

    Just to clarify, the issue with versioning side inputs is also present
    when supplying the cache tokens on a request basis instead of per
    bundle. The SDK never knows when the Runner receives a new version of
    the side input. Like you pointed out, it needs to mark side inputs as
    stale and generate new cache tokens for the stale side inputs.

    The difference between per-request tokens and per-bundle tokens would be
    that the side input can only change after a bundle completes vs. during
    the bundle. Side inputs are always fuzzy in that regard because there is
    no precise instant at which side inputs are atomically updated, other
    than the assumption that they eventually will be updated. In that regard,
    per-bundle tokens for side inputs seem to be fine.

    All of the above is not an issue for user state, as its cache can remain
    valid for the lifetime of a Runner<=>SDK Harness connection. A simple
    solution would be to not cache side inputs because there are many cases
    where the caching just adds additional overhead. However, I can also
    imagine cases where a side input is valid forever and caching would be
    very beneficial.

    For the first version I want to focus on user state because that's where
    I see the most benefit for caching. I don't see a problem, though, for the
    Runner to detect a new side input version and reflect that in the cache
    tokens supplied for a new bundle.
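
    As a rough sketch of that runner-side bookkeeping (Python for brevity,
    although the runner side is Java; the class below is invented and not
    part of the PR), the Runner could mint one token for user state and one
    per side input, and renew a side input token whenever it observes a new
    version:

        # Illustrative sketch of runner-side token bookkeeping; not the PR.
        import uuid

        class CacheTokenRegistry(object):

            def __init__(self, side_input_ids):
                self._user_state_token = uuid.uuid4().bytes
                self._side_input_tokens = {
                    side_input_id: uuid.uuid4().bytes
                    for side_input_id in side_input_ids}

            def on_new_side_input_version(self, side_input_id):
                # A new side input version invalidates only that side input.
                self._side_input_tokens[side_input_id] = uuid.uuid4().bytes

            def on_restore_from_checkpoint(self):
                # Tokens are not checkpointed, so everything is renewed when
                # the Runner restarts from an earlier checkpoint.
                self._user_state_token = uuid.uuid4().bytes
                for side_input_id in self._side_input_tokens:
                    self._side_input_tokens[side_input_id] = uuid.uuid4().bytes

            def tokens_for_bundle(self):
                # The values to put into ProcessBundleRequest.cache_tokens.
                return self._user_state_token, dict(self._side_input_tokens)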

    -Max

    On 26.08.19 22:27, Lukasz Cwik wrote:
    > Your summary below makes sense to me. I can see that recovery from
    > rolling back doesn't need to be a priority and simplifies the
    solution
    > for user state caching down to one token.
    >
    > Providing cache tokens upfront does require the Runner to know what
    > "version" of everything it may supply to the SDK upfront
    (instead of on
    > request) which would mean that the Runner may need to have a
    mapping
    > from cache token to internal version identifier for things like
    side
    > inputs which are typically broadcast. The Runner would also need
    to poll
    > to see if the side input has changed in the background to not block
    > processing bundles with "stale" side input data.
    >
    > Ping me once you have the Runner PR updated and I'll take a look
    again.
    >
    > On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <[email protected]> wrote:
    >
    >     Thank you for the summary Luke. I really appreciate the
    effort you put
    >     into this!
    >
    >      > Based upon your discussion you seem to want option #1
    >
    >     I'm actually for option #2. The option to cache/invalidate
    side inputs
    >     is important, and we should incorporate this in the design.
    That's why
    >     option #1 is not flexible enough. However, a first
    implementation could
    >     defer caching of side inputs.
    >
    >     Option #3 was my initial thinking and the first version of
    the PR, but I
    >     think we agreed that there wouldn't be much gain from
    keeping a cache
    >     token per state id.
    >
    >     Option #4 is what is specifically documented in the
    reference doc and
    >     already part of the Proto, where valid tokens are provided
    for each new
    >     bundle and also as part of the response of a get/put/clear.
    We mentioned
    >     that the reply does not have to be waited on synchronously
    (I mentioned
    >     it even), but it complicates the implementation. The idea
    Thomas and I
    >     expressed was that a response is not even necessary if we assume
    >     validity of the upfront provided cache tokens for the
    lifetime of a
    >     bundle and that cache tokens will be invalidated as soon as
    the Runner
    >     fails in any way. This is naturally the case for Flink
    because it will
    >     simply "forget" its current cache tokens.
    >
    >     I currently envision the following schema:
    >
    >     Runner
    >     ======
    >
    >     - Runner generates a globally unique cache token, one for
    user state and
    >     one for each side input
    >
    >     - The token is supplied to the SDK Harness for each bundle
    request
    >
    >     - For the lifetime of a Runner<=>SDK Harness connection this
    cache token
    >     will not change
    >     - Runner will generate a new token if the connection/key
    space changes
    >     between Runner and SDK Harness
    >
    >
    >     SDK
    >     ===
    >
    >     - For each bundle the SDK worker stores the list of valid
    cache tokens
    >     - The SDK Harness keeps a global cache across all its (local) workers
    >     which is an LRU cache: state_key => (cache_token, value)
    >     - get: Lookup cache using the valid cache token for the
    state. If no
    >     match, then fetch from Runner and use the already available
    token for
    >     caching
    >     - put: Put value in cache with a valid cache token, add the value to
    >     pending writes which will be flushed out at the latest when the bundle ends
    >     - clear: same as put but clear cache
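    >
    >     A rough sketch of that get/put/clear flow (illustrative only, not the
    >     actual SDK code; the collaborator names are invented):
    >
    >     # Sketch of the per-state flow above; a plain dict stands in for the
    >     # global LRU cache, and state_client is a hypothetical async client
    >     # for the State API.
    >     class CachingUserState(object):
    >
    >         def __init__(self, global_cache, cache_token, state_key, state_client):
    >             self._cache = global_cache   # shared: (cache_token, state_key) -> value
    >             self._token = cache_token    # valid for the whole bundle
    >             self._key = state_key
    >             self._client = state_client
    >             self._pending_writes = []
    >
    >         def get(self):
    >             if (self._token, self._key) not in self._cache:
    >                 # Miss: fetch from the Runner, cache under the supplied token.
    >                 self._cache[(self._token, self._key)] = self._client.blocking_get(self._key)
    >             return self._cache[(self._token, self._key)]
    >
    >         def put(self, value):
    >             # Update the cache right away; the write itself is asynchronous.
    >             self._cache[(self._token, self._key)] = value
    >             self._pending_writes.append(self._client.put_async(self._key, value))
    >
    >         def clear(self):
    >             self._cache.pop((self._token, self._key), None)
    >             self._pending_writes.append(self._client.clear_async(self._key))
    >
    >         def finish_bundle(self):
    >             # Pending writes are flushed at the latest when the bundle ends.
    >             for future in self._pending_writes:
    >                 future.result()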
    >
    >     It does look like this is not too far off from what you were
    describing.
    >     The main difference is that we just work with a single cache
    token. In
    >     my opinion we do not need the second cache token for writes,
    as long as
    >     we ensure that we generate a new cache token if the
    >     bundle/checkpoint fails.
    >
    >     I have a draft PR
    >        for the Runner: https://github.com/apache/beam/pull/9374
    >        for the SDK: https://github.com/apache/beam/pull/9418
    >
    >     Note that the Runner PR needs to be updated to fully reflect the above
    >     scheme. The SDK implementation is WIP. I want to make sure
    that we
    >     clarify the design before this gets finalized.
    >
    >     Thanks again for all your comments. Much appreciated!
    >
    >     Cheers,
    >     Max
    >
    >     On 26.08.19 19:58, Lukasz Cwik wrote:
    >      > There were originally a couple of ideas around how
    caching could
    >     work:
    >      > 1) One cache token for the entire bundle that is supplied up
    >     front. The
    >      > SDK caches everything using the given token. All
    >     reads/clear/append for
    >      > all types of state happen under this token. Anytime a
    side input
    >      > changes, key processing partition range changes or a
    bundle fails to
    >      > process, the runner chooses a new cache token effectively
    >     invalidating
    >      > everything in the past.
    >      > 2) One cache token per type of state that is supplied up
    front.
    >      > The SDK caches all requests for a given type using the
    given cache
    >      > token. The runner can selectively choose which type to
    keep and
    >     which to
    >      > invalidate. Bundle failure and key processing partition
    changes
    >      > invalidate all user state, side input change invalidates
    all side
    >     inputs.
    >      >
    >      > 3) One cache token per state id that is supplied up front.
    >      > The SDK caches all requests for the given state id using the
    >     given cache
    >      > token. The runner can selectively choose which to
    invalidate and
    >     which
    >      > to keep. Bundle failure and key processing partition changes
    >     invalidate
    >      > all user state, side input changes only invalidate the
    side input
    >     that
    >      > changed.
    >      >
    >      > 4) A cache token on each read/clear/append that is
    supplied on the
    >      > response of the call with an initial valid set that is
    supplied at
    >      > start. The runner can selectively choose which to keep on
    start.
    >     Bundle
    >      > failure allows runners to "roll back" to a known good
    state by
    >     selecting
    >      > the previous valid cache token as part of the initial
    set. Key
    >      > processing partition changes allow runners to keep cached
    state that
    >      > hasn't changed since it can be tied to a version number
    of the state
    >      > itself as part of the initial set. Side input changes
    only invalidate
    >      > the side input that changed.
    >      >
    >      > Based upon your discussion you seem to want option #1 which
    >     doesn't work
    >      > well with side inputs clearing cached state. If we want
    to have user
    >      > state survive a changing side input, we would want one of
    the other
    >      > options. I do agree that supplying the cache token upfront is
    >      > significantly simpler. Currently the protos are setup for
    #4 since it
    >      > was the most flexible and at the time the pros outweighed
    the cons.
    >      >
    >      > I don't understand why you think you need to wait for a
    response
    >     for the
    >      > append/clear to get its cache token since the only reason you
    >     need the
    >      > cache token is that you want to use that cached data when
    >     processing a
    >      > different bundle. I was thinking that the flow on the SDK
    side
    >     would be
    >      > something like (assuming there is a global cache of cache
    token
    >     -> (map
    >      > of state key -> data))
    >      > 1) Create a local cache of (map of state key -> data)
    using the
    >     initial
    >      > set of valid cache tokens
    >      > 2) Make all mutations in place on local cache without
    waiting for
    >     response.
    >      > 3) When response comes back, update global cache with new
    cache
    >     token ->
    >      > (map of state key -> data)) (this is when the data
    becomes visible to
    >      > other bundles that start processing)
    >      > 4) Before the bundle finishes processing, wait for all
    >     outstanding state
    >      > calls to finish.
    >      >
    >      > To implement caching on the runner side, you would keep
    track of
    >     at most
    >      > 2 cache tokens per state key, one cache token represents
    the initial
    >      > value when the bundle started while the second represents the
    >     modified
    >      > state. If the bundle succeeds the runner passes in the
    set of tokens
    >      > which represent the new state, if the bundle fails you
    process
    >     using the
    >      > original ones.
    >      >
    >      > After thinking through the implementation again, we could
    supply two
    >      > cache tokens for each state id, the first being the set
    of initial
    >      > tokens if no writes happen while the second represents
    the token
    >     to use
    >      > if the SDK changes the state. This gives us the
    simplification
    >     where we
    >      > don't need to wait for the response before we update the
    global cache
    >      > making a typical blocking cache much easier to do. We
    also get the
    >      > benefit that runners can supply either the same cache
    token for a
    >     state
    >      > id or different ones. If the runner supplies the same one
    then it's
    >      > telling the SDK to make modifications in place without
    any rollback
    >      > (which is good on memory since we are reducing copies of
    stuff) or if
    >      > the runner supplies two different ones then it's telling
    the SDK
    >     to keep
    >      > the old data around. If we went through with this new
    option the SDK
    >      > side logic would be (assuming there is a global cache of
    cache
    >     token ->
    >      > (map of state key -> data)):
    >      >
    >      > 1) Create an empty local set of state ids that are dirty when
    >     starting a
    >      > new bundle (dirty set)
    >      >
    >      > For reads/gets:
    >      > 2A) If the request is a read (get), use dirty set to
    choose which
    >     cache
    >      > token to lookup and use in the global cache. If the
    global cache is
    >      > missing data issue the appropriate request providing the
    result.
    >      >
    >      > For writes/appends/clear:
    >      > 2B) if the cache tokens are different for the state id,
    add the
    >     state id
    >      > to the dirty set if it isn't there and perform the
    appropriate
    >      > modification to convert the old cached state data to the new
    >     state data
    >      > 3B) modify the global caches data
    >      > 4B) issue the request to the runner
    >      > 5B*) add this request to the set of requests to block on
    before
    >      > completing the bundle.
    >      >
    >      > (* Note, there was another idea to update the process bundle
    >     response to
    >      > contain the id of the last state request which would
    allow the
    >     runner to
    >      > know when it has seen the last state request allowing the
    SDK to not
    >      > block at all when finishing the bundle)
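    >      >
    >      > To make that two-token flow concrete, a quick sketch (illustrative
    >      > only; the names and data structures are invented):
    >      >
    >      > # Sketch of the dirty-set / two-token scheme described above.
    >      > class TwoTokenCache(object):
    >      >
    >      >     def __init__(self, global_cache, clean_tokens, write_tokens, state_client):
    >      >         self._cache = global_cache   # cache_token -> {state_key: data}
    >      >         self._clean = clean_tokens   # state_id -> token for the initial data
    >      >         self._write = write_tokens   # state_id -> token to use after a write
    >      >         self._client = state_client  # hypothetical State API client
    >      >         self._dirty = set()          # 1) state ids modified in this bundle
    >      >         self._outstanding = []
    >      >
    >      >     def _token(self, state_id):
    >      >         return self._write[state_id] if state_id in self._dirty else self._clean[state_id]
    >      >
    >      >     def read(self, state_id, state_key):
    >      >         # 2A) use the dirty set to pick the token; fall back to the runner.
    >      >         data = self._cache.setdefault(self._token(state_id), {})
    >      >         if state_key not in data:
    >      >             data[state_key] = self._client.blocking_get(state_key)
    >      >         return data[state_key]
    >      >
    >      >     def append(self, state_id, state_key, value):
    >      >         if state_id not in self._dirty and self._clean[state_id] != self._write[state_id]:
    >      >             # 2B) distinct tokens: copy the old data so the runner can
    >      >             # roll back to the clean token if the bundle fails.
    >      >             self._cache[self._write[state_id]] = dict(
    >      >                 self._cache.get(self._clean[state_id], {}))
    >      >             self._dirty.add(state_id)
    >      >         # 3B) modify the global cache's data ...
    >      >         bag = self._cache.setdefault(self._write[state_id], {}).setdefault(state_key, [])
    >      >         bag.append(value)
    >      >         # 4B) issue the request to the runner, and 5B) remember it so the
    >      >         # bundle can block on it before completing.
    >      >         self._outstanding.append(self._client.append_async(state_key, value))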
    >      >
    >      > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <[email protected]> wrote:
    >      >
    >      >     Just to give a quick update here. Rakesh, Thomas, and
    I had a
    >     discussion
    >      >     about async writes from the Python SDK to the Runner.
    Robert
    >     was also
    >      >     present for some parts of the discussion.
    >      >
    >      >     We concluded that blocking writes with the need to
    refresh
    >     the cache
    >      >     token each time are not going to provide enough
    >     throughput/latency.
    >      >
    >      >     We figured that it will be enough to use a single
    cache token per
    >      >     Runner<=>SDK Harness connection. This cache token will be
    >     provided by
    >      >     the Runner in the ProcessBundleRequest. Writes will
    not yield
    >     a new
    >      >     cache token. The advantage is that we can use one
    cache token
    >     for the
    >      >     life time of the bundle and also across bundles,
    unless the
    >     Runner
    >      >     switches to a new Runner<=>SDK Harness connection;
    then the
    >     Runner would
    >      >     have to generate a new cache token.
    >      >
    >      >     We might require additional cache tokens for the side
    inputs.
    >     For now,
    >      >     I'm planning to only tackle user state which seems to
    be the
    >     area where
    >      >     users have expressed the most need for caching.
    >      >
    >      >     -Max
    >      >
    >      >     On 21.08.19 20:05, Maximilian Michels wrote:
    >      >     >> There is probably a misunderstanding here: I'm
    suggesting
    >     to use
    >      >     a worker ID instead of cache tokens, not additionally.
    >      >     >
    >      >     > Ah! Misread that. We need a changing token to
    indicate that the
    >      >     cache is
    >      >     > stale, e.g. checkpoint has failed / restoring from
    an old
    >      >     checkpoint. If
    >      >     > the _Runner_ generates a new unique token/id for
    workers
    >     which outlast
    >      >     > the Runner, then this should work fine. I don't
    think it is
    >     safe
    >      >     for the
    >      >     > worker to supply the id. The Runner should be in
    control of
    >     cache
    >      >     tokens
    >      >     > to avoid invalid tokens.
    >      >     >
    >      >     >> In the PR the token is modified as part of
    updating the state.
    >      >     Doesn't the SDK need the new token to update its cache entry
    cache entry
    >     also?
    >      >     That's where it would help the SDK to know the new
    token upfront.
    >      >     >
    >      >     > If the state is updated in the Runner, a new token
    has to be
    >      >     generated.
    >      >     > The old one is not valid anymore. The SDK will use
    the updated
    >      >     token to
    >      >     > store the new value in the cache. I understand that
    it would be
    >      >     nice to
    >      >     > know the token upfront. That could be possible with
    some token
    >      >     > generation scheme. On the other hand, writes can be
    >     asynchronous and
    >      >     > thus not block the UDF.
    >      >     >
    >      >     >> But I believe there is no need to change the token
    in first
    >      >     place, unless bundles for the same key (ranges) can be
    >     processed by
    >      >     different workers.
    >      >     >
    >      >     > That's certainly possible, e.g. two workers A and B
    take turn
    >      >     processing
    >      >     > a certain key range, one bundle after another:
    >      >     >
    >      >     > You process a bundle with a token T with A, then
    worker B
    >     takes over.
    >      >     > Both have an entry with cache token T. So B goes on to
    >     modify the
    >      >     state
    >      >     > and uses the same cache token T. Then A takes over
    again. A
    >     would
    >      >     have a
    >      >     > stale cache entry but T would still be a valid
    cache token.
    >      >     >
    >      >     >> Indeed the fact that Dataflow can dynamically
    split and merge
    >      >     these ranges is what makes it trickier. If Flink does not
    >      >     repartition the ranges, then things are much easier.
    >      >     >
    >      >     > Flink does not dynamically repartition key ranges
    (yet). If
    >     it started
    >      >     > to support that, we would invalidate the cache
    tokens for
    >     the changed
    >      >     > partitions.
    >      >     >
    >      >     >
    >      >     > I'd suggest the following cache token generation
    scheme:
    >      >     >
    >      >     > One cache token per key range for user state and
    one cache
    >     token for
    >      >     > each side input. On writes to user state or
    changing side
    >     input, the
    >      >     > associated cache token will be renewed.
    >      >     >
    >      >     > On the SDK side, it should be sufficient to let the SDK
    >      >     re-associate all
    >      >     > its cached data belonging to a valid cache token
    with a new
    >     cache
    >      >     token
    >      >     > returned by a successful write. This has to happen
    in the
    >     active scope
    >      >     > (i.e. user state, or a particular side input).
    >      >     >
    >      >     > If the key range changes, new cache tokens have to be
    >     generated. This
    >      >     > should happen automatically because the Runner does not
    >     checkpoint
    >      >     cache
    >      >     > tokens and will generate new ones when it restarts
    from an
    >     earlier
    >      >     > checkpoint.
    >      >     >
    >      >     > The current PR needs to be changed to (1) only keep a
    >     single cache
    >      >     token
    >      >     > per user state and key range (2) add support for cache
    >     tokens for each
    >      >     > side input.
    >      >     >
    >      >     > Hope that makes sense.
    >      >     >
    >      >     > -Max
    >      >     >
    >      >     > On 21.08.19 17:27, Reuven Lax wrote:
    >      >     >>
    >      >     >>
    >      >     >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <[email protected]> wrote:
    >      >     >>
    >      >     >>     Appreciate all your comments! Replying below.
    >      >     >>
    >      >     >>
    >      >     >>     @Luke:
    >      >     >>
    >      >     >>     > Having cache tokens per key would be very
    expensive
    >     indeed
    >      >     and I
    >      >     >>     believe we should go with a single cache token
    "per"
    >     bundle.
    >      >     >>
    >      >     >>     Thanks for your comments on the PR. I was
    thinking to
    >     propose
    >      >     something
    >      >     >>     along these lines of having cache tokens valid
    for a
    >     particular
    >      >     >>     checkpointing "epoch". That would require even
    less token
    >      >     renewal than
    >      >     >>     the per-bundle approach.
    >      >     >>
    >      >     >>
    >      >     >>     @Thomas, thanks for the input. Some remarks:
    >      >     >>
    >      >     >>     > Wouldn't it be simpler to have the runner
    just track a
    >      >     unique ID
    >      >     >>     for each worker and use that to communicate if the
    >     cache is
    >      >     valid or
    >      >     >>     not?
    >      >     >>
    >      >     >>     We do not need a unique id per worker. If a
    cache token is
    >      >     valid for a
    >      >     >>     particular worker, it is also valid for another
    >     worker. That
    >      >     is with the
    >      >     >>     assumption that key ranges are always disjoint
    between the
    >      >     workers.
    >      >     >>
    >      >     >>     > * When the bundle is started, the runner
    tells the
    >     worker
    >      >     if the
    >      >     >>     cache has become invalid (since it knows if
    another
    >     worker has
    >      >     >>     mutated state)
    >      >     >>
    >      >     >>     This is simply done by not transferring the
    particular
    >     cache
    >      >     token. No
    >      >     >>     need to declare it invalid explicitly.
    >      >     >>
    >      >     >>     > * When the worker sends mutation requests to the
    >     runner, it
    >      >     >>     includes its own ID (or the runner already has
    it as
    >     contextual
    >      >     >>     information). No need to wait for a response.
    >      >     >>
    >      >     >>     Mutations of cached values can be freely done
    as long
    >     as the
    >      >     cache token
    >      >     >>     associated with the state is valid for a
    particular
    >     bundle.
    >      >     Only the
    >      >     >>     first time, the Runner needs to wait on the
    response
    >     to store
    >      >     the cache
    >      >     >>     token. This can also be done asynchronously.
    >      >     >>
    >      >     >>     > * When the bundle is finished, the runner
    records
    >     the last
    >      >     writer
    >      >     >>     (only if a change occurred)
    >      >     >>
    >      >     >>     I believe this is not necessary because there
    will only be
    >      >     one writer at
    >      >     >>     a time for a particular bundle and key range,
    hence
    >     only one
    >      >     writer
    >      >     >>     holds a valid cache token for a particular
    state and
    >     key range.
    >      >     >>
    >      >     >>
    >      >     >>     @Reuven:
    >      >     >>
    >      >     >>     >  Dataflow divides the keyspace up into
    lexicographic
    >      >     ranges, and
    >      >     >>     creates a cache token per range.
    >      >     >>
    >      >     >>     State is always processed partitioned by the
    Flink workers
    >      >     (hash-based,
    >      >     >>     not lexicographical). I don't think that
    matters though
    >      >     because the key
    >      >     >>     ranges do not overlap between the workers.
    Flink does
    >     not support
    >      >     >>     dynamically repartitioning the key ranges.
    Even in case of
    >      >     fine-grained
    >      >     >>     recovery of workers and their key ranges, we
    would simply
    >      >     generate new
    >      >     >>     cache tokens for a particular worker.
    >      >     >>
    >      >     >>
    >      >     >> Dataflow's ranges are also hash based. When I said
    >     lexicographical, I
    >      >     >> meant lexicographical based on the hexadecimal
    hash value.
    >      >     >>
    >      >     >> Indeed the fact that Dataflow can dynamically
    split and
    >     merge these
    >      >     >> ranges is what makes it trickier. If Flink does not
    >     repartition the
    >      >     >> ranges, then things are much easier.
    >      >     >>
    >      >     >>
    >      >     >>
    >      >     >>     Thanks,
    >      >     >>     Max
    >      >     >>
    >      >     >>     On 21.08.19 09:33, Reuven Lax wrote:
    >      >     >>     > Dataflow does something like this, however
    since work is
    >      >     >>     > load balanced across workers a per-worker id
    doesn't
    >     work
    >      >     very well.
    >      >     >>     > Dataflow divides the keyspace up into
    lexicographic
    >     ranges, and
    >      >     >>     creates
    >      >     >>     > a cache token per range.
    >      >     >>     >
    >      >     >>     > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <[email protected]> wrote:
    >      >     >>     >
    >      >     >>     >     Commenting here vs. on the PR since
    related to
    >     the overall
    >      >     >>     approach.
    >      >     >>     >
    >      >     >>     >     Wouldn't it be simpler to have the
    runner just
    >     track a
    >      >     unique
    >      >     >>     ID for
    >      >     >>     >     each worker and use that to communicate
    if the
    >     cache is
    >      >     valid
    >      >     >>     or not?
    >      >     >>     >
    >      >     >>     >     * When the bundle is started, the runner
    tells the
    >      >     worker if the
    >      >     >>     >     cache has become invalid (since it knows
    if another
    >      >     worker has
    >      >     >>     >     mutated state)
    >      >     >>     >     * When the worker sends mutation
    requests to the
    >     runner, it
    >      >     >>     includes
    >      >     >>     >     its own ID (or the runner already has it as
    >     contextual
    >      >     >>     information).
    >      >     >>     >     No need to wait for a response.
    >      >     >>     >     * When the bundle is finished, the runner
    >     records the
    >      >     last writer
    >      >     >>     >     (only if a change occurred)
    >      >     >>     >
    >      >     >>     >     Whenever current worker ID and last
    writer ID
    >     doesn't
    >      >     match, cache
    >      >     >>     >     is invalid.
    >      >     >>     >
    >      >     >>     >     Thomas
    >      >     >>     >
    >      >     >>     >
    >      >     >>     >     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <[email protected]> wrote:
    >      >     >>     >
    >      >     >>     >         Having cache tokens per key would be
    very
    >     expensive
    >      >     indeed
    >      >     >>     and I
    >      >     >>     >         believe we should go with a single
    cache token
    >      >     "per" bundle.
    >      >     >>     >
    >      >     >>     >         On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <[email protected]> wrote:
    >      >     >>     >
    >      >     >>     >             Maybe a Beam Python expert can
    chime in for
    >      >     Rakesh's
    >      >     >>     question?
    >      >     >>     >
    >      >     >>     >             Luke, I was assuming cache
    tokens to be
    >     per key
    >      >     and state
    >      >     >>     >             id. During
    >      >     >>     >             implementing an initial support
    on the
    >     Runner
    >      >     side, I
    >      >     >>     >             realized that we
    >      >     >>     >             probably want cache tokens to
    only be
    >     per state
    >      >     id. Note
    >      >     >>     >             that if we had
    >      >     >>     >             per-key cache tokens, the number
    of cache
    >      >     tokens would
    >      >     >>     >             approach the
    >      >     >>     >             total number of keys in an
    application.
    >      >     >>     >
    >      >     >>     >             If anyone wants to have a look,
    here is
    >     a first
    >      >     version of
    >      >     >>     >             the Runner
    >      >     >>     >             side for cache tokens. Note that
    I only
    >      >     implemented cache
    >      >     >>     >             tokens for
    >      >     >>     >             BagUserState for now, but it can
    be easily
    >      >     added for side
    >      >     >>     >             inputs as well.
    >      >     >>     >
    >      >     >>     > https://github.com/apache/beam/pull/9374
    >      >     >>     >
    >      >     >>     >             -Max
    >      >     >>     >
    >      >     >>     >
    >      >     >>
    >      >
    >
