Just to clarify, the repeated list of cache tokens in the process bundle request is used to validate reading *and* stored when writing? In that sense, should they just be called version identifiers or something like that?
On Tue, Aug 27, 2019 at 11:33 AM Maximilian Michels <m...@apache.org> wrote:
>
> Thanks. Updated:
>
> message ProcessBundleRequest {
>   // (Required) A reference to the process bundle descriptor that must be
>   // instantiated and executed by the SDK harness.
>   string process_bundle_descriptor_reference = 1;
>
>   // A cache token which can be used by an SDK to check for the validity
>   // of cached elements which have a cache token associated.
>   message CacheToken {
>
>     // A flag to indicate a cache token is valid for user state.
>     message UserState {}
>
>     // A flag to indicate a cache token is valid for a side input.
>     message SideInput {
>       // The id of a side input.
>       string side_input = 1;
>     }
>
>     // The scope of a cache token.
>     oneof type {
>       UserState user_state = 1;
>       SideInput side_input = 2;
>     }
>
>     // The cache token identifier which should be globally unique.
>     bytes token = 10;
>   }
>
>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>   // cached data returned by the State API across multiple bundles.
>   repeated CacheToken cache_tokens = 2;
> }
>
> On 27.08.19 19:22, Lukasz Cwik wrote:
> > SideInputState -> SideInput (side_input_state -> side_input)
> > + more comments around the messages and the fields.
> >
> > On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels <m...@apache.org> wrote:
>>
>> We would have to differentiate cache tokens for user state and side inputs.
>> How about something like this?
>>
>> message ProcessBundleRequest {
>>   // (Required) A reference to the process bundle descriptor that must be
>>   // instantiated and executed by the SDK harness.
>>   string process_bundle_descriptor_reference = 1;
>>
>>   message CacheToken {
>>
>>     message UserState {
>>     }
>>
>>     message SideInputState {
>>       string side_input_id = 1;
>>     }
>>
>>     oneof type {
>>       UserState user_state = 1;
>>       SideInputState side_input_state = 2;
>>     }
>>
>>     bytes token = 10;
>>   }
>>
>>   // (Optional) A list of cache tokens that can be used by an SDK to reuse
>>   // cached data returned by the State API across multiple bundles.
>>   repeated CacheToken cache_tokens = 2;
>> }
>>
>> -Max
>>
>> On 27.08.19 18:43, Lukasz Cwik wrote:
>>
>> The bundle's view of side inputs should never change during processing and
>> should have a point-in-time snapshot.
>>
>> I was just trying to say that deferring the cache token for side inputs
>> until side input request time simplified the runner's implementation, since
>> that is conclusively when the runner would need to take a look at the side
>> input. Putting them as part of the ProcessBundleRequest complicates that,
>> but it does make the SDK implementation significantly simpler, which is a
>> win.
>>
>> On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <m...@apache.org> wrote:
>>>
>>> Thanks for the quick response.
>>>
>>> Just to clarify, the issue with versioning side input is also present
>>> when supplying the cache tokens on a request basis instead of per
>>> bundle. The SDK never knows when the Runner receives a new version of
>>> the side input. Like you pointed out, it needs to mark side inputs as
>>> stale and generate new cache tokens for the stale side inputs.
>>>
>>> The difference between per-request tokens and per-bundle tokens would be
>>> that the side input can only change after a bundle completes vs. during
>>> the bundle. Side inputs are always fuzzy in that regard because there is
>>> no precise instant when side inputs are atomically updated, other than
>>> the assumption that they eventually will be updated.
>>> In that regard per-bundle tokens for side input seem to be fine.
>>>
>>> All of the above is not an issue for user state, as its cache can remain
>>> valid for the lifetime of a Runner<=>SDK Harness connection. A simple
>>> solution would be to not cache side input because there are many cases
>>> where the caching just adds additional overhead. However, I can also
>>> imagine cases where side input is valid forever and caching would be
>>> very beneficial.
>>>
>>> For the first version I want to focus on user state because that's where
>>> I see the most benefit for caching. I don't see a problem though for the
>>> Runner to detect new side input and reflect that in the cache tokens
>>> supplied for a new bundle.
>>>
>>> -Max
>>>
>>> On 26.08.19 22:27, Lukasz Cwik wrote:
>>> > Your summary below makes sense to me. I can see that recovery from
>>> > rolling back doesn't need to be a priority and simplifies the solution
>>> > for user state caching down to one token.
>>> >
>>> > Providing cache tokens upfront does require the Runner to know what
>>> > "version" of everything it may supply to the SDK upfront (instead of on
>>> > request) which would mean that the Runner may need to have a mapping
>>> > from cache token to internal version identifier for things like side
>>> > inputs which are typically broadcast. The Runner would also need to
>>> > poll to see if the side input has changed in the background to not
>>> > block processing bundles with "stale" side input data.
>>> >
>>> > Ping me once you have the Runner PR updated and I'll take a look again.
>>> >
>>> > On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <m...@apache.org> wrote:
>>> >
>>> > Thank you for the summary Luke. I really appreciate the effort you put
>>> > into this!
>>> >
>>> > > Based upon your discussion you seem to want option #1
>>> >
>>> > I'm actually for option #2. The option to cache/invalidate side inputs
>>> > is important, and we should incorporate this in the design. That's why
>>> > option #1 is not flexible enough. However, a first implementation could
>>> > defer caching of side inputs.
>>> >
>>> > Option #3 was my initial thinking and the first version of the PR, but
>>> > I think we agreed that there wouldn't be much gain from keeping a cache
>>> > token per state id.
>>> >
>>> > Option #4 is what is specifically documented in the reference doc and
>>> > already part of the Proto, where valid tokens are provided for each new
>>> > bundle and also as part of the response of a get/put/clear. We
>>> > mentioned that the reply does not have to be waited on synchronously (I
>>> > mentioned it even), but it complicates the implementation. The idea
>>> > Thomas and I expressed was that a response is not even necessary if we
>>> > assume validity of the upfront provided cache tokens for the lifetime
>>> > of a bundle and that cache tokens will be invalidated as soon as the
>>> > Runner fails in any way. This is naturally the case for Flink because
>>> > it will simply "forget" its current cache tokens.
>>> >
>>> > I currently envision the following schema:
>>> >
>>> > Runner
>>> > ======
>>> >
>>> > - Runner generates a globally unique cache token, one for user state
>>> > and one for each side input
>>> > - The token is supplied to the SDK Harness for each bundle request
>>> > - For the lifetime of a Runner<=>SDK Harness connection this cache
>>> > token will not change
>>> > - Runner will generate a new token if the connection/key space changes
>>> > between Runner and SDK Harness
>>> >
>>> > SDK
>>> > ===
>>> >
>>> > - For each bundle the SDK worker stores the list of valid cache tokens
>>> > - The SDK Harness keeps a global cache across all its (local) workers
>>> > which is an LRU cache: state_key => (cache_token, value)
>>> > - get: Look up the cache using the valid cache token for the state. If
>>> > there is no match, fetch from the Runner and use the already available
>>> > token for caching
>>> > - put: Put the value in the cache with a valid cache token, and put the
>>> > value into pending writes which will be flushed out at the latest when
>>> > the bundle ends
>>> > - clear: same as put but clear the cache
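For illustration, a minimal sketch of the SDK-harness cache described in the list above: a single cache token handed to the SDK with the bundle, a shared LRU map of state_key => (cache_token, value), and writes collected as pending until the bundle ends. All names (StateCache, fetch_from_runner, write_to_runner) are hypothetical; this is not actual Beam code, just one way the scheme could look.

import collections

class StateCache:
    """Illustrative SDK-harness cache shared by all local workers."""

    def __init__(self, max_entries=10000):
        # state_key -> (cache_token, value), evicted in LRU order.
        self._entries = collections.OrderedDict()
        self._max_entries = max_entries
        # Writes deferred until the bundle ends (at the latest).
        self._pending_writes = []

    def get(self, state_key, cache_token, fetch_from_runner):
        entry = self._entries.get(state_key)
        if entry is not None and entry[0] == cache_token:
            self._entries.move_to_end(state_key)
            return entry[1]
        # Miss or stale token: fetch from the Runner and cache the value
        # under the token that was supplied with the bundle.
        value = fetch_from_runner(state_key)
        self._store(state_key, cache_token, value)
        return value

    def put(self, state_key, cache_token, value):
        self._store(state_key, cache_token, value)
        self._pending_writes.append((state_key, value))

    def clear(self, state_key, cache_token):
        self.put(state_key, cache_token, None)

    def flush(self, write_to_runner):
        # Send all deferred writes to the Runner before the bundle completes.
        for state_key, value in self._pending_writes:
            write_to_runner(state_key, value)
        self._pending_writes = []

    def _store(self, state_key, cache_token, value):
        self._entries[state_key] = (cache_token, value)
        self._entries.move_to_end(state_key)
        if len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)

In this sketch a bundle would call get/put/clear with the token taken from the ProcessBundleRequest and call flush before the bundle completes.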
>>> >
>>> > It does look like this is not too far off from what you were
>>> > describing. The main difference is that we just work with a single
>>> > cache token. In my opinion we do not need the second cache token for
>>> > writes, as long as we ensure that we generate a new cache token if the
>>> > bundle/checkpoint fails.
>>> >
>>> > I have a draft PR
>>> > for the Runner: https://github.com/apache/beam/pull/9374
>>> > for the SDK: https://github.com/apache/beam/pull/9418
>>> >
>>> > Note that the Runner PR needs to be updated to fully reflect the above
>>> > scheme. The SDK implementation is WIP. I want to make sure that we
>>> > clarify the design before this gets finalized.
>>> >
>>> > Thanks again for all your comments. Much appreciated!
>>> >
>>> > Cheers,
>>> > Max
>>> >
>>> > On 26.08.19 19:58, Lukasz Cwik wrote:
>>> > > There were originally a couple of ideas around how caching could work:
>>> > >
>>> > > 1) One cache token for the entire bundle that is supplied up front.
>>> > > The SDK caches everything using the given token. All
>>> > > reads/clear/append for all types of state happen under this token.
>>> > > Anytime a side input changes, the key processing partition range
>>> > > changes, or a bundle fails to process, the runner chooses a new cache
>>> > > token, effectively invalidating everything in the past.
>>> > >
>>> > > 2) One cache token per type of state that is supplied up front.
>>> > > The SDK caches all requests for a given type using the given cache
>>> > > token. The runner can selectively choose which type to keep and which
>>> > > to invalidate. Bundle failure and key processing partition changes
>>> > > invalidate all user state, a side input change invalidates all side
>>> > > inputs.
>>> > >
>>> > > 3) One cache token per state id that is supplied up front.
>>> > > The SDK caches all requests for the given state id using the given
>>> > > cache token. The runner can selectively choose which to invalidate
>>> > > and which to keep. Bundle failure and key processing partition
>>> > > changes invalidate all user state, side input changes only invalidate
>>> > > the side input that changed.
>>> > >
>>> > > 4) A cache token on each read/clear/append that is supplied on the
>>> > > response of the call, with an initial valid set that is supplied at
>>> > > start. The runner can selectively choose which to keep on start.
>>> > > Bundle failure allows runners to "roll back" to a known good state by
>>> > > selecting the previous valid cache token as part of the initial set.
>>> > > Key processing partition changes allow runners to keep cached state
>>> > > that hasn't changed, since it can be tied to a version number of the
>>> > > state itself as part of the initial set. Side input changes only
>>> > > invalidate the side input that changed.
>>> > >
>>> > > Based upon your discussion you seem to want option #1, which doesn't
>>> > > work well with side inputs clearing cached state. If we want to have
>>> > > user state survive a changing side input, we would want one of the
>>> > > other options. I do agree that supplying the cache token upfront is
>>> > > significantly simpler. Currently the protos are set up for #4 since
>>> > > it was the most flexible and at the time the pros outweighed the cons.
>>> > >
>>> > > I don't understand why you think you need to wait for a response for
>>> > > the append/clear to get its cache token, since the only reason you
>>> > > need the cache token is that you want to use that cached data when
>>> > > processing a different bundle. I was thinking that the flow on the
>>> > > SDK side would be something like (assuming there is a global cache of
>>> > > cache token -> (map of state key -> data)):
>>> > > 1) Create a local cache of (map of state key -> data) using the
>>> > > initial set of valid cache tokens
>>> > > 2) Make all mutations in place on the local cache without waiting for
>>> > > a response.
>>> > > 3) When the response comes back, update the global cache with the new
>>> > > cache token -> (map of state key -> data) (this is when the data
>>> > > becomes visible to other bundles that start processing)
>>> > > 4) Before the bundle finishes processing, wait for all outstanding
>>> > > state calls to finish.
>>> > >
>>> > > To implement caching on the runner side, you would keep track of at
>>> > > most 2 cache tokens per state key: one cache token represents the
>>> > > initial value when the bundle started while the second represents the
>>> > > modified state. If the bundle succeeds the runner passes in the set
>>> > > of tokens which represent the new state, if the bundle fails you
>>> > > process using the original ones.
>>> > >
>>> > > After thinking through the implementation again, we could supply two
>>> > > cache tokens for each state id, the first being the set of initial
>>> > > tokens if no writes happen while the second represents the token to
>>> > > use if the SDK changes the state. This gives us the simplification
>>> > > where we don't need to wait for the response before we update the
>>> > > global cache, making a typical blocking cache much easier to do. We
>>> > > also get the benefit that runners can supply either the same cache
>>> > > token for a state id or different ones. If the runner supplies the
>>> > > same one then it's telling the SDK to make modifications in place
>>> > > without any rollback (which is good on memory since we are reducing
>>> > > copies of stuff), or if the runner supplies two different ones then
>>> > > it's telling the SDK to keep the old data around.
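As an illustration of the runner-side bookkeeping described above (at most two cache tokens per state key: one for the value the bundle started from, one for in-bundle writes, committed on success and discarded on failure), a rough, hypothetical sketch; none of these names exist in Beam.

class StateCacheTokens:
    """Illustrative runner-side tracking of cache tokens per state key."""

    def __init__(self):
        self.committed = {}  # state_key -> token of the last known-good value
        self.pending = {}    # state_key -> token issued for in-bundle writes

    def tokens_for_new_bundle(self):
        # Initial set of valid tokens handed to the SDK with the bundle.
        return dict(self.committed)

    def on_state_write(self, state_key, new_token):
        # Remember the token returned on the append/clear response.
        self.pending[state_key] = new_token

    def on_bundle_success(self):
        # The modified state becomes the new known-good version.
        self.committed.update(self.pending)
        self.pending.clear()

    def on_bundle_failure(self):
        # Roll back: keep the original tokens, discard the pending ones.
        self.pending.clear()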
>>> > >
>>> > > If we went through with this new option, the SDK-side logic would be
>>> > > (assuming there is a global cache of cache token -> (map of state key
>>> > > -> data)):
>>> > >
>>> > > 1) Create an empty local set of state ids that are dirty when
>>> > > starting a new bundle (dirty set)
>>> > >
>>> > > For reads/gets:
>>> > > 2A) If the request is a read (get), use the dirty set to choose which
>>> > > cache token to look up and use in the global cache. If the global
>>> > > cache is missing data, issue the appropriate request, providing the
>>> > > result.
>>> > >
>>> > > For writes/appends/clear:
>>> > > 2B) If the cache tokens are different for the state id, add the state
>>> > > id to the dirty set if it isn't there and perform the appropriate
>>> > > modification to convert the old cached state data to the new state
>>> > > data
>>> > > 3B) Modify the global cache's data
>>> > > 4B) Issue the request to the runner
>>> > > 5B*) Add this request to the set of requests to block on before
>>> > > completing the bundle.
>>> > >
>>> > > (* Note, there was another idea to update the process bundle response
>>> > > to contain the id of the last state request, which would allow the
>>> > > runner to know when it has seen the last state request, allowing the
>>> > > SDK to not block at all when finishing the bundle)
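A rough sketch of that dirty-set flow, purely for illustration: two tokens per state id, one for the unmodified value and one to use once the state has been written. All identifiers are invented, state data is treated as a simple bag (list), and send_append_request is assumed to return a future-like object.

class BundleCache:
    """Illustrative per-bundle view over a global cache of
    cache_token -> {state_key: data}."""

    def __init__(self, global_cache, initial_tokens, write_tokens):
        self.global_cache = global_cache      # shared across bundles
        self.initial_tokens = initial_tokens  # state_id -> token for unmodified state
        self.write_tokens = write_tokens      # state_id -> token to use once modified
        self.dirty = set()                    # state ids written in this bundle
        self.pending = []                     # outstanding state call futures

    def _token(self, state_id):
        # Dirty state ids are looked up under the write token, clean ones
        # under the initial token.
        tokens = self.write_tokens if state_id in self.dirty else self.initial_tokens
        return tokens[state_id]

    def read(self, state_id, state_key, fetch_from_runner):
        data = self.global_cache.setdefault(self._token(state_id), {})
        if state_key not in data:
            data[state_key] = fetch_from_runner(state_key)
        return data[state_key]

    def append(self, state_id, state_key, value, send_append_request):
        if state_id not in self.dirty and \
                self.initial_tokens[state_id] != self.write_tokens[state_id]:
            # First write with distinct tokens: copy the clean data under the
            # write token so the original version stays usable for rollback.
            clean = self.global_cache.get(self.initial_tokens[state_id], {})
            self.global_cache[self.write_tokens[state_id]] = dict(clean)
            self.dirty.add(state_id)
        data = self.global_cache.setdefault(self._token(state_id), {})
        data[state_key] = data.get(state_key, []) + [value]
        # Issue the request without blocking; remember it for finish_bundle.
        self.pending.append(send_append_request(state_key, value))

    def finish_bundle(self):
        # Wait for all outstanding state calls before completing the bundle.
        for future in self.pending:
            future.result()
        self.pending = []

Per the runner-side handling described earlier in the thread, on bundle success the runner would hand out the write tokens as the initial set for the next bundle; on failure it would keep handing out the old ones.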
>>> > >
>>> > > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <m...@apache.org> wrote:
>>> > >
>>> > > Just to give a quick update here. Rakesh, Thomas, and I had a
>>> > > discussion about async writes from the Python SDK to the Runner.
>>> > > Robert was also present for some parts of the discussion.
>>> > >
>>> > > We concluded that blocking writes with the need to refresh the cache
>>> > > token each time are not going to provide enough throughput/latency.
>>> > >
>>> > > We figured that it will be enough to use a single cache token per
>>> > > Runner<=>SDK Harness connection. This cache token will be provided by
>>> > > the Runner in the ProcessBundleRequest. Writes will not yield a new
>>> > > cache token. The advantage is that we can use one cache token for the
>>> > > lifetime of the bundle and also across bundles, unless the Runner
>>> > > switches to a new Runner<=>SDK Harness connection; then the Runner
>>> > > would have to generate a new cache token.
>>> > >
>>> > > We might require additional cache tokens for the side inputs. For
>>> > > now, I'm planning to only tackle user state, which seems to be the
>>> > > area where users have expressed the most need for caching.
>>> > >
>>> > > -Max
>>> > >
>>> > > On 21.08.19 20:05, Maximilian Michels wrote:
>>> > > >> There is probably a misunderstanding here: I'm suggesting to use
>>> > > >> a worker ID instead of cache tokens, not additionally.
>>> > > >
>>> > > > Ah! Misread that. We need a changing token to indicate that the
>>> > > > cache is stale, e.g. a checkpoint has failed / we are restoring
>>> > > > from an old checkpoint. If the _Runner_ generates a new unique
>>> > > > token/id for workers which outlast the Runner, then this should
>>> > > > work fine. I don't think it is safe for the worker to supply the
>>> > > > id. The Runner should be in control of cache tokens to avoid
>>> > > > invalid tokens.
>>> > > >
>>> > > >> In the PR the token is modified as part of updating the state.
>>> > > >> Doesn't the SDK need the new token to update its cache entry also?
>>> > > >> That's where it would help the SDK to know the new token upfront.
>>> > > >
>>> > > > If the state is updated in the Runner, a new token has to be
>>> > > > generated. The old one is not valid anymore. The SDK will use the
>>> > > > updated token to store the new value in the cache. I understand
>>> > > > that it would be nice to know the token upfront. That could be
>>> > > > possible with some token generation scheme. On the other hand,
>>> > > > writes can be asynchronous and thus not block the UDF.
>>> > > >
>>> > > >> But I believe there is no need to change the token in the first
>>> > > >> place, unless bundles for the same key (ranges) can be processed
>>> > > >> by different workers.
>>> > > >
>>> > > > That's certainly possible, e.g. two workers A and B take turns
>>> > > > processing a certain key range, one bundle after another:
>>> > > >
>>> > > > You process a bundle with a token T with A, then worker B takes
>>> > > > over. Both have an entry with cache token T. So B goes on to modify
>>> > > > the state and uses the same cache token T. Then A takes over again.
>>> > > > A would have a stale cache entry, but T would still be a valid
>>> > > > cache token.
>>> > > >
>>> > > >> Indeed the fact that Dataflow can dynamically split and merge
>>> > > >> these ranges is what makes it trickier. If Flink does not
>>> > > >> repartition the ranges, then things are much easier.
>>> > > >
>>> > > > Flink does not dynamically repartition key ranges (yet). If it
>>> > > > started to support that, we would invalidate the cache tokens for
>>> > > > the changed partitions.
>>> > > >
>>> > > > I'd suggest the following cache token generation scheme:
>>> > > >
>>> > > > One cache token per key range for user state and one cache token
>>> > > > for each side input. On writes to user state or a changing side
>>> > > > input, the associated cache token will be renewed.
>>> > > >
>>> > > > On the SDK side, it should be sufficient to let the SDK
>>> > > > re-associate all its cached data belonging to a valid cache token
>>> > > > with a new cache token returned by a successful write. This has to
>>> > > > happen in the active scope (i.e. user state, or a particular side
>>> > > > input).
>>> > > >
>>> > > > If the key range changes, new cache tokens have to be generated.
>>> > > > This should happen automatically because the Runner does not
>>> > > > checkpoint cache tokens and will generate new ones when it restarts
>>> > > > from an earlier checkpoint.
>>> > > >
>>> > > > The current PR needs to be changed to (1) only keep a single cache
>>> > > > token per user state and key range, and (2) add support for cache
>>> > > > tokens for each side input.
>>> > > >
>>> > > > Hope that makes sense.
>>> > > >
>>> > > > -Max
>>> > > >
>>> > > > On 21.08.19 17:27, Reuven Lax wrote:
>>> > > >>
>>> > > >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <m...@apache.org> wrote:
>>> > > >>
>>> > > >> Appreciate all your comments! Replying below.
>>> > > >>
>>> > > >> @Luke:
>>> > > >>
>>> > > >> > Having cache tokens per key would be very expensive indeed and I
>>> > > >> > believe we should go with a single cache token "per" bundle.
>>> > > >>
>>> > > >> Thanks for your comments on the PR. I was thinking to propose
>>> > > >> something along these lines of having cache tokens valid for a
>>> > > >> particular checkpointing "epoch". That would require even less
>>> > > >> token renewal than the per-bundle approach.
>>> > > >>
>>> > > >> @Thomas, thanks for the input. Some remarks:
>>> > > >>
>>> > > >> > Wouldn't it be simpler to have the runner just track a unique ID
>>> > > >> > for each worker and use that to communicate if the cache is
>>> > > >> > valid or not?
>>> > > >>
>>> > > >> We do not need a unique id per worker. If a cache token is valid
>>> > > >> for a particular worker, it is also valid for another worker. That
>>> > > >> is with the assumption that key ranges are always disjoint between
>>> > > >> the workers.
>>> > > >>
>>> > > >> > * When the bundle is started, the runner tells the worker if the
>>> > > >> > cache has become invalid (since it knows if another worker has
>>> > > >> > mutated state)
>>> > > >>
>>> > > >> This is simply done by not transferring the particular cache
>>> > > >> token. No need to declare it invalid explicitly.
>>> > > >>
>>> > > >> > * When the worker sends mutation requests to the runner, it
>>> > > >> > includes its own ID (or the runner already has it as contextual
>>> > > >> > information). No need to wait for a response.
>>> > > >>
>>> > > >> Mutations of cached values can be freely done as long as the cache
>>> > > >> token associated with the state is valid for a particular bundle.
>>> > > >> Only the first time, the Runner needs to wait on the response to
>>> > > >> store the cache token. This can also be done asynchronously.
>>> > > >>
>>> > > >> > * When the bundle is finished, the runner records the last
>>> > > >> > writer (only if a change occurred)
>>> > > >>
>>> > > >> I believe this is not necessary because there will only be one
>>> > > >> writer at a time for a particular bundle and key range, hence only
>>> > > >> one writer holds a valid cache token for a particular state and
>>> > > >> key range.
>>> > > >>
>>> > > >> @Reuven:
>>> > > >>
>>> > > >> > Dataflow divides the keyspace up into lexicographic ranges, and
>>> > > >> > creates a cache token per range.
>>> > > >>
>>> > > >> State is always processed partitioned by the Flink workers
>>> > > >> (hash-based, not lexicographical). I don't think that matters
>>> > > >> though, because the key ranges do not overlap between the workers.
>>> > > >> Flink does not support dynamically repartitioning the key ranges.
>>> > > >> Even in case of fine-grained recovery of workers and their key
>>> > > >> ranges, we would simply generate new cache tokens for a particular
>>> > > >> worker.
>>> > > >>
>>> > > >> Dataflow's ranges are also hash-based. When I said lexicographical,
>>> > > >> I meant lexicographical based on the hexadecimal hash value.
>>> > > >>
>>> > > >> Indeed the fact that Dataflow can dynamically split and merge
>>> > > >> these ranges is what makes it trickier.
>>> > > >> If Flink does not repartition the ranges, then things are much
>>> > > >> easier.
>>> > > >>
>>> > > >> Thanks,
>>> > > >> Max
>>> > > >>
>>> > > >> On 21.08.19 09:33, Reuven Lax wrote:
>>> > > >> > Dataflow does something like this, however since work is load
>>> > > >> > balanced across workers a per-worker id doesn't work very well.
>>> > > >> > Dataflow divides the keyspace up into lexicographic ranges, and
>>> > > >> > creates a cache token per range.
>>> > > >> >
>>> > > >> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org> wrote:
>>> > > >> >
>>> > > >> > Commenting here vs. on the PR since related to the overall
>>> > > >> > approach.
>>> > > >> >
>>> > > >> > Wouldn't it be simpler to have the runner just track a unique ID
>>> > > >> > for each worker and use that to communicate if the cache is
>>> > > >> > valid or not?
>>> > > >> >
>>> > > >> > * When the bundle is started, the runner tells the worker if the
>>> > > >> > cache has become invalid (since it knows if another worker has
>>> > > >> > mutated state)
>>> > > >> > * When the worker sends mutation requests to the runner, it
>>> > > >> > includes its own ID (or the runner already has it as contextual
>>> > > >> > information). No need to wait for a response.
>>> > > >> > * When the bundle is finished, the runner records the last
>>> > > >> > writer (only if a change occurred)
>>> > > >> >
>>> > > >> > Whenever the current worker ID and last writer ID don't match,
>>> > > >> > the cache is invalid.
>>> > > >> >
>>> > > >> > Thomas
>>> > > >> >
>>> > > >> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:
>>> > > >> >
>>> > > >> > Having cache tokens per key would be very expensive indeed and I
>>> > > >> > believe we should go with a single cache token "per" bundle.
>>> > > >> >
>>> > > >> > On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <m...@apache.org> wrote:
>>> > > >> >
>>> > > >> > Maybe a Beam Python expert can chime in for Rakesh's question?
>>> > > >> >
>>> > > >> > Luke, I was assuming cache tokens to be per key and state id.
>>> > > >> > While implementing initial support on the Runner side, I
>>> > > >> > realized that we probably want cache tokens to only be per state
>>> > > >> > id. Note that if we had per-key cache tokens, the number of
>>> > > >> > cache tokens would approach the total number of keys in an
>>> > > >> > application.
>>> > > >> >
>>> > > >> > If anyone wants to have a look, here is a first version of the
>>> > > >> > Runner side for cache tokens. Note that I only implemented cache
>>> > > >> > tokens for BagUserState for now, but it can easily be added for
>>> > > >> > side inputs as well.
>>> > > >> >
>>> > > >> > https://github.com/apache/beam/pull/9374
>>> > > >> >
>>> > > >> > -Max