The bundle's view of side inputs should never change during processing; it should be a point-in-time snapshot.
I was just trying to say that deferring the cache token for side inputs until side input request time simplified the runner's implementation, since that is precisely when the runner needs to look at the side input. Putting the tokens into the ProcessBundleRequest complicates that, but it does make the SDK implementation significantly simpler, which is a win.

On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <[email protected]> wrote:
> Thanks for the quick response.
>
> Just to clarify, the issue with versioning side input is also present when supplying the cache tokens on a request basis instead of per bundle. The SDK never knows when the Runner receives a new version of the side input. Like you pointed out, it needs to mark side inputs as stale and generate new cache tokens for the stale side inputs.
>
> The difference between per-request tokens and per-bundle tokens would be that the side input can only change after a bundle completes vs. during the bundle. Side inputs are always fuzzy in that regard because there is no precise instant at which side inputs are atomically updated, other than the assumption that they eventually will be updated. In that regard, per-bundle tokens for side input seem to be fine.
>
> All of the above is not an issue for user state, as its cache can remain valid for the lifetime of a Runner<=>SDK Harness connection. A simple solution would be to not cache side input, because there are many cases where the caching just adds additional overhead. However, I can also imagine cases where a side input is valid forever and caching would be very beneficial.
>
> For the first version I want to focus on user state because that's where I see the most benefit for caching. I don't see a problem, though, for the Runner to detect new side input and reflect that in the cache tokens supplied for a new bundle.
>
> -Max
>
> On 26.08.19 22:27, Lukasz Cwik wrote:
>> Your summary below makes sense to me. I can see that recovery from rolling back doesn't need to be a priority, which simplifies the solution for user state caching down to one token.
>>
>> Providing cache tokens upfront does require the Runner to know what "version" of everything it may supply to the SDK upfront (instead of on request), which would mean that the Runner may need to have a mapping from cache token to internal version identifier for things like side inputs, which are typically broadcast. The Runner would also need to poll in the background to see whether the side input has changed, so as not to block processing bundles with "stale" side input data.
>>
>> Ping me once you have the Runner PR updated and I'll take a look again.
>>
>> On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <[email protected]> wrote:
>>> Thank you for the summary Luke. I really appreciate the effort you put into this!
>>>
>>>> Based upon your discussion you seem to want option #1
>>>
>>> I'm actually for option #2. The option to cache/invalidate side inputs is important, and we should incorporate this in the design. That's why option #1 is not flexible enough. However, a first implementation could defer caching of side inputs.
>>>
>>> Option #3 was my initial thinking and the first version of the PR, but I think we agreed that there wouldn't be much gain from keeping a cache token per state id.
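For illustration, a minimal Python sketch of the runner-side bookkeeping described above: a mapping from a side input's internal version identifier to the cache token advertised with each bundle, where observing a new version mints a new token. The class and method names are hypothetical and not part of the Beam API.

import uuid

class SideInputTokenTracker:
    """Hypothetical runner-side helper: maps a side input's internal
    version to the cache token advertised in each ProcessBundleRequest."""

    def __init__(self):
        self._tokens = {}  # side_input_id -> (observed_version, cache_token)

    def token_for(self, side_input_id, current_version):
        # Keep the token stable while the version is unchanged; mint a new
        # token as soon as a new version of the side input is observed,
        # which invalidates the SDK harness cache for that side input.
        version, token = self._tokens.get(side_input_id, (None, None))
        if version != current_version:
            token = uuid.uuid4().bytes
            self._tokens[side_input_id] = (current_version, token)
        return token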
>>> Option #4 is what is specifically documented in the reference doc and already part of the Proto, where valid tokens are provided for each new bundle and also as part of the response of a get/put/clear. We mentioned that the reply does not have to be waited on synchronously (I even mentioned it), but it complicates the implementation. The idea Thomas and I expressed was that a response is not even necessary if we assume validity of the upfront-provided cache tokens for the lifetime of a bundle and that cache tokens will be invalidated as soon as the Runner fails in any way. This is naturally the case for Flink because it will simply "forget" its current cache tokens.
>>>
>>> I currently envision the following schema:
>>>
>>> Runner
>>> ======
>>>
>>> - Runner generates a globally unique cache token, one for user state and one for each side input
>>> - The token is supplied to the SDK Harness for each bundle request
>>> - For the lifetime of a Runner<=>SDK Harness connection this cache token will not change
>>> - Runner will generate a new token if the connection/key space changes between Runner and SDK Harness
>>>
>>> SDK
>>> ===
>>>
>>> - For each bundle the SDK worker stores the list of valid cache tokens
>>> - The SDK Harness keeps a global cache across all its (local) workers, which is an LRU cache: state_key => (cache_token, value)
>>> - get: Look up the cache using the valid cache token for the state. If there is no match, fetch from the Runner and use the already available token for caching
>>> - put: Put the value in the cache with a valid cache token, and add the value to the pending writes, which will be flushed out at the latest when the bundle ends
>>> - clear: same as put, but clear the cache
>>>
>>> It does look like this is not too far off from what you were describing. The main difference is that we just work with a single cache token. In my opinion we do not need the second cache token for writes, as long as we ensure that we generate a new cache token if the bundle/checkpoint fails.
>>>
>>> I have a draft PR
>>> for the Runner: https://github.com/apache/beam/pull/9374
>>> for the SDK: https://github.com/apache/beam/pull/9418
>>>
>>> Note that the Runner PR needs to be updated to fully reflect the above scheme. The SDK implementation is WIP. I want to make sure that we clarify the design before this gets finalized.
>>>
>>> Thanks again for all your comments. Much appreciated!
>>>
>>> Cheers,
>>> Max
>>>
>>> On 26.08.19 19:58, Lukasz Cwik wrote:
>>>> There were originally a couple of ideas around how caching could work:
>>>>
>>>> 1) One cache token for the entire bundle that is supplied up front. The SDK caches everything using the given token. All reads/clear/append for all types of state happen under this token. Anytime a side input changes, the key processing partition range changes, or a bundle fails to process, the runner chooses a new cache token, effectively invalidating everything in the past.
>>>>
>>>> 2) One cache token per type of state that is supplied up front. The SDK caches all requests for a given type using the given cache token. The runner can selectively choose which type to keep and which to invalidate. Bundle failure and key processing partition changes invalidate all user state; a side input change invalidates all side inputs.
>>>>
>>>> 3) One cache token per state id that is supplied up front.
>>>> The SDK caches all requests for the given state id using the given cache token. The runner can selectively choose which to invalidate and which to keep. Bundle failure and key processing partition changes invalidate all user state; side input changes only invalidate the side input that changed.
>>>>
>>>> 4) A cache token on each read/clear/append that is supplied on the response of the call, with an initial valid set that is supplied at start. The runner can selectively choose which to keep on start. Bundle failure allows runners to "roll back" to a known good state by selecting the previous valid cache token as part of the initial set. Key processing partition changes allow runners to keep cached state that hasn't changed, since it can be tied to a version number of the state itself as part of the initial set. Side input changes only invalidate the side input that changed.
>>>>
>>>> Based upon your discussion you seem to want option #1, which doesn't work well with side inputs clearing cached state. If we want to have user state survive a changing side input, we would want one of the other options. I do agree that supplying the cache token upfront is significantly simpler. Currently the protos are set up for #4 since it was the most flexible and at the time the pros outweighed the cons.
>>>>
>>>> I don't understand why you think you need to wait for a response for the append/clear to get its cache token, since the only reason you need the cache token is that you want to use that cached data when processing a different bundle. I was thinking that the flow on the SDK side would be something like (assuming there is a global cache of cache token -> (map of state key -> data)):
>>>>
>>>> 1) Create a local cache of (map of state key -> data) using the initial set of valid cache tokens
>>>> 2) Make all mutations in place on the local cache without waiting for a response.
>>>> 3) When the response comes back, update the global cache with the new cache token -> (map of state key -> data) (this is when the data becomes visible to other bundles that start processing)
>>>> 4) Before the bundle finishes processing, wait for all outstanding state calls to finish.
>>>>
>>>> To implement caching on the runner side, you would keep track of at most 2 cache tokens per state key: one cache token represents the initial value when the bundle started, while the second represents the modified state. If the bundle succeeds, the runner passes in the set of tokens which represent the new state; if the bundle fails, you process using the original ones.
>>>>
>>>> After thinking through the implementation again, we could supply two cache tokens for each state id, the first being the set of initial tokens if no writes happen, while the second represents the token to use if the SDK changes the state. This gives us the simplification where we don't need to wait for the response before we update the global cache, making a typical blocking cache much easier to do. We also get the benefit that runners can supply either the same cache token for a state id or different ones.
>>>> If the runner supplies the same one, then it's telling the SDK to make modifications in place without any rollback (which is good on memory since we are reducing copies of stuff); if the runner supplies two different ones, then it's telling the SDK to keep the old data around. If we went through with this new option, the SDK-side logic would be (assuming there is a global cache of cache token -> (map of state key -> data)):
>>>>
>>>> 1) Create an empty local set of state ids that are dirty when starting a new bundle (dirty set)
>>>>
>>>> For reads/gets:
>>>> 2A) If the request is a read (get), use the dirty set to choose which cache token to look up and use in the global cache. If the global cache is missing the data, issue the appropriate request to provide the result.
>>>>
>>>> For writes/appends/clear:
>>>> 2B) If the cache tokens are different for the state id, add the state id to the dirty set if it isn't there and perform the appropriate modification to convert the old cached state data to the new state data
>>>> 3B) Modify the global cache's data
>>>> 4B) Issue the request to the runner
>>>> 5B*) Add this request to the set of requests to block on before completing the bundle.
>>>>
>>>> (* Note: there was another idea to update the process bundle response to contain the id of the last state request, which would allow the runner to know when it has seen the last state request, allowing the SDK to not block at all when finishing the bundle.)
>>>>
>>>> On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <[email protected]> wrote:
>>>>> Just to give a quick update here. Rakesh, Thomas, and I had a discussion about async writes from the Python SDK to the Runner. Robert was also present for some parts of the discussion.
>>>>>
>>>>> We concluded that blocking writes with the need to refresh the cache token each time are not going to provide enough throughput/latency.
>>>>>
>>>>> We figured that it will be enough to use a single cache token per Runner<=>SDK Harness connection. This cache token will be provided by the Runner in the ProcessBundleRequest. Writes will not yield a new cache token. The advantage is that we can use one cache token for the lifetime of the bundle and also across bundles, unless the Runner switches to a new Runner<=>SDK Harness connection; then the Runner would have to generate a new cache token.
>>>>>
>>>>> We might require additional cache tokens for the side inputs. For now, I'm planning to only tackle user state, which seems to be the area where users have expressed the most need for caching.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 21.08.19 20:05, Maximilian Michels wrote:
>>>>>>> There is probably a misunderstanding here: I'm suggesting to use a worker ID instead of cache tokens, not additionally.
>>>>>>
>>>>>> Ah! Misread that. We need a changing token to indicate that the cache is stale, e.g. when a checkpoint has failed or we are restoring from an old checkpoint. If the _Runner_ generates a new unique token/id for workers which outlast the Runner, then this should work fine. I don't think it is safe for the worker to supply the id.
>>>>>> The Runner should be in control of cache tokens to avoid invalid tokens.
>>>>>>
>>>>>>> In the PR the token is modified as part of updating the state. Doesn't the SDK need the new token to update its cache entry also? That's where it would help the SDK to know the new token upfront.
>>>>>>
>>>>>> If the state is updated in the Runner, a new token has to be generated. The old one is not valid anymore. The SDK will use the updated token to store the new value in the cache. I understand that it would be nice to know the token upfront. That could be possible with some token generation scheme. On the other hand, writes can be asynchronous and thus not block the UDF.
>>>>>>
>>>>>>> But I believe there is no need to change the token in the first place, unless bundles for the same key (ranges) can be processed by different workers.
>>>>>>
>>>>>> That's certainly possible, e.g. two workers A and B take turns processing a certain key range, one bundle after another:
>>>>>>
>>>>>> You process a bundle with a token T with A, then worker B takes over. Both have an entry with cache token T. So B goes on to modify the state and uses the same cache token T. Then A takes over again. A would have a stale cache entry, but T would still be a valid cache token.
>>>>>>
>>>>>>> Indeed the fact that Dataflow can dynamically split and merge these ranges is what makes it trickier. If Flink does not repartition the ranges, then things are much easier.
>>>>>>
>>>>>> Flink does not dynamically repartition key ranges (yet). If it started to support that, we would invalidate the cache tokens for the changed partitions.
>>>>>>
>>>>>> I'd suggest the following cache token generation scheme:
>>>>>>
>>>>>> One cache token per key range for user state and one cache token for each side input. On writes to user state or changes to a side input, the associated cache token will be renewed.
>>>>>>
>>>>>> On the SDK side, it should be sufficient to let the SDK re-associate all its cached data belonging to a valid cache token with a new cache token returned by a successful write. This has to happen in the active scope (i.e. user state, or a particular side input).
>>>>>>
>>>>>> If the key range changes, new cache tokens have to be generated. This should happen automatically because the Runner does not checkpoint cache tokens and will generate new ones when it restarts from an earlier checkpoint.
>>>>>>
>>>>>> The current PR needs to be changed to (1) only keep a single cache token per user state and key range and (2) add support for cache tokens for each side input.
>>>>>>
>>>>>> Hope that makes sense.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> On 21.08.19 17:27, Reuven Lax wrote:
>>>>>>>
>>>>>>> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <[email protected]> wrote:
>>>>>>>> Appreciate all your comments! Replying below.
>>>>>>>>
>>>>>>>> @Luke:
>>>>>>>>
>>>>>>>>> Having cache tokens per key would be very expensive indeed and I believe we should go with a single cache token "per" bundle.
>>>>>>>>
>>>>>>>> Thanks for your comments on the PR. I was thinking to propose something along the lines of having cache tokens valid for a particular checkpointing "epoch". That would require even less token renewal than the per-bundle approach.
>>>>>>>>
>>>>>>>> @Thomas, thanks for the input. Some remarks:
>>>>>>>>
>>>>>>>>> Wouldn't it be simpler to have the runner just track a unique ID for each worker and use that to communicate if the cache is valid or not?
>>>>>>>>
>>>>>>>> We do not need a unique id per worker. If a cache token is valid for a particular worker, it is also valid for another worker. That is with the assumption that key ranges are always disjoint between the workers.
>>>>>>>>
>>>>>>>>> * When the bundle is started, the runner tells the worker if the cache has become invalid (since it knows if another worker has mutated state)
>>>>>>>>
>>>>>>>> This is simply done by not transferring the particular cache token. No need to declare it invalid explicitly.
>>>>>>>>
>>>>>>>>> * When the worker sends mutation requests to the runner, it includes its own ID (or the runner already has it as contextual information). No need to wait for a response.
>>>>>>>>
>>>>>>>> Mutations of cached values can be freely done as long as the cache token associated with the state is valid for a particular bundle. Only the first time does the Runner need to wait on the response to store the cache token. This can also be done asynchronously.
>>>>>>>>
>>>>>>>>> * When the bundle is finished, the runner records the last writer (only if a change occurred)
>>>>>>>>
>>>>>>>> I believe this is not necessary because there will only be one writer at a time for a particular bundle and key range, hence only one writer holds a valid cache token for a particular state and key range.
>>>>>>>>
>>>>>>>> @Reuven:
>>>>>>>>
>>>>>>>>> Dataflow divides the keyspace up into lexicographic ranges, and creates a cache token per range.
>>>>>>>>
>>>>>>>> State is always processed partitioned by the Flink workers (hash-based, not lexicographical). I don't think that matters, though, because the key ranges do not overlap between the workers. Flink does not support dynamically repartitioning the key ranges. Even in case of fine-grained recovery of workers and their key ranges, we would simply generate new cache tokens for a particular worker.
>>>>>>>
>>>>>>> Dataflow's ranges are also hash based. When I said lexicographical, I meant lexicographical based on the hexadecimal hash value.
>>>>>>>
>>>>>>> Indeed the fact that Dataflow can dynamically split and merge these ranges is what makes it trickier. If Flink does not repartition the ranges, then things are much easier.
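For illustration, a minimal Python sketch of the per-key-range token scheme described for Dataflow above: keys are hashed, the hexadecimal digest is mapped to a lexicographic range, and each range owns a cache token that is renewed whenever that range is invalidated (e.g. split, merged, or reassigned). All names are hypothetical; this is not Dataflow's implementation.

import bisect
import hashlib
import uuid

class RangeTokens:
    """Hypothetical sketch: one cache token per lexicographic range of the
    hexadecimal key hash."""

    def __init__(self, range_starts):
        # range_starts: sorted hex prefixes, e.g. ["0", "4", "8", "c"]
        self._starts = sorted(range_starts)
        self._tokens = {start: uuid.uuid4().bytes for start in self._starts}

    def token_for_key(self, key: bytes) -> bytes:
        digest = hashlib.sha1(key).hexdigest()
        # Pick the range whose start is the rightmost one <= the hash value.
        index = bisect.bisect_right(self._starts, digest) - 1
        return self._tokens[self._starts[max(index, 0)]]

    def invalidate_range(self, start: str) -> None:
        # Splitting, merging, or reassigning a range mints a new token, so
        # cached entries for keys in that range are no longer read.
        self._tokens[start] = uuid.uuid4().bytes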
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On 21.08.19 09:33, Reuven Lax wrote:
>>>>>>>>> Dataflow does something like this, however since work is load balanced across workers, a per-worker id doesn't work very well. Dataflow divides the keyspace up into lexicographic ranges, and creates a cache token per range.
>>>>>>>>>
>>>>>>>>> On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <[email protected]> wrote:
>>>>>>>>>> Commenting here vs. on the PR since this relates to the overall approach.
>>>>>>>>>>
>>>>>>>>>> Wouldn't it be simpler to have the runner just track a unique ID for each worker and use that to communicate if the cache is valid or not?
>>>>>>>>>>
>>>>>>>>>> * When the bundle is started, the runner tells the worker if the cache has become invalid (since it knows if another worker has mutated state)
>>>>>>>>>> * When the worker sends mutation requests to the runner, it includes its own ID (or the runner already has it as contextual information). No need to wait for a response.
>>>>>>>>>> * When the bundle is finished, the runner records the last writer (only if a change occurred)
>>>>>>>>>>
>>>>>>>>>> Whenever the current worker ID and the last writer ID don't match, the cache is invalid.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>> Having cache tokens per key would be very expensive indeed and I believe we should go with a single cache token "per" bundle.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>>> Maybe a Beam Python expert can chime in on Rakesh's question?
>>>>>>>>>>>>
>>>>>>>>>>>> Luke, I was assuming cache tokens to be per key and state id.
>>>>>>>>>>>> While implementing initial support on the Runner side, I realized that we probably want cache tokens to only be per state id. Note that if we had per-key cache tokens, the number of cache tokens would approach the total number of keys in an application.
>>>>>>>>>>>>
>>>>>>>>>>>> If anyone wants to have a look, here is a first version of the Runner side for cache tokens. Note that I only implemented cache tokens for BagUserState for now, but it can easily be added for side inputs as well.
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/pull/9374
>>>>>>>>>>>>
>>>>>>>>>>>> -Max
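For illustration, a minimal Python sketch of the single per-bundle cache token scheme outlined in the thread above: the Runner supplies one token in the ProcessBundleRequest, the SDK harness keeps a process-wide LRU cache keyed by (cache_token, state_key), and writes are issued asynchronously and awaited before the bundle completes. The state_backend interface and all names are hypothetical, not the actual Beam SDK harness API.

import collections

class CachingStateClient:
    """Sketch of the scheme above, not the actual Beam API. state_backend is
    assumed to offer blocking_get / async_append / async_clear, the latter
    two returning futures."""

    _cache = collections.OrderedDict()  # shared LRU across bundles/workers
    _MAX_ENTRIES = 10_000

    def __init__(self, state_backend, cache_token):
        self._backend = state_backend
        self._token = cache_token       # taken from the ProcessBundleRequest
        self._pending = []              # futures of outstanding writes

    def _store(self, state_key, value):
        self._cache[(self._token, state_key)] = value
        self._cache.move_to_end((self._token, state_key))
        while len(self._cache) > self._MAX_ENTRIES:
            self._cache.popitem(last=False)  # evict the least recently used

    def get(self, state_key):
        try:
            self._cache.move_to_end((self._token, state_key))
            return self._cache[(self._token, state_key)]
        except KeyError:
            value = list(self._backend.blocking_get(state_key))
            self._store(state_key, value)
            return value

    def append(self, state_key, element):
        cached = self._cache.get((self._token, state_key))
        if cached is not None:
            cached.append(element)      # keep the cached bag consistent
        self._pending.append(self._backend.async_append(state_key, element))

    def clear(self, state_key):
        self._store(state_key, [])
        self._pending.append(self._backend.async_clear(state_key))

    def finish_bundle(self):
        # Block until all outstanding writes are acknowledged by the Runner.
        for future in self._pending:
            future.result()

A failed bundle or a new Runner<=>SDK Harness connection simply comes with a fresh token, so stale entries are never read again and eventually age out of the LRU cache.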
