SideInputState -> SideInput (side_input_state -> side_input)
+ more comments around the messages and the fields.


On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels <[email protected]> wrote:

> We would have to differentiate cache tokens for user state and side
> inputs. How about something like this?
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be  // 
> instantiated and executed by the SDK harness.  string 
> process_bundle_descriptor_reference = 1;
>
>   message CacheToken {
>
>     message UserState {
>     }
>
>     message SideInputState {
>       string side_input_id = 1;
>     }
>
>     oneof type {
>       UserState user_state = 1;
>       SideInputState side_input_state = 2;
>     }
>
>     bytes token = 10;
>   }
>
>   // (Optional) A list of cache tokens that can be used by an SDK to reuse  
> // cached data returned by the State API across multiple bundles.  repeated 
> CacheToken cache_tokens = 2;
> }
>
> -Max
>
> On 27.08.19 18:43, Lukasz Cwik wrote:
>
> The bundles view of side inputs should never change during processing and
> should have a point in time snapshot.
>
> I was just trying to say that the cache token for side inputs being
> deferred till side input request time simplified the runners implementation
> since that is conclusively when the runner would need to take a look at the
> side input. Putting them as part of the ProcesBundleRequest complicates
> that but does make the SDK implementation significantly simpler which is a
> win.
>
> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <[email protected]> wrote:
>
>> Thanks for the quick response.
>>
>> Just to clarify, the issue with versioning side input is also present
>> when supplying the cache tokens on a request basis instead of per
>> bundle. The SDK never knows when the Runner receives a new version of
>> the side input. Like you pointed out, it needs to mark side inputs as
>> stale and generate new cache tokens for the stale side inputs.
>>
>> The difference between per-request tokens and per-bundle tokens would be
>> that the side input can only change after a bundle completes vs. during
>> the bundle. Side inputs are always fuzzy in that regard because there is
>> no precise instance where side inputs are atomically updated, other than
>> the assumption that they eventually will be updated. In that regard
>> per-bundle tokens for side input seem to be fine.
>>
>> All of the above is not an issue for user state, as its cache can remain
>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>> solution would be to not cache side input because there are many cases
>> where the caching just adds additional overhead. However, I can also
>> imagine cases where side input is valid forever and caching would be
>> very beneficial.
>>
>> For the first version I want to focus on user state because that's where
>> I see the most benefit for caching. I don't see a problem though for the
>> Runner to detect new side input and reflect that in the cache tokens
>> supplied for a new bundle.
>>
>> -Max
>>
>> On 26.08.19 22:27, Lukasz Cwik wrote:
>> > Your summary below makes sense to me. I can see that recovery from
>> > rolling back doesn't need to be a priority and simplifies the solution
>> > for user state caching down to one token.
>> >
>> > Providing cache tokens upfront does require the Runner to know what
>> > "version" of everything it may supply to the SDK upfront (instead of on
>> > request) which would mean that the Runner may need to have a mapping
>> > from cache token to internal version identifier for things like side
>> > inputs which are typically broadcast. The Runner would also need to
>> poll
>> > to see if the side input has changed in the background to not block
>> > processing bundles with "stale" side input data.
>> >
>> > Ping me once you have the Runner PR updated and I'll take a look again.
>> >
>> > On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <[email protected]
>> > <mailto:[email protected]>> wrote:
>> >
>> >     Thank you for the summary Luke. I really appreciate the effort you
>> put
>> >     into this!
>> >
>> >      > Based upon your discussion you seem to want option #1
>> >
>> >     I'm actually for option #2. The option to cache/invalidate side
>> inputs
>> >     is important, and we should incorporate this in the design. That's
>> why
>> >     option #1 is not flexible enough. However, a first implementation
>> could
>> >     defer caching of side inputs.
>> >
>> >     Option #3 was my initial thinking and the first version of the PR,
>> but I
>> >     think we agreed that there wouldn't be much gain from keeping a
>> cache
>> >     token per state id.
>> >
>> >     Option #4 is what is specifically documented in the reference doc
>> and
>> >     already part of the Proto, where valid tokens are provided for each
>> new
>> >     bundle and also as part of the response of a get/put/clear. We
>> mentioned
>> >     that the reply does not have to be waited on synchronously (I
>> mentioned
>> >     it even), but it complicates the implementation. The idea Thomas
>> and I
>> >     expressed was that a response is not even necessary if we assume
>> >     validity of the upfront provided cache tokens for the lifetime of a
>> >     bundle and that cache tokens will be invalidated as soon as the
>> Runner
>> >     fails in any way. This is naturally the case for Flink because it
>> will
>> >     simply "forget" its current cache tokens.
>> >
>> >     I currently envision the following schema:
>> >
>> >     Runner
>> >     ======
>> >
>> >     - Runner generates a globally unique cache token, one for user
>> state and
>> >     one for each side input
>> >
>> >     - The token is supplied to the SDK Harness for each bundle request
>> >
>> >     - For the lifetime of a Runner<=>SDK Harness connection this cache
>> token
>> >     will not change
>> >     - Runner will generate a new token if the connection/key space
>> changes
>> >     between Runner and SDK Harness
>> >
>> >
>> >     SDK
>> >     ===
>> >
>> >     - For each bundle the SDK worker stores the list of valid cache
>> tokens
>> >     - The SDK Harness keep a global cache across all its (local) workers
>> >     which is a LRU cache: state_key => (cache_token, value)
>> >     - get: Lookup cache using the valid cache token for the state. If no
>> >     match, then fetch from Runner and use the already available token
>> for
>> >     caching
>> >     - put: Put value in cache with a valid cache token, put value to
>> pending
>> >     writes which will be flushed out latest when the bundle ends
>> >     - clear: same as put but clear cache
>> >
>> >     It does look like this is not too far off from what you were
>> describing.
>> >     The main difference is that we just work with a single cache token.
>> In
>> >     my opinion we do not need the second cache token for writes, as
>> long as
>> >     we ensure that we generate a new cache token if the
>> >     bundle/checkpoint fails.
>> >
>> >     I have a draft PR
>> >        for the Runner: https://github.com/apache/beam/pull/9374
>> >        for the SDK: https://github.com/apache/beam/pull/9418
>> >
>> >     Note that the Runner PR needs to be updated to fully reflected the
>> above
>> >     scheme. The SDK implementation is WIP. I want to make sure that we
>> >     clarify the design before this gets finalized.
>> >
>> >     Thanks again for all your comments. Much appreciated!
>> >
>> >     Cheers,
>> >     Max
>> >
>> >     On 26.08.19 19:58, Lukasz Cwik wrote:
>> >      > There were originally a couple of ideas around how caching could
>> >     work:
>> >      > 1) One cache token for the entire bundle that is supplied up
>> >     front. The
>> >      > SDK caches everything using the given token. All
>> >     reads/clear/append for
>> >      > all types of state happen under this token. Anytime a side input
>> >      > changes, key processing partition range changes or a bundle
>> fails to
>> >      > process, the runner chooses a new cache token effectively
>> >     invalidating
>> >      > everything in the past>
>> >      > 2) One cache token per type of state that is supplied up front.
>> >      > The SDK caches all requests for a given type using the given
>> cache
>> >      > token. The runner can selectively choose which type to keep and
>> >     which to
>> >      > invalidate. Bundle failure and key processing partition changes
>> >      > invalidate all user state, side input change invalidates all side
>> >     inputs.
>> >      >
>> >      > 3) One cache token per state id that is supplied up front.
>> >      > The SDK caches all requests for the given state id using the
>> >     given cache
>> >      > token. The runner can selectively choose which to invalidate and
>> >     which
>> >      > to keep. Bundle failure and key processing partition changes
>> >     invalidate
>> >      > all user state, side input changes only invalidate the side input
>> >     that
>> >      > changed.
>> >      >
>> >      > 4) A cache token on each read/clear/append that is supplied on
>> the
>> >      > response of the call with an initial valid set that is supplied
>> at
>> >      > start. The runner can selectively choose which to keep on start.
>> >     Bundle
>> >      > failure allows runners to "roll back" to a known good state by
>> >     selecting
>> >      > the previous valid cache token as part of the initial set. Key
>> >      > processing partition changes allow runners to keep cached state
>> that
>> >      > hasn't changed since it can be tied to a version number of the
>> state
>> >      > itself as part of the initial set. Side input changes only
>> invalidate
>> >      > the side input that changed.
>> >      >
>> >      > Based upon your discussion you seem to want option #1 which
>> >     doesn't work
>> >      > well with side inputs clearing cached state. If we want to have
>> user
>> >      > state survive a changing side input, we would want one of the
>> other
>> >      > options. I do agree that supplying the cache token upfront is
>> >      > significantly simpler. Currently the protos are setup for #4
>> since it
>> >      > was the most flexible and at the time the pros outweighed the
>> cons.
>> >      >
>> >      > I don't understand why you think you need to wait for a response
>> >     for the
>> >      > append/clear to get its cache token since the only reason you
>> >     need the
>> >      > cache token is that you want to use that cached data when
>> >     processing a
>> >      > different bundle. I was thinking that the flow on the SDK side
>> >     would be
>> >      > something like (assuming there is a global cache of cache token
>> >     -> (map
>> >      > of state key -> data))
>> >      > 1) Create a local cache of (map of state key -> data) using the
>> >     initial
>> >      > set of valid cache tokens
>> >      > 2) Make all mutations in place on local cache without waiting for
>> >     response.
>> >      > 3) When response comes back, update global cache with new cache
>> >     token ->
>> >      > (map of state key -> data)) (this is when the data becomes
>> visible to
>> >      > other bundles that start processing)
>> >      > 4) Before the bundle finishes processing, wait for all
>> >     outstanding state
>> >      > calls to finish.
>> >      >
>> >      > To implement caching on the runner side, you would keep track of
>> >     at most
>> >      > 2 cache tokens per state key, one cache token represents the
>> initial
>> >      > value when the bundle started while the second represents the
>> >     modified
>> >      > state. If the bundle succeeds the runner passes in the set of
>> tokens
>> >      > which represent the new state, if the bundle fails you process
>> >     using the
>> >      > original ones.
>> >      >
>> >      > After thinking through the implementation again, we could supply
>> two
>> >      > cache tokens for each state id, the first being the set of
>> initial
>> >      > tokens if no writes happen while the second represents the token
>> >     to use
>> >      > if the SDK changes the state. This gives us the simplification
>> >     where we
>> >      > don't need to wait for the response before we update the global
>> cache
>> >      > making a typical blocking cache much easier to do. We also get
>> the
>> >      > benefit that runners can supply either the same cache token for a
>> >     state
>> >      > id or different ones. If the runner supplies the same one then
>> its
>> >      > telling the SDK to make modifications in place without any
>> rollback
>> >      > (which is good on memory since we are reducing copies of stuff)
>> or if
>> >      > the runner supplies two different ones then its telling the SDK
>> >     to keep
>> >      > the old data around. If we went through with this new option the
>> SDK
>> >      > side logic would be (assuming there is a global cache of cache
>> >     token ->
>> >      > (map of state key -> data)):
>> >      >
>> >      > 1) Create an empty local set of state ids that are dirty when
>> >     starting a
>> >      > new bundle (dirty set)
>> >      >
>> >      > For reads/gets:
>> >      > 2A) If the request is a read (get), use dirty set to choose which
>> >     cache
>> >      > token to lookup and use in the global cache. If the global cache
>> is
>> >      > missing data issue the appropriate request providing the result.
>> >      >
>> >      > For writes/appends/clear:
>> >      > 2B) if the cache tokens are different for the state id, add the
>> >     state id
>> >      > to the dirty set if it isn't there and perform the appropriate
>> >      > modification to convert the old cached state data to the new
>> >     state data
>> >      > 3B) modify the global caches data
>> >      > 4B) issue the request to the runner
>> >      > 5B*) add this request to the set of requests to block on before
>> >      > completing the bundle.
>> >      >
>> >      > (* Note, there was another idea to update the process bundle
>> >     response to
>> >      > contain the id of the last state request which would allow the
>> >     runner to
>> >      > know when it has seen the last state request allowing the SDK to
>> not
>> >      > block at all when finishing the bundle)
>> >      >
>> >      > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels
>> >     <[email protected] <mailto:[email protected]>
>> >      > <mailto:[email protected] <mailto:[email protected]>>> wrote:
>> >      >
>> >      >     Just to give a quick update here. Rakesh, Thomas, and I had a
>> >     discussion
>> >      >     about async writes from the Python SDK to the Runner. Robert
>> >     was also
>> >      >     present for some parts of the discussion.
>> >      >
>> >      >     We concluded that blocking writes with the need to refresh
>> >     the cache
>> >      >     token each time are not going to provide enough
>> >     throughput/latency.
>> >      >
>> >      >     We figured that it will be enough to use a single cache
>> token per
>> >      >     Runner<=>SDK Harness connection. This cache token will be
>> >     provided by
>> >      >     the Runner in the ProcessBundleRequest. Writes will not yield
>> >     a new
>> >      >     cache token. The advantage is that we can use one cache token
>> >     for the
>> >      >     life time of the bundle and also across bundles, unless the
>> >     Runner
>> >      >     switches to a new Runner<=>SDK Harness connection; then the
>> >     Runner would
>> >      >     have to generate a new cache token.
>> >      >
>> >      >     We might require additional cache tokens for the side inputs.
>> >     For now,
>> >      >     I'm planning to only tackle user state which seems to be the
>> >     area where
>> >      >     users have expressed the most need for caching.
>> >      >
>> >      >     -Max
>> >      >
>> >      >     On 21.08.19 20:05, Maximilian Michels wrote:
>> >      >     >> There is probably a misunderstanding here: I'm suggesting
>> >     to use
>> >      >     a worker ID instead of cache tokens, not additionally.
>> >      >     >
>> >      >     > Ah! Misread that. We need a changing token to indicate
>> that the
>> >      >     cache is
>> >      >     > stale, e.g. checkpoint has failed / restoring from an old
>> >      >     checkpoint. If
>> >      >     > the _Runner_ generates a new unique token/id for workers
>> >     which outlast
>> >      >     > the Runner, then this should work fine. I don't think it is
>> >     safe
>> >      >     for the
>> >      >     > worker to supply the id. The Runner should be in control of
>> >     cache
>> >      >     tokens
>> >      >     > to avoid invalid tokens.
>> >      >     >
>> >      >     >> In the PR the token is modified as part of updating the
>> state.
>> >      >     Doesn't the SDK need the new token to update it's cache entry
>> >     also?
>> >      >     That's where it would help the SDK to know the new token
>> upfront.
>> >      >     >
>> >      >     > If the state is updated in the Runner, a new token has to
>> be
>> >      >     generated.
>> >      >     > The old one is not valid anymore. The SDK will use the
>> updated
>> >      >     token to
>> >      >     > store the new value in the cache. I understand that it
>> would be
>> >      >     nice to
>> >      >     > know the token upfront. That could be possible with some
>> token
>> >      >     > generation scheme. On the other hand, writes can be
>> >     asynchronous and
>> >      >     > thus not block the UDF.
>> >      >     >
>> >      >     >> But I believe there is no need to change the token in
>> first
>> >      >     place, unless bundles for the same key (ranges) can be
>> >     processed by
>> >      >     different workers.
>> >      >     >
>> >      >     > That's certainly possible, e.g. two workers A and B take
>> turn
>> >      >     processing
>> >      >     > a certain key range, one bundle after another:
>> >      >     >
>> >      >     > You process a bundle with a token T with A, then worker B
>> >     takes over.
>> >      >     > Both have an entry with cache token T. So B goes on to
>> >     modify the
>> >      >     state
>> >      >     > and uses the same cache token T. Then A takes over again. A
>> >     would
>> >      >     have a
>> >      >     > stale cache entry but T would still be a valid cache token.
>> >      >     >
>> >      >     >> Indeed the fact that Dataflow can dynamically split and
>> merge
>> >      >     these ranges is what makes it trickier. If Flink does not
>> >      >     repartition the ranges, then things are much easier.
>> >      >     >
>> >      >     > Flink does not dynamically repartition key ranges (yet). If
>> >     it started
>> >      >     > to support that, we would invalidate the cache tokens for
>> >     the changed
>> >      >     > partitions.
>> >      >     >
>> >      >     >
>> >      >     > I'd suggest the following cache token generation scheme:
>> >      >     >
>> >      >     > One cache token per key range for user state and one cache
>> >     token for
>> >      >     > each side input. On writes to user state or changing side
>> >     input, the
>> >      >     > associated cache token will be renewed.
>> >      >     >
>> >      >     > On the SDK side, it should be sufficient to let the SDK
>> >      >     re-associate all
>> >      >     > its cached data belonging to a valid cache token with a new
>> >     cache
>> >      >     token
>> >      >     > returned by a successful write. This has to happen in the
>> >     active scope
>> >      >     > (i.e. user state, or a particular side input).
>> >      >     >
>> >      >     > If the key range changes, new cache tokens have to
>> >     generated. This
>> >      >     > should happen automatically because the Runner does not
>> >     checkpoint
>> >      >     cache
>> >      >     > tokens and will generate new ones when it restarts from an
>> >     earlier
>> >      >     > checkpoint.
>> >      >     >
>> >      >     > The current PR needs to be changed to (1) only keep a
>> >     single cache
>> >      >     token
>> >      >     > per user state and key range (2) add support for cache
>> >     tokens for each
>> >      >     > side input.
>> >      >     >
>> >      >     > Hope that makes sense.
>> >      >     >
>> >      >     > -Max
>> >      >     >
>> >      >     > On 21.08.19 17:27, Reuven Lax wrote:
>> >      >     >>
>> >      >     >>
>> >      >     >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels
>> >      >     <[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>
>> >      >     >> <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>>> wrote:
>> >      >     >>
>> >      >     >>     Appreciate all your comments! Replying below.
>> >      >     >>
>> >      >     >>
>> >      >     >>     @Luke:
>> >      >     >>
>> >      >     >>     > Having cache tokens per key would be very expensive
>> >     indeed
>> >      >     and I
>> >      >     >>     believe we should go with a single cache token "per"
>> >     bundle.
>> >      >     >>
>> >      >     >>     Thanks for your comments on the PR. I was thinking to
>> >     propose
>> >      >     something
>> >      >     >>     along this lines of having cache tokens valid for a
>> >     particular
>> >      >     >>     checkpointing "epoch". That would require even less
>> token
>> >      >     renewal than
>> >      >     >>     the per-bundle approach.
>> >      >     >>
>> >      >     >>
>> >      >     >>     @Thomas, thanks for the input. Some remarks:
>> >      >     >>
>> >      >     >>     > Wouldn't it be simpler to have the runner just
>> track a
>> >      >     unique ID
>> >      >     >>     for each worker and use that to communicate if the
>> >     cache is
>> >      >     valid or
>> >      >     >>     not?
>> >      >     >>
>> >      >     >>     We do not need a unique id per worker. If a cache
>> token is
>> >      >     valid for a
>> >      >     >>     particular worker, it is also valid for another
>> >     worker. That
>> >      >     is with the
>> >      >     >>     assumption that key ranges are always disjoint
>> between the
>> >      >     workers.
>> >      >     >>
>> >      >     >>     > * When the bundle is started, the runner tells the
>> >     worker
>> >      >     if the
>> >      >     >>     cache has become invalid (since it knows if another
>> >     worker has
>> >      >     >>     mutated state)
>> >      >     >>
>> >      >     >>     This is simply done by not transferring the particular
>> >     cache
>> >      >     token. No
>> >      >     >>     need to declare it invalid explicitly.
>> >      >     >>
>> >      >     >>     > * When the worker sends mutation requests to the
>> >     runner, it
>> >      >     >>     includes its own ID (or the runner already has it as
>> >     contextual
>> >      >     >>     information). No need to wait for a response.
>> >      >     >>
>> >      >     >>     Mutations of cached values can be freely done as long
>> >     as the
>> >      >     cache token
>> >      >     >>     associated with the state is valid for a particular
>> >     bundle.
>> >      >     Only the
>> >      >     >>     first time, the Runner needs to wait on the response
>> >     to store
>> >      >     the cache
>> >      >     >>     token. This can also be done asynchronously.
>> >      >     >>
>> >      >     >>     > * When the bundle is finished, the runner records
>> >     the last
>> >      >     writer
>> >      >     >>     (only if a change occurred)
>> >      >     >>
>> >      >     >>     I believe this is not necessary because there will
>> only be
>> >      >     one writer at
>> >      >     >>     a time for a particular bundle and key range, hence
>> >     only one
>> >      >     writer
>> >      >     >>     holds a valid cache token for a particular state and
>> >     key range.
>> >      >     >>
>> >      >     >>
>> >      >     >>     @Reuven:
>> >      >     >>
>> >      >     >>     >  Dataflow divides the keyspace up into lexicographic
>> >      >     ranges, and
>> >      >     >>     creates a cache token per range.
>> >      >     >>
>> >      >     >>     State is always processed partitioned by the Flink
>> workers
>> >      >     (hash-based,
>> >      >     >>     not lexicopgrahical). I don't think that matters
>> though
>> >      >     because the key
>> >      >     >>     ranges do not overlap between the workers. Flink does
>> >     not support
>> >      >     >>     dynamically repartitioning the key ranges. Even in
>> case of
>> >      >     fine-grained
>> >      >     >>     recovery of workers and their key ranges, we would
>> simply
>> >      >     generate new
>> >      >     >>     cache tokens for a particular worker.
>> >      >     >>
>> >      >     >>
>> >      >     >> Dataflow's ranges are also hash based. When I said
>> >     lexicographical, I
>> >      >     >> meant lexicographical based on the hexadecimal hash value.
>> >      >     >>
>> >      >     >> Indeed the fact that Dataflow can dynamically split and
>> >     merge these
>> >      >     >> ranges is what makes it trickier. If Flink does not
>> >     repartition the
>> >      >     >> ranges, then things are much easier.
>> >      >     >>
>> >      >     >>
>> >      >     >>
>> >      >     >>     Thanks,
>> >      >     >>     Max
>> >      >     >>
>> >      >     >>     On 21.08.19 09:33, Reuven Lax wrote:
>> >      >     >>     > Dataflow does something like this, however since
>> work is
>> >      >     >>     > load balanced across workers a per-worker id doesn't
>> >     work
>> >      >     very well.
>> >      >     >>     > Dataflow divides the keyspace up into lexicographic
>> >     ranges, and
>> >      >     >>     creates
>> >      >     >>     > a cache token per range.
>> >      >     >>     >
>> >      >     >>     > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise
>> >      >     <[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>
>> >      >     >>     <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>>
>> >      >     >>     > <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>
>> >      >     <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>>>> wrote:
>> >      >     >>     >
>> >      >     >>     >     Commenting here vs. on the PR since related to
>> >     the overall
>> >      >     >>     approach.
>> >      >     >>     >
>> >      >     >>     >     Wouldn't it be simpler to have the runner just
>> >     track a
>> >      >     unique
>> >      >     >>     ID for
>> >      >     >>     >     each worker and use that to communicate if the
>> >     cache is
>> >      >     valid
>> >      >     >>     or not?
>> >      >     >>     >
>> >      >     >>     >     * When the bundle is started, the runner tells
>> the
>> >      >     worker if the
>> >      >     >>     >     cache has become invalid (since it knows if
>> another
>> >      >     worker has
>> >      >     >>     >     mutated state)
>> >      >     >>     >     * When the worker sends mutation requests to the
>> >     runner, it
>> >      >     >>     includes
>> >      >     >>     >     its own ID (or the runner already has it as
>> >     contextual
>> >      >     >>     information).
>> >      >     >>     >     No need to wait for a response.
>> >      >     >>     >     * When the bundle is finished, the runner
>> >     records the
>> >      >     last writer
>> >      >     >>     >     (only if a change occurred)
>> >      >     >>     >
>> >      >     >>     >     Whenever current worker ID and last writer ID
>> >     doesn't
>> >      >     match, cache
>> >      >     >>     >     is invalid.
>> >      >     >>     >
>> >      >     >>     >     Thomas
>> >      >     >>     >
>> >      >     >>     >
>> >      >     >>     >     On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik
>> >      >     <[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>
>> >      >     >>     <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>>
>> >      >     >>     >     <mailto:[email protected]
>> >     <mailto:[email protected]> <mailto:[email protected]
>> >     <mailto:[email protected]>>
>> >      >     <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>>>> wrote:
>> >      >     >>     >
>> >      >     >>     >         Having cache tokens per key would be very
>> >     expensive
>> >      >     indeed
>> >      >     >>     and I
>> >      >     >>     >         believe we should go with a single cache
>> token
>> >      >     "per" bundle.
>> >      >     >>     >
>> >      >     >>     >         On Mon, Aug 19, 2019 at 11:36 AM Maximilian
>> >     Michels
>> >      >     >>     >         <[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>
>> >      >     <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>>
>> >      >     >>     <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>
>> >      >     <mailto:[email protected] <mailto:[email protected]>
>> >     <mailto:[email protected] <mailto:[email protected]>>>>> wrote:
>> >      >     >>     >
>> >      >     >>     >             Maybe a Beam Python expert can chime in
>> for
>> >      >     Rakesh's
>> >      >     >>     question?
>> >      >     >>     >
>> >      >     >>     >             Luke, I was assuming cache tokens to be
>> >     per key
>> >      >     and state
>> >      >     >>     >             id. During
>> >      >     >>     >             implementing an initial support on the
>> >     Runner
>> >      >     side, I
>> >      >     >>     >             realized that we
>> >      >     >>     >             probably want cache tokens to only be
>> >     per state
>> >      >     id. Note
>> >      >     >>     >             that if we had
>> >      >     >>     >             per-key cache tokens, the number of
>> cache
>> >      >     tokens would
>> >      >     >>     >             approach the
>> >      >     >>     >             total number of keys in an application.
>> >      >     >>     >
>> >      >     >>     >             If anyone wants to have a look, here is
>> >     a first
>> >      >     version of
>> >      >     >>     >             the Runner
>> >      >     >>     >             side for cache tokens. Note that I only
>> >      >     implemented cache
>> >      >     >>     >             tokens for
>> >      >     >>     >             BagUserState for now, but it can be
>> easily
>> >      >     added for side
>> >      >     >>     >             inputs as well.
>> >      >     >>     >
>> >      >     >>     > https://github.com/apache/beam/pull/9374
>> >      >     >>     >
>> >      >     >>     >             -Max
>> >      >     >>     >
>> >      >     >>     >
>> >      >     >>
>> >      >
>> >
>>
>

Reply via email to