Thanks. Updated:

message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  // A cache token which can be used by an SDK to check for the validity
  // of cached elements which have a cache token associated.
  message CacheToken {

    // A flag to indicate a cache token is valid for user state.
    message UserState {}

    // A flag to indicate a cache token is valid for a side input.
    message SideInput {
      // The id of a side input.
      string side_input = 1;
    }

    // The scope of a cache token.
    oneof type {
      UserState user_state = 1;
      SideInput side_input = 2;
    }

    // The cache token identifier which should be globally unique.
    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}
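
To illustrate how an SDK might consume these tokens, here is a minimal Python sketch. It does not use the generated proto classes; the `CacheToken` dataclass and the `split_cache_tokens` helper are hypothetical stand-ins that only mirror the message layout above.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple


@dataclass(frozen=True)
class CacheToken:
    """Hypothetical stand-in for the CacheToken message (not generated code)."""
    token: bytes
    # Exactly one of the two scopes is expected to be set, mirroring `oneof type`.
    user_state: bool = False
    side_input: Optional[str] = None  # the side input id, if scoped to one


def split_cache_tokens(
    tokens: Iterable[CacheToken],
) -> Tuple[Optional[bytes], Dict[str, bytes]]:
    """Returns (user state token, {side input id: token})."""
    user_state_token = None
    side_input_tokens: Dict[str, bytes] = {}
    for t in tokens:
        if t.user_state:
            user_state_token = t.token
        elif t.side_input is not None:
            side_input_tokens[t.side_input] = t.token
    return user_state_token, side_input_tokens


# Tokens as they might arrive in ProcessBundleRequest.cache_tokens.
request_tokens = [
    CacheToken(token=b"u1", user_state=True),
    CacheToken(token=b"s1", side_input="side_a"),
]
user_token, side_tokens = split_cache_tokens(request_tokens)
```

A scope without a token in the request would then simply be treated as not cacheable for that bundle.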

On 27.08.19 19:22, Lukasz Cwik wrote:
SideInputState -> SideInput (side_input_state -> side_input)
+ more comments around the messages and the fields.


On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels <m...@apache.org> wrote:

    We would have to differentiate cache tokens for user state and
    side inputs. How about something like this?

    message ProcessBundleRequest {
       // (Required) A reference to the process bundle descriptor that
       // must be instantiated and executed by the SDK harness.
       string process_bundle_descriptor_reference =1;

       message CacheToken {

         message UserState {
         }

         message SideInputState {
           string side_input_id =1;
         }

         oneof type {
           UserState user_state =1;
           SideInputState side_input_state =2;
         }

         bytes token =10;
       }

       // (Optional) A list of cache tokens that can be used by an SDK to
       // reuse cached data returned by the State API across multiple
       // bundles.
       repeated CacheToken cache_tokens =2;
    }

    -Max

    On 27.08.19 18:43, Lukasz Cwik wrote:
    The bundle's view of side inputs should never change during
    processing and should have a point in time snapshot.

    I was just trying to say that the cache token for side inputs
    being deferred till side input request time simplified the
    runner's implementation since that is conclusively when the runner
    would need to take a look at the side input. Putting them as part
    of the ProcessBundleRequest complicates that but does make the SDK
    implementation significantly simpler which is a win.

    On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <m...@apache.org> wrote:

        Thanks for the quick response.

        Just to clarify, the issue with versioning side input is also
        present when supplying the cache tokens on a request basis instead
        of per bundle. The SDK never knows when the Runner receives a new
        version of the side input. Like you pointed out, the Runner needs
        to mark side inputs as stale and generate new cache tokens for the
        stale side inputs.

        The difference between per-request tokens and per-bundle tokens
        would be that the side input can only change after a bundle
        completes vs. during the bundle. Side inputs are always fuzzy in
        that regard because there is no precise instant where side inputs
        are atomically updated, other than the assumption that they
        eventually will be updated. In that regard per-bundle tokens for
        side input seem to be fine.

        All of the above is not an issue for user state, as its cache can
        remain valid for the lifetime of a Runner<=>SDK Harness connection.
        A simple solution would be to not cache side input because there
        are many cases where the caching just adds additional overhead.
        However, I can also imagine cases where side input is valid forever
        and caching would be very beneficial.

        For the first version I want to focus on user state because that's
        where I see the most benefit for caching. I don't see a problem
        though for the Runner to detect new side input and reflect that in
        the cache tokens supplied for a new bundle.

        -Max

        On 26.08.19 22:27, Lukasz Cwik wrote:
        > Your summary below makes sense to me. I can see that recovery from
        > rolling back doesn't need to be a priority and simplifies the
        > solution for user state caching down to one token.
        >
        > Providing cache tokens upfront does require the Runner to know what
        > "version" of everything it may supply to the SDK upfront (instead of
        > on request) which would mean that the Runner may need to have a
        > mapping from cache token to internal version identifier for things
        > like side inputs which are typically broadcast. The Runner would also
        > need to poll to see if the side input has changed in the background
        > to not block processing bundles with "stale" side input data.
        >
        > Ping me once you have the Runner PR updated and I'll take a look
        > again.
        >
        > On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <m...@apache.org> wrote:
        >
        >     Thank you for the summary Luke. I really appreciate the effort
        >     you put into this!
        >
        >      > Based upon your discussion you seem to want option #1
        >
        >     I'm actually for option #2. The option to cache/invalidate side
        >     inputs is important, and we should incorporate this in the
        >     design. That's why option #1 is not flexible enough. However, a
        >     first implementation could defer caching of side inputs.
        >
        >     Option #3 was my initial thinking and the first version of the
        >     PR, but I think we agreed that there wouldn't be much gain from
        >     keeping a cache token per state id.
        >
        >     Option #4 is what is specifically documented in the reference
        >     doc and already part of the Proto, where valid tokens are
        >     provided for each new bundle and also as part of the response of
        >     a get/put/clear. We mentioned that the reply does not have to be
        >     waited on synchronously (I mentioned it even), but it
        >     complicates the implementation. The idea Thomas and I expressed
        >     was that a response is not even necessary if we assume validity
        >     of the upfront provided cache tokens for the lifetime of a
        >     bundle and that cache tokens will be invalidated as soon as the
        >     Runner fails in any way. This is naturally the case for Flink
        >     because it will simply "forget" its current cache tokens.
        >
        >     I currently envision the following scheme:
        >
        >     Runner
        >     ======
        >
        >     - Runner generates a globally unique cache token, one for user
        >     state and one for each side input
        >
        >     - The token is supplied to the SDK Harness for each bundle
        >     request
        >
        >     - For the lifetime of a Runner<=>SDK Harness connection this
        >     cache token will not change
        >     - Runner will generate a new token if the connection/key space
        >     changes between Runner and SDK Harness
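
The Runner-side rules listed above could be sketched roughly as follows. This is only an illustration in Python (the actual Runner is not written in Python); the class `RunnerCacheTokens` and its method names are hypothetical, and random UUIDs are just one assumed way to obtain tokens that are unique in practice.

```python
import uuid
from typing import Dict, List, Tuple


class RunnerCacheTokens:
    """Hypothetical Runner-side bookkeeping for cache tokens."""

    def __init__(self, side_input_ids: List[str]) -> None:
        self._side_input_ids = list(side_input_ids)
        self._renew_all()

    def _renew_all(self) -> None:
        # One token for user state and one per side input.
        self.user_state_token = uuid.uuid4().bytes
        self.side_input_tokens: Dict[str, bytes] = {
            sid: uuid.uuid4().bytes for sid in self._side_input_ids
        }

    def tokens_for_bundle(self) -> List[Tuple[str, bytes]]:
        """(scope, token) pairs to attach to the next bundle request."""
        tokens = [("user_state", self.user_state_token)]
        for sid in self._side_input_ids:
            tokens.append(("side_input:" + sid, self.side_input_tokens[sid]))
        return tokens

    def on_connection_or_key_space_change(self) -> None:
        # New connection or changed key space: everything cached so far is
        # considered invalid, so all tokens are replaced.
        self._renew_all()


runner = RunnerCacheTokens(["side_a"])
first = runner.tokens_for_bundle()
second = runner.tokens_for_bundle()  # same connection: tokens unchanged
runner.on_connection_or_key_space_change()
third = runner.tokens_for_bundle()   # after the change: fresh tokens
```

Because the tokens are only held in memory here, a Runner restart would produce new tokens without any extra invalidation logic.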
        >
        >
        >     SDK
        >     ===
        >
        >     - For each bundle the SDK worker stores the list of valid cache
        >     tokens
        >     - The SDK Harness keeps a global cache across all its (local)
        >     workers which is an LRU cache: state_key => (cache_token, value)
        >     - get: Lookup cache using the valid cache token for the state.
        >     If no match, then fetch from Runner and use the already
        >     available token for caching
        >     - put: Put value in cache with a valid cache token, put value to
        >     pending writes which will be flushed out at the latest when the
        >     bundle ends
        >     - clear: same as put but clear cache
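
A minimal Python sketch of the SDK side described above, under a few assumptions: `StateCache`, `BundleState` and `fetch_from_runner` are hypothetical names, state values are plain bytes, and "clear" is modeled as caching an empty value so that a later get in the same bundle does not read a not-yet-cleared value from the Runner. That last point is one possible reading, not something settled in this thread.

```python
from collections import OrderedDict
from typing import Callable, List, Optional, Tuple


class StateCache:
    """LRU cache shared by all local workers: state_key -> (cache_token, value)."""

    def __init__(self, max_entries: int) -> None:
        self._max_entries = max_entries
        self._entries: "OrderedDict[str, Tuple[bytes, bytes]]" = OrderedDict()

    def get(self, state_key: str, cache_token: bytes) -> Optional[bytes]:
        entry = self._entries.get(state_key)
        if entry is None or entry[0] != cache_token:
            return None  # missing, or cached under a token that is no longer valid
        self._entries.move_to_end(state_key)  # mark as most recently used
        return entry[1]

    def put(self, state_key: str, cache_token: bytes, value: bytes) -> None:
        self._entries[state_key] = (cache_token, value)
        self._entries.move_to_end(state_key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict least recently used


class BundleState:
    """Per-bundle view that uses the single valid token for user state."""

    def __init__(self, cache: StateCache, cache_token: bytes,
                 fetch_from_runner: Callable[[str], bytes]) -> None:
        self._cache = cache
        self._token = cache_token
        self._fetch = fetch_from_runner
        # Flushed to the Runner at the latest when the bundle ends.
        self.pending_writes: List[Tuple[str, str, bytes]] = []

    def get(self, state_key: str) -> bytes:
        value = self._cache.get(state_key, self._token)
        if value is None:
            value = self._fetch(state_key)
            self._cache.put(state_key, self._token, value)
        return value

    def put(self, state_key: str, value: bytes) -> None:
        self._cache.put(state_key, self._token, value)
        self.pending_writes.append(("put", state_key, value))

    def clear(self, state_key: str) -> None:
        # Assumption: an empty value represents cleared state.
        self._cache.put(state_key, self._token, b"")
        self.pending_writes.append(("clear", state_key, b""))
```

A second bundle constructed with the same token would hit the cache, while one constructed with a new token would miss and fetch from the Runner again.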
        >
        >     It does look like this is not too far off from what you were
        >     describing. The main difference is that we just work with a
        >     single cache token. In my opinion we do not need the second
        >     cache token for writes, as long as we ensure that we generate a
        >     new cache token if the bundle/checkpoint fails.
        >
        >     I have a draft PR
        >        for the Runner: https://github.com/apache/beam/pull/9374
        >        for the SDK: https://github.com/apache/beam/pull/9418
        >
        >     Note that the Runner PR needs to be updated to fully reflect the
        >     above scheme. The SDK implementation is WIP. I want to make sure
        >     that we clarify the design before this gets finalized.
        >
        >     Thanks again for all your comments. Much appreciated!
        >
        >     Cheers,
        >     Max
        >
        >     On 26.08.19 19:58, Lukasz Cwik wrote:
        >      > There were originally a couple of ideas around how caching
        >      > could work:
        >      > 1) One cache token for the entire bundle that is supplied up
        >      > front. The SDK caches everything using the given token. All
        >      > reads/clear/append for all types of state happen under this
        >      > token. Anytime a side input changes, key processing partition
        >      > range changes or a bundle fails to process, the runner chooses
        >      > a new cache token effectively invalidating everything in the
        >      > past.
        >      >
        >      > 2) One cache token per type of state that is supplied up front.
        >      > The SDK caches all requests for a given type using the given
        >      > cache token. The runner can selectively choose which type to
        >      > keep and which to invalidate. Bundle failure and key processing
        >      > partition changes invalidate all user state, side input change
        >      > invalidates all side inputs.
        >      >
        >      > 3) One cache token per state id that is supplied up front.
        >      > The SDK caches all requests for the given state id using the
        >      > given cache token. The runner can selectively choose which to
        >      > invalidate and which to keep. Bundle failure and key processing
        >      > partition changes invalidate all user state, side input changes
        >      > only invalidate the side input that changed.
        >      >
        >      > 4) A cache token on each read/clear/append that is supplied on
        >      > the response of the call with an initial valid set that is
        >      > supplied at start. The runner can selectively choose which to
        >      > keep on start. Bundle failure allows runners to "roll back" to
        >      > a known good state by selecting the previous valid cache token
        >      > as part of the initial set. Key processing partition changes
        >      > allow runners to keep cached state that hasn't changed since it
        >      > can be tied to a version number of the state itself as part of
        >      > the initial set. Side input changes only invalidate the side
        >      > input that changed.
        >      >
        >      > Based upon your discussion you seem to want option #1 which
        >      > doesn't work well with side inputs clearing cached state. If we
        >      > want to have user state survive a changing side input, we would
        >      > want one of the other options. I do agree that supplying the
        >      > cache token upfront is significantly simpler. Currently the
        >      > protos are set up for #4 since it was the most flexible and at
        >      > the time the pros outweighed the cons.
        >      >
        >      > I don't understand why you think you need to wait for a
        >      > response for the append/clear to get its cache token since the
        >      > only reason you need the cache token is that you want to use
        >      > that cached data when processing a different bundle. I was
        >      > thinking that the flow on the SDK side would be something like
        >      > (assuming there is a global cache of cache token -> (map of
        >      > state key -> data))
        >      > 1) Create a local cache of (map of state key -> data) using the
        >      > initial set of valid cache tokens
        >      > 2) Make all mutations in place on local cache without waiting
        >      > for response.
        >      > 3) When response comes back, update global cache with new cache
        >      > token -> (map of state key -> data) (this is when the data
        >      > becomes visible to other bundles that start processing)
        >      > 4) Before the bundle finishes processing, wait for all
        >      > outstanding state calls to finish.
        >      >
        >      > To implement caching on the runner side, you would keep track
        >      > of at most 2 cache tokens per state key, one cache token
        >      > represents the initial value when the bundle started while the
        >      > second represents the modified state. If the bundle succeeds
        >      > the runner passes in the set of tokens which represent the new
        >      > state, if the bundle fails you process using the original ones.
        >      >
        >      > After thinking through the implementation again, we could
        >      > supply two cache tokens for each state id, the first being the
        >      > set of initial tokens if no writes happen while the second
        >      > represents the token to use if the SDK changes the state. This
        >      > gives us the simplification where we don't need to wait for the
        >      > response before we update the global cache making a typical
        >      > blocking cache much easier to do. We also get the benefit that
        >      > runners can supply either the same cache token for a state id
        >      > or different ones. If the runner supplies the same one then
        >      > it's telling the SDK to make modifications in place without any
        >      > rollback (which is good on memory since we are reducing copies
        >      > of stuff) or if the runner supplies two different ones then
        >      > it's telling the SDK to keep the old data around. If we went
        >      > through with this new option the SDK side logic would be
        >      > (assuming there is a global cache of cache token -> (map of
        >      > state key -> data)):
        >      >
        >      > 1) Create an empty local set of state ids that are dirty when
        >      > starting a new bundle (dirty set)
        >      >
        >      > For reads/gets:
        >      > 2A) If the request is a read (get), use dirty set to choose
        >      > which cache token to lookup and use in the global cache. If the
        >      > global cache is missing data issue the appropriate request
        >      > providing the result.
        >      >
        >      > For writes/appends/clear:
        >      > 2B) if the cache tokens are different for the state id, add the
        >      > state id to the dirty set if it isn't there and perform the
        >      > appropriate modification to convert the old cached state data
        >      > to the new state data
        >      > 3B) modify the global cache's data
        >      > 4B) issue the request to the runner
        >      > 5B*) add this request to the set of requests to block on before
        >      > completing the bundle.
        >      >
        >      > (* Note, there was another idea to update the process bundle
        >      > response to contain the id of the last state request which
        >      > would allow the runner to know when it has seen the last state
        >      > request allowing the SDK to not block at all when finishing the
        >      > bundle)
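
The two-token steps above (1, 2A, 2B to 5B) might look roughly like this in Python. Everything here is a hypothetical sketch: `TwoTokenBundle`, `read_from_runner` and `send_to_runner` are made-up names, state is modeled as a list of byte strings, only append is shown, and the "request" returned by `send_to_runner` is whatever handle the caller would later block on.

```python
from typing import Callable, Dict, List, Set, Tuple

# Global cache shape from the thread: cache token -> (state key -> data).
GlobalCache = Dict[bytes, Dict[str, List[bytes]]]


class TwoTokenBundle:
    """Hypothetical per-bundle handler using (initial, modified) tokens."""

    def __init__(
        self,
        global_cache: GlobalCache,
        tokens: Dict[str, Tuple[bytes, bytes]],  # state id -> (initial, modified)
        read_from_runner: Callable[[str], List[bytes]],
        send_to_runner: Callable[[str, bytes], object],
    ) -> None:
        self._cache = global_cache
        self._tokens = tokens
        self._read = read_from_runner
        self._send = send_to_runner
        self._dirty: Set[str] = set()        # step 1: empty dirty set
        self.outstanding: List[object] = []  # step 5B: requests to block on

    def _token_for(self, state_id: str) -> bytes:
        initial, modified = self._tokens[state_id]
        return modified if state_id in self._dirty else initial

    def get(self, state_id: str, state_key: str) -> List[bytes]:
        # Step 2A: the dirty set decides which token to look up.
        entries = self._cache.setdefault(self._token_for(state_id), {})
        if state_key not in entries:
            entries[state_key] = self._read(state_key)
        return entries[state_key]

    def append(self, state_id: str, state_key: str, element: bytes) -> None:
        initial, modified = self._tokens[state_id]
        if initial != modified and state_id not in self._dirty:
            # Step 2B: copy the old data under the new token so the data
            # under the initial token stays available for a rollback.
            self._dirty.add(state_id)
            old = self._cache.get(initial, {})
            self._cache[modified] = {k: list(v) for k, v in old.items()}
        # Step 3B: modify the global cache's data (only if the key is cached;
        # an uncached key is simply read from the runner on the next get).
        entries = self._cache.setdefault(self._token_for(state_id), {})
        if state_key in entries:
            entries[state_key].append(element)
        # Steps 4B and 5B: issue the request and remember it.
        self.outstanding.append(self._send(state_key, element))
```

If the runner supplies the same token twice for a state id, the dirty set is never used and the cached data is modified in place, which matches the "no rollback" case described above.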
        >      >
        >      > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <m...@apache.org> wrote:
        >      >
        >      >     Just to give a quick update here. Rakesh, Thomas, and I
        >      >     had a discussion about async writes from the Python SDK
        >      >     to the Runner. Robert was also present for some parts of
        >      >     the discussion.
        >      >
        >      >     We concluded that blocking writes with the need to
        >      >     refresh the cache token each time are not going to
        >      >     provide enough throughput/latency.
        >      >
        >      >     We figured that it will be enough to use a single cache
        >      >     token per Runner<=>SDK Harness connection. This cache
        >      >     token will be provided by the Runner in the
        >      >     ProcessBundleRequest. Writes will not yield a new cache
        >      >     token. The advantage is that we can use one cache token
        >      >     for the lifetime of the bundle and also across bundles,
        >      >     unless the Runner switches to a new Runner<=>SDK Harness
        >      >     connection; then the Runner would have to generate a new
        >      >     cache token.
        >      >
        >      >     We might require additional cache tokens for the side
        >      >     inputs. For now, I'm planning to only tackle user state
        >      >     which seems to be the area where users have expressed the
        >      >     most need for caching.
        >      >
        >      >     -Max
        >      >
        >      >     On 21.08.19 20:05, Maximilian Michels wrote:
        >      >     >> There is probably a misunderstanding here: I'm
        >      >     >> suggesting to use a worker ID instead of cache tokens,
        >      >     >> not additionally.
        >      >     >
        >      >     > Ah! Misread that. We need a changing token to indicate
        >      >     > that the cache is stale, e.g. checkpoint has failed /
        >      >     > restoring from an old checkpoint. If the _Runner_
        >      >     > generates a new unique token/id for workers which
        >      >     > outlast the Runner, then this should work fine. I don't
        >      >     > think it is safe for the worker to supply the id. The
        >      >     > Runner should be in control of cache tokens to avoid
        >      >     > invalid tokens.
        >      >     >
        >      >     >> In the PR the token is modified as part of updating the
        >      >     >> state. Doesn't the SDK need the new token to update its
        >      >     >> cache entry also? That's where it would help the SDK to
        >      >     >> know the new token upfront.
        >      >     >
        >      >     > If the state is updated in the Runner, a new token has
        >      >     > to be generated. The old one is not valid anymore. The
        >      >     > SDK will use the updated token to store the new value in
        >      >     > the cache. I understand that it would be nice to know
        >      >     > the token upfront. That could be possible with some
        >      >     > token generation scheme. On the other hand, writes can
        >      >     > be asynchronous and thus not block the UDF.
        >      >     >
        >      >     >> But I believe there is no need to change the token in
        >      >     >> the first place, unless bundles for the same key
        >      >     >> (ranges) can be processed by different workers.
        >      >     >
        >      >     > That's certainly possible, e.g. two workers A and B take
        >      >     > turns processing a certain key range, one bundle after
        >      >     > another:
        >      >     >
        >      >     > You process a bundle with a token T with A, then worker
        >      >     > B takes over. Both have an entry with cache token T. So
        >      >     > B goes on to modify the state and uses the same cache
        >      >     > token T. Then A takes over again. A would have a stale
        >      >     > cache entry but T would still be a valid cache token.
        >      >     >
        >      >     >> Indeed the fact that Dataflow can dynamically split and
        >      >     >> merge these ranges is what makes it trickier. If Flink
        >      >     >> does not repartition the ranges, then things are much
        >      >     >> easier.
        >      >     >
        >      >     > Flink does not dynamically repartition key ranges (yet).
        >      >     > If it started to support that, we would invalidate the
        >      >     > cache tokens for the changed partitions.
        >      >     >
        >      >     >
        >      >     > I'd suggest the following cache token generation scheme:
        >      >     >
        >      >     > One cache token per key range for user state and one
        >      >     > cache token for each side input. On writes to user state
        >      >     > or changing side input, the associated cache token will
        >      >     > be renewed.
        >      >     >
        >      >     > On the SDK side, it should be sufficient to let the SDK
        >      >     > re-associate all its cached data belonging to a valid
        >      >     > cache token with a new cache token returned by a
        >      >     > successful write. This has to happen in the active scope
        >      >     > (i.e. user state, or a particular side input).
        >      >     >
        >      >     > If the key range changes, new cache tokens have to be
        >      >     > generated. This should happen automatically because the
        >      >     > Runner does not checkpoint cache tokens and will
        >      >     > generate new ones when it restarts from an earlier
        >      >     > checkpoint.
        >      >     >
        >      >     > The current PR needs to be changed to (1) only keep a
        >      >     > single cache token per user state and key range (2) add
        >      >     > support for cache tokens for each side input.
        >      >     >
        >      >     > Hope that makes sense.
        >      >     >
        >      >     > -Max
        >      >     >
        >      >     > On 21.08.19 17:27, Reuven Lax wrote:
        >      >     >>
        >      >     >>
        >      >     >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <m...@apache.org> wrote:
        >      >     >>
        >      >     >>     Appreciate all your comments! Replying below.
        >      >     >>
        >      >     >>
        >      >     >>     @Luke:
        >      >     >>
        >      >     >>     > Having cache tokens per key would be very
        >      >     >>     > expensive indeed and I believe we should go with
        >      >     >>     > a single cache token "per" bundle.
        >      >     >>
        >      >     >>     Thanks for your comments on the PR. I was thinking
        >      >     >>     to propose something along these lines of having
        >      >     >>     cache tokens valid for a particular checkpointing
        >      >     >>     "epoch". That would require even less token renewal
        >      >     >>     than the per-bundle approach.
        >      >     >>
        >      >     >>
        >      >     >>     @Thomas, thanks for the input. Some remarks:
        >      >     >>
        >      >     >>     > Wouldn't it be simpler to have the runner just
        >      >     >>     > track a unique ID for each worker and use that to
        >      >     >>     > communicate if the cache is valid or not?
        >      >     >>
        >      >     >>     We do not need a unique id per worker. If a cache
        >      >     >>     token is valid for a particular worker, it is also
        >      >     >>     valid for another worker. That is with the
        >      >     >>     assumption that key ranges are always disjoint
        >      >     >>     between the workers.
        >      >     >>
        >      >     >>     > * When the bundle is started, the runner tells the
        >      >     >>     > worker if the cache has become invalid (since it
        >      >     >>     > knows if another worker has mutated state)
        >      >     >>
        >      >     >>     This is simply done by not transferring the
        >      >     >>     particular cache token. No need to declare it
        >      >     >>     invalid explicitly.
        >      >     >>
        >      >     >>     > * When the worker sends mutation requests to the
        >      >     >>     > runner, it includes its own ID (or the runner
        >      >     >>     > already has it as contextual information). No need
        >      >     >>     > to wait for a response.
        >      >     >>
        >      >     >>     Mutations of cached values can be freely done as
        >      >     >>     long as the cache token associated with the state
        >      >     >>     is valid for a particular bundle. Only the first
        >      >     >>     time, the Runner needs to wait on the response to
        >      >     >>     store the cache token. This can also be done
        >      >     >>     asynchronously.
        >      >     >>
        >      >     >>     > * When the bundle is finished, the runner records
        >      >     >>     > the last writer (only if a change occurred)
        >      >     >>
        >      >     >>     I believe this is not necessary because there will
        >      >     >>     only be one writer at a time for a particular
        >      >     >>     bundle and key range, hence only one writer holds a
        >      >     >>     valid cache token for a particular state and key
        >      >     >>     range.
        >      >     >>
        >      >     >>
        >      >     >>     @Reuven:
        >      >     >>
        >      >     >>     > Dataflow divides the keyspace up into
        >      >     >>     > lexicographic ranges, and creates a cache token
        >      >     >>     > per range.
        >      >     >>
        >      >     >>     State is always processed partitioned by the Flink
        >      >     >>     workers (hash-based, not lexicographical). I don't
        >      >     >>     think that matters though because the key ranges do
        >      >     >>     not overlap between the workers. Flink does not
        >      >     >>     support dynamically repartitioning the key ranges.
        >      >     >>     Even in case of fine-grained recovery of workers
        >      >     >>     and their key ranges, we would simply generate new
        >      >     >>     cache tokens for a particular worker.
        >      >     >>
        >      >     >>
        >      >     >> Dataflow's ranges are also hash based. When I said
        >      >     >> lexicographical, I meant lexicographical based on the
        >      >     >> hexadecimal hash value.
        >      >     >>
        >      >     >> Indeed the fact that Dataflow can dynamically split and
        >      >     >> merge these ranges is what makes it trickier. If Flink
        >      >     >> does not repartition the ranges, then things are much
        >      >     >> easier.
        >      >     >>
        >      >     >>
        >      >     >>
        >      >     >>     Thanks,
        >      >     >>     Max
        >      >     >>
        >      >     >>     On 21.08.19 09:33, Reuven Lax wrote:
        >      >     >>     > Dataflow does something like this, however since
        >      >     >>     > work is load balanced across workers a per-worker
        >      >     >>     > id doesn't work very well. Dataflow divides the
        >      >     >>     > keyspace up into lexicographic ranges, and creates
        >      >     >>     > a cache token per range.
        >      >     >>     >
        >      >     >>     > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org> wrote:
        >      >     >>     >
        >      >     >>     >     Commenting here vs. on the PR since related to the overall
        >      >     >>     >     approach.
        >      >     >>     >
        >      >     >>     >     Wouldn't it be simpler to have the runner just track a unique ID for
        >      >     >>     >     each worker and use that to communicate if the cache is valid or not?
        >      >     >>     >
        >      >     >>     >     * When the bundle is started, the runner tells the worker if the
        >      >     >>     >     cache has become invalid (since it knows if another worker has
        >      >     >>     >     mutated state)
        >      >     >>     >     * When the worker sends mutation requests to the runner, it includes
        >      >     >>     >     its own ID (or the runner already has it as contextual information).
        >      >     >>     >     No need to wait for a response.
        >      >     >>     >     * When the bundle is finished, the runner records the last writer
        >      >     >>     >     (only if a change occurred)
        >      >     >>     >
        >      >     >>     >     Whenever the current worker ID and the last writer ID don't match, the
        >      >     >>     >     cache is invalid.
        >      >     >>     >
        >      >     >>     >     Thomas
        >      >     >>     >
        >      >     >>     >
        >      >     >>     >     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:
        >      >     >>     >
        >      >     >>     >         Having cache tokens per key would be very expensive indeed and I
        >      >     >>     >         believe we should go with a single cache token "per" bundle.
        >      >     >>     >
        >      >     >>     >         On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <m...@apache.org> wrote:
        >      >     >>     >
        >      >     >>     >             Maybe a Beam Python expert can chime in for Rakesh's question?
        >      >     >>     >
        >      >     >>     >             Luke, I was assuming cache tokens to be per key and state ID. While
        >      >     >>     >             implementing initial support on the Runner side, I realized that we
        >      >     >>     >             probably want cache tokens to be per state ID only. Note that if we
        >      >     >>     >             had per-key cache tokens, the number of cache tokens would approach
        >      >     >>     >             the total number of keys in an application.
        >      >     >>     >
        >      >     >>     >             If anyone wants to have a look, here is a first version of the Runner
        >      >     >>     >             side for cache tokens. Note that I only implemented cache tokens for
        >      >     >>     >             BagUserState for now, but it can easily be added for side inputs as
        >      >     >>     >             well.
        >      >     >>     >
        >      >     >>     > https://github.com/apache/beam/pull/9374
        >      >     >>     >
        >      >     >>     >             -Max
        >      >     >>     >
        >      >     >>     >
        >      >     >>
        >      >
        >
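
For readers following the thread, the worker-ID alternative that Thomas describes in the quoted message (the runner tracks the last writer and compares it with the worker starting a bundle) could look roughly like the sketch below. This is only an illustration under stated assumptions: the class Runner and the methods start_bundle, record_mutation and finish_bundle are hypothetical names, not part of Beam's Fn API, and the sketch ignores key ranges and load balancing, which Reuven points out make a per-worker ID work poorly on Dataflow.

```python
from typing import Optional, Set


class Runner:
    """Hypothetical runner-side bookkeeping for the last-writer idea.

    Not Beam's actual API; all names here are assumptions for illustration.
    """

    def __init__(self) -> None:
        # ID of the worker that last mutated state, or None if nobody has.
        self.last_writer: Optional[str] = None
        # Workers that sent at least one mutation in their current bundle.
        self._dirty: Set[str] = set()

    def start_bundle(self, worker_id: str) -> bool:
        """Return True if the worker's cache is still valid."""
        self._dirty.discard(worker_id)
        # The cache is valid only if this worker was the last writer.
        return self.last_writer == worker_id

    def record_mutation(self, worker_id: str) -> None:
        """Note a state mutation; the worker does not wait for a reply."""
        self._dirty.add(worker_id)

    def finish_bundle(self, worker_id: str) -> None:
        """Record the last writer, but only if a change occurred."""
        if worker_id in self._dirty:
            self.last_writer = worker_id
            self._dirty.discard(worker_id)


if __name__ == "__main__":
    runner = Runner()
    runner.start_bundle("w1")          # no writer recorded yet
    runner.record_mutation("w1")
    runner.finish_bundle("w1")         # "w1" becomes the last writer
    print(runner.start_bundle("w1"))   # w1 wrote last, cache usable
    print(runner.start_bundle("w2"))   # w2 did not write last
```

Note that the check is conservative: a worker that only reads is never recorded as the last writer, so its cache is reported invalid even when no other worker has changed the state in between.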
