SideInputState -> SideInput (side_input_state -> side_input), plus more comments around the messages and the fields.
On Tue, Aug 27, 2019 at 10:18 AM Maximilian Michels <[email protected]> wrote:

We would have to differentiate cache tokens for user state and side inputs. How about something like this?

message ProcessBundleRequest {
  // (Required) A reference to the process bundle descriptor that must be
  // instantiated and executed by the SDK harness.
  string process_bundle_descriptor_reference = 1;

  message CacheToken {

    message UserState {
    }

    message SideInputState {
      string side_input_id = 1;
    }

    oneof type {
      UserState user_state = 1;
      SideInputState side_input_state = 2;
    }

    bytes token = 10;
  }

  // (Optional) A list of cache tokens that can be used by an SDK to reuse
  // cached data returned by the State API across multiple bundles.
  repeated CacheToken cache_tokens = 2;
}

-Max

On 27.08.19 18:43, Lukasz Cwik wrote:

The bundle's view of side inputs should never change during processing and should have a point-in-time snapshot.

I was just trying to say that deferring the cache token for side inputs until side input request time simplified the runner's implementation, since that is conclusively when the runner would need to take a look at the side input. Putting them in the ProcessBundleRequest complicates that, but it does make the SDK implementation significantly simpler, which is a win.

On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <[email protected]> wrote:

Thanks for the quick response.

Just to clarify, the issue with versioning side input is also present when supplying the cache tokens on a request basis instead of per bundle. The SDK never knows when the Runner receives a new version of the side input. Like you pointed out, it needs to mark side inputs as stale and generate new cache tokens for the stale side inputs.

The difference between per-request tokens and per-bundle tokens would be that the side input can only change after a bundle completes vs. during the bundle. Side inputs are always fuzzy in that regard because there is no precise point at which side inputs are atomically updated, other than the assumption that they eventually will be updated. In that regard, per-bundle tokens for side input seem to be fine.

All of the above is not an issue for user state, as its cache can remain valid for the lifetime of a Runner<=>SDK Harness connection. A simple solution would be to not cache side input because there are many cases where the caching just adds additional overhead. However, I can also imagine cases where a side input is valid forever and caching would be very beneficial.

For the first version I want to focus on user state because that's where I see the most benefit for caching. I don't see a problem though for the Runner to detect new side input and reflect that in the cache tokens supplied for a new bundle.

-Max
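A minimal Python sketch of how an SDK harness might index the cache tokens from the CacheToken proto proposed above — one token covering user state, plus one token per side input id. The class and field names are hypothetical, for illustration only.

# Hypothetical sketch; the field names mirror the proposed CacheToken proto,
# but this is not the actual Beam SDK harness code.
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class CacheToken:
    token: bytes
    side_input_id: Optional[str] = None  # None => user state token


@dataclass
class BundleCacheTokens:
    """Cache tokens supplied with a ProcessBundleRequest, indexed for lookup."""
    user_state_token: Optional[bytes] = None
    side_input_tokens: Dict[str, bytes] = field(default_factory=dict)

    @classmethod
    def from_request(cls, cache_tokens: List[CacheToken]) -> "BundleCacheTokens":
        indexed = cls()
        for ct in cache_tokens:
            if ct.side_input_id is None:
                # oneof type == user_state: one token covers all user state.
                indexed.user_state_token = ct.token
            else:
                # oneof type == side_input: one token per side input id.
                indexed.side_input_tokens[ct.side_input_id] = ct.token
        return indexed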
On 26.08.19 22:27, Lukasz Cwik wrote:

Your summary below makes sense to me. I can see that recovery from rolling back doesn't need to be a priority, and that this simplifies the solution for user state caching down to one token.

Providing cache tokens upfront does require the Runner to know what "version" of everything it may supply to the SDK upfront (instead of on request), which would mean that the Runner may need to have a mapping from cache token to internal version identifier for things like side inputs, which are typically broadcast. The Runner would also need to poll in the background to see whether the side input has changed, so that it does not block processing bundles with "stale" side input data.

Ping me once you have the Runner PR updated and I'll take a look again.

On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <[email protected]> wrote:

Thank you for the summary Luke. I really appreciate the effort you put into this!

> Based upon your discussion you seem to want option #1

I'm actually for option #2. The option to cache/invalidate side inputs is important, and we should incorporate this in the design. That's why option #1 is not flexible enough. However, a first implementation could defer caching of side inputs.

Option #3 was my initial thinking and the first version of the PR, but I think we agreed that there wouldn't be much gain from keeping a cache token per state id.

Option #4 is what is specifically documented in the reference doc and already part of the Proto, where valid tokens are provided for each new bundle and also as part of the response of a get/put/clear. We mentioned that the reply does not have to be waited on synchronously (I even mentioned it), but it complicates the implementation. The idea Thomas and I expressed was that a response is not even necessary if we assume validity of the upfront-provided cache tokens for the lifetime of a bundle, and that cache tokens will be invalidated as soon as the Runner fails in any way. This is naturally the case for Flink because it will simply "forget" its current cache tokens.

I currently envision the following scheme:

Runner
======

- The Runner generates a globally unique cache token, one for user state and one for each side input.
- The tokens are supplied to the SDK Harness with each bundle request.
- For the lifetime of a Runner<=>SDK Harness connection these cache tokens will not change.
- The Runner will generate new tokens if the connection/key space between Runner and SDK Harness changes.

SDK
===

- For each bundle the SDK worker stores the list of valid cache tokens.
- The SDK Harness keeps a global cache across all its (local) workers, which is an LRU cache: state_key => (cache_token, value).
- get: Look up the cache using the valid cache token for the state. If there is no match, fetch from the Runner and use the already available token for caching.
- put: Put the value in the cache with a valid cache token, and add the value to the pending writes, which will be flushed out at the latest when the bundle ends.
- clear: Same as put, but clear the cache entry.

It does look like this is not too far off from what you were describing. The main difference is that we just work with a single cache token. In my opinion we do not need the second cache token for writes, as long as we ensure that we generate a new cache token if the bundle/checkpoint fails.

I have a draft PR
for the Runner: https://github.com/apache/beam/pull/9374
for the SDK: https://github.com/apache/beam/pull/9418

Note that the Runner PR needs to be updated to fully reflect the above scheme. The SDK implementation is WIP. I want to make sure that we clarify the design before this gets finalized.

Thanks again for all your comments. Much appreciated!

Cheers,
Max
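A minimal Python sketch of the single-token SDK cache described in the message above: an LRU map of state_key => (cache_token, value), with writes applied to the cache immediately and buffered as pending writes until the bundle ends. The class and method names are hypothetical, for illustration only.

# Hypothetical sketch of the SDK-harness cache described above; not actual Beam code.
from collections import OrderedDict


class StateCache:
    """Global LRU cache shared by the SDK harness: state_key -> (cache_token, value)."""

    def __init__(self, max_entries=10000):
        self._entries = OrderedDict()
        self._max_entries = max_entries

    def get(self, state_key, cache_token, fetch_from_runner):
        cached = self._entries.get(state_key)
        if cached is not None and cached[0] == cache_token:
            self._entries.move_to_end(state_key)
            return cached[1]
        # Miss or stale token: fetch from the Runner and cache under the bundle's token.
        value = fetch_from_runner(state_key)
        self._put(state_key, cache_token, value)
        return value

    def put(self, state_key, cache_token, value, pending_writes):
        # Update the cache immediately; the write itself is flushed to the Runner
        # at the latest when the bundle finishes.
        self._put(state_key, cache_token, value)
        pending_writes.append((state_key, value))

    def clear(self, state_key, cache_token, pending_writes):
        self._put(state_key, cache_token, None)
        pending_writes.append((state_key, None))

    def _put(self, state_key, cache_token, value):
        self._entries[state_key] = (cache_token, value)
        self._entries.move_to_end(state_key)
        if len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict the least recently used entry

With a scheme like this, a failed bundle or checkpoint simply results in the Runner handing out a new token, so stale entries are never served even though they are not actively evicted.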
On 26.08.19 19:58, Lukasz Cwik wrote:

There were originally a couple of ideas around how caching could work:

1) One cache token for the entire bundle that is supplied up front. The SDK caches everything using the given token. All reads/clear/append for all types of state happen under this token. Anytime a side input changes, the key processing partition range changes, or a bundle fails to process, the runner chooses a new cache token, effectively invalidating everything in the past.

2) One cache token per type of state that is supplied up front. The SDK caches all requests for a given type using the given cache token. The runner can selectively choose which type to keep and which to invalidate. Bundle failure and key processing partition changes invalidate all user state; a side input change invalidates all side inputs.

3) One cache token per state id that is supplied up front. The SDK caches all requests for the given state id using the given cache token. The runner can selectively choose which to invalidate and which to keep. Bundle failure and key processing partition changes invalidate all user state; side input changes only invalidate the side input that changed.

4) A cache token on each read/clear/append that is supplied on the response of the call, with an initial valid set that is supplied at start. The runner can selectively choose which to keep on start. Bundle failure allows runners to "roll back" to a known good state by selecting the previous valid cache token as part of the initial set. Key processing partition changes allow runners to keep cached state that hasn't changed, since it can be tied to a version number of the state itself as part of the initial set. Side input changes only invalidate the side input that changed.

Based upon your discussion you seem to want option #1, which doesn't work well with side inputs clearing cached state. If we want to have user state survive a changing side input, we would want one of the other options. I do agree that supplying the cache token upfront is significantly simpler. Currently the protos are set up for #4 since it was the most flexible and at the time the pros outweighed the cons.

I don't understand why you think you need to wait for a response for the append/clear to get its cache token, since the only reason you need the cache token is that you want to use that cached data when processing a different bundle. I was thinking that the flow on the SDK side would be something like (assuming there is a global cache of cache token -> (map of state key -> data)):

1) Create a local cache of (map of state key -> data) using the initial set of valid cache tokens.
2) Make all mutations in place on the local cache without waiting for a response.
3) When the response comes back, update the global cache with the new cache token -> (map of state key -> data). (This is when the data becomes visible to other bundles that start processing.)
4) Before the bundle finishes processing, wait for all outstanding state calls to finish.

To implement caching on the runner side, you would keep track of at most two cache tokens per state key: one cache token represents the initial value when the bundle started, while the second represents the modified state. If the bundle succeeds, the runner passes in the set of tokens which represent the new state; if the bundle fails, you process using the original ones.

After thinking through the implementation again, we could supply two cache tokens for each state id, the first being the set of initial tokens if no writes happen, while the second represents the token to use if the SDK changes the state. This gives us the simplification where we don't need to wait for the response before we update the global cache, making a typical blocking cache much easier to do. We also get the benefit that runners can supply either the same cache token for a state id or different ones. If the runner supplies the same one, then it's telling the SDK to make modifications in place without any rollback (which is good on memory since we are reducing copies of stuff); if the runner supplies two different ones, then it's telling the SDK to keep the old data around. If we went through with this new option, the SDK-side logic would be (assuming there is a global cache of cache token -> (map of state key -> data)):

1) Create an empty local set of state ids that are dirty when starting a new bundle (the dirty set).

For reads/gets:
2A) If the request is a read (get), use the dirty set to choose which cache token to look up and use in the global cache. If the global cache is missing data, issue the appropriate request, providing the result.

For writes/appends/clear:
2B) If the cache tokens are different for the state id, add the state id to the dirty set if it isn't there, and perform the appropriate modification to convert the old cached state data to the new state data.
3B) Modify the global cache's data.
4B) Issue the request to the runner.
5B*) Add this request to the set of requests to block on before completing the bundle.

(* Note: there was another idea to update the process bundle response to contain the id of the last state request, which would allow the runner to know when it has seen the last state request, allowing the SDK to not block at all when finishing the bundle.)
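A minimal Python sketch of the two-token, dirty-set flow described above, assuming a global cache keyed by cache token and a runner client offering get/append_async calls; all names are hypothetical, for illustration only.

# Hypothetical sketch of the two-token / dirty-set idea above; not actual Beam code.
class TwoTokenBundleCache:
    def __init__(self, global_cache, read_tokens, write_tokens, runner_client):
        self._global_cache = global_cache   # cache_token -> {state_key: data}
        self._read_tokens = read_tokens     # state_id -> token for the initial value
        self._write_tokens = write_tokens   # state_id -> token to use once modified
        self._runner = runner_client        # assumed client issuing State API requests
        self._dirty = set()                 # 1) state ids modified in this bundle
        self._pending = []                  # 5B) requests to block on before finishing

    def _token(self, state_id):
        # 2A) Dirty state ids are read under the write token, clean ones under the read token.
        return self._write_tokens[state_id] if state_id in self._dirty else self._read_tokens[state_id]

    def get(self, state_id, state_key):
        cached = self._global_cache.setdefault(self._token(state_id), {})
        if state_key not in cached:
            cached[state_key] = self._runner.get(state_key)  # cache miss: ask the runner
        return cached[state_key]

    def append(self, state_id, state_key, value):
        read_tok, write_tok = self._read_tokens[state_id], self._write_tokens[state_id]
        # 2B) On the first write with differing tokens, copy the old data under the write
        #     token so the read-token version stays intact for a possible rollback.
        if read_tok != write_tok and state_id not in self._dirty:
            old = self._global_cache.get(read_tok, {}).get(state_key, [])
            self._global_cache.setdefault(write_tok, {})[state_key] = list(old)
        self._dirty.add(state_id)
        # 3B) Modify the global cache's data and 4B) issue the request to the runner.
        self._global_cache.setdefault(write_tok, {}).setdefault(state_key, []).append(value)
        self._pending.append(self._runner.append_async(state_key, value))

    def finish_bundle(self):
        # 4) Block on all outstanding state calls before completing the bundle.
        for request in self._pending:
            request.wait()

The copy only happens on the first write for a state id when the two tokens differ; if the runner supplies the same token twice, modifications happen in place, as described above.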
On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <[email protected]> wrote:

Just to give a quick update here. Rakesh, Thomas, and I had a discussion about async writes from the Python SDK to the Runner. Robert was also present for some parts of the discussion.

We concluded that blocking writes, with the need to refresh the cache token each time, are not going to provide enough throughput/latency.

We figured that it will be enough to use a single cache token per Runner<=>SDK Harness connection. This cache token will be provided by the Runner in the ProcessBundleRequest. Writes will not yield a new cache token. The advantage is that we can use one cache token for the lifetime of the bundle and also across bundles, unless the Runner switches to a new Runner<=>SDK Harness connection; then the Runner would have to generate a new cache token.

We might require additional cache tokens for the side inputs. For now, I'm planning to only tackle user state, which seems to be the area where users have expressed the most need for caching.

-Max
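A small Python sketch of the Runner-side bookkeeping this implies: one token per Runner<=>SDK Harness connection, attached to every ProcessBundleRequest, and regenerated only when the connection changes. Names are hypothetical, for illustration only.

# Hypothetical Runner-side sketch of the single-token-per-connection idea; not actual Beam code.
import uuid


class ConnectionCacheTokens:
    """Tracks one user-state cache token per Runner<=>SDK Harness connection."""

    def __init__(self):
        self._tokens = {}  # connection_id -> token

    def token_for(self, connection_id):
        # Reuse the same token for the lifetime of the connection; writes do not renew it.
        if connection_id not in self._tokens:
            self._tokens[connection_id] = uuid.uuid4().bytes
        return self._tokens[connection_id]

    def on_connection_lost(self, connection_id):
        # A new connection (e.g. after a failure/restore) gets a fresh token,
        # which invalidates anything the SDK cached under the old one.
        self._tokens.pop(connection_id, None)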
On 21.08.19 20:05, Maximilian Michels wrote:

> There is probably a misunderstanding here: I'm suggesting to use a worker ID instead of cache tokens, not additionally.

Ah! Misread that. We need a changing token to indicate that the cache is stale, e.g. a checkpoint has failed / we are restoring from an old checkpoint. If the _Runner_ generates a new unique token/id for workers which outlast the Runner, then this should work fine. I don't think it is safe for the worker to supply the id. The Runner should be in control of cache tokens to avoid invalid tokens.

> In the PR the token is modified as part of updating the state. Doesn't the SDK need the new token to update its cache entry also? That's where it would help the SDK to know the new token upfront.

If the state is updated in the Runner, a new token has to be generated. The old one is not valid anymore. The SDK will use the updated token to store the new value in the cache. I understand that it would be nice to know the token upfront. That could be possible with some token generation scheme. On the other hand, writes can be asynchronous and thus not block the UDF.

> But I believe there is no need to change the token in the first place, unless bundles for the same key (ranges) can be processed by different workers.

That's certainly possible, e.g. two workers A and B take turns processing a certain key range, one bundle after another:

You process a bundle with a token T with A, then worker B takes over. Both have an entry with cache token T. So B goes on to modify the state and uses the same cache token T. Then A takes over again. A would have a stale cache entry, but T would still be a valid cache token.

> Indeed the fact that Dataflow can dynamically split and merge these ranges is what makes it trickier. If Flink does not repartition the ranges, then things are much easier.

Flink does not dynamically repartition key ranges (yet). If it started to support that, we would invalidate the cache tokens for the changed partitions.

I'd suggest the following cache token generation scheme:

One cache token per key range for user state and one cache token for each side input. On writes to user state or a change of a side input, the associated cache token will be renewed.

On the SDK side, it should be sufficient to let the SDK re-associate all its cached data belonging to a valid cache token with the new cache token returned by a successful write. This has to happen in the active scope (i.e. user state, or a particular side input).

If the key range changes, new cache tokens have to be generated. This should happen automatically because the Runner does not checkpoint cache tokens and will generate new ones when it restarts from an earlier checkpoint.

The current PR needs to be changed to (1) only keep a single cache token per user state and key range, and (2) add support for cache tokens for each side input.

Hope that makes sense.

-Max
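A minimal Python sketch of the renewal step described above: after a successful write returns a new token, the SDK re-associates the cached data in that scope (user state for a key range, or one side input) with the new token. Names are hypothetical, for illustration only.

# Hypothetical sketch of renewing a cache token after a successful write; not actual Beam code.
class ScopedCache:
    """Caches one scope (user state for a key range, or one side input) under a single token."""

    def __init__(self, initial_token):
        self._token = initial_token
        self._values = {}  # state_key -> value

    def get(self, state_key, token):
        # Only serve cached data if the caller still holds the scope's current token.
        if token == self._token:
            return self._values.get(state_key)
        return None

    def on_write_committed(self, state_key, value, new_token):
        # A successful write returned a renewed token: keep the cached data, but
        # re-associate the whole scope with the new token so stale readers miss.
        self._values[state_key] = value
        self._token = new_token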
On 21.08.19 17:27, Reuven Lax wrote:

Dataflow's ranges are also hash based. When I said lexicographical, I meant lexicographical based on the hexadecimal hash value.

Indeed the fact that Dataflow can dynamically split and merge these ranges is what makes it trickier. If Flink does not repartition the ranges, then things are much easier.

On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <[email protected]> wrote:

Appreciate all your comments! Replying below.

@Luke:

> Having cache tokens per key would be very expensive indeed and I believe we should go with a single cache token "per" bundle.

Thanks for your comments on the PR. I was thinking to propose something along the lines of having cache tokens valid for a particular checkpointing "epoch". That would require even less token renewal than the per-bundle approach.

@Thomas, thanks for the input. Some remarks:

> Wouldn't it be simpler to have the runner just track a unique ID for each worker and use that to communicate if the cache is valid or not?

We do not need a unique id per worker. If a cache token is valid for a particular worker, it is also valid for another worker. That is with the assumption that key ranges are always disjoint between the workers.

> * When the bundle is started, the runner tells the worker if the cache has become invalid (since it knows if another worker has mutated state)

This is simply done by not transferring the particular cache token. No need to declare it invalid explicitly.

> * When the worker sends mutation requests to the runner, it includes its own ID (or the runner already has it as contextual information). No need to wait for a response.

Mutations of cached values can be freely done as long as the cache token associated with the state is valid for a particular bundle. Only the first time does the Runner need to wait on the response to store the cache token. This can also be done asynchronously.

> * When the bundle is finished, the runner records the last writer (only if a change occurred)

I believe this is not necessary because there will only be one writer at a time for a particular bundle and key range; hence only one writer holds a valid cache token for a particular state and key range.

@Reuven:

> Dataflow divides the keyspace up into lexicographic ranges, and creates a cache token per range.

State is always processed partitioned by the Flink workers (hash-based, not lexicographical). I don't think that matters though, because the key ranges do not overlap between the workers. Flink does not support dynamically repartitioning the key ranges. Even in case of fine-grained recovery of workers and their key ranges, we would simply generate new cache tokens for a particular worker.

Thanks,
Max

On 21.08.19 09:33, Reuven Lax wrote:

Dataflow does something like this, however since work is load balanced across workers a per-worker id doesn't work very well. Dataflow divides the keyspace up into lexicographic ranges, and creates a cache token per range.
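A toy Python sketch of the token-per-key-range idea just described: hash the key, find the hexadecimal range containing the hash, and use that range's cache token, renewing only that token when the range changes. This is a generic illustration, not Dataflow's actual implementation.

# Hypothetical illustration of one cache token per key range; not Dataflow's actual code.
import bisect
import hashlib
import uuid


class RangeTokens:
    """Assigns a cache token to each lexicographic range of hexadecimal key hashes."""

    def __init__(self, num_ranges=16):
        # Range i starts at the i-th hex digit (a simplistic split for illustration).
        self._starts = [format(i, "x") for i in range(num_ranges)]
        self._tokens = [uuid.uuid4().bytes for _ in range(num_ranges)]

    def _range_index(self, key: bytes) -> int:
        hex_hash = hashlib.sha256(key).hexdigest()
        return bisect.bisect_right(self._starts, hex_hash) - 1

    def token_for(self, key: bytes) -> bytes:
        return self._tokens[self._range_index(key)]

    def invalidate_range(self, index: int) -> None:
        # Splitting, merging, or rewriting a range renews only that range's token.
        self._tokens[index] = uuid.uuid4().bytes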
On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <[email protected]> wrote:

Commenting here vs. on the PR since this relates to the overall approach.

Wouldn't it be simpler to have the runner just track a unique ID for each worker and use that to communicate if the cache is valid or not?

* When the bundle is started, the runner tells the worker if the cache has become invalid (since it knows if another worker has mutated state).
* When the worker sends mutation requests to the runner, it includes its own ID (or the runner already has it as contextual information). No need to wait for a response.
* When the bundle is finished, the runner records the last writer (only if a change occurred).

Whenever the current worker ID and the last writer ID don't match, the cache is invalid.

Thomas

On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <[email protected]> wrote:

Having cache tokens per key would be very expensive indeed and I believe we should go with a single cache token "per" bundle.

On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <[email protected]> wrote:

Maybe a Beam Python expert can chime in for Rakesh's question?

Luke, I was assuming cache tokens to be per key and state id. While implementing initial support on the Runner side, I realized that we probably want cache tokens to only be per state id. Note that if we had per-key cache tokens, the number of cache tokens would approach the total number of keys in an application.

If anyone wants to have a look, here is a first version of the Runner side for cache tokens. Note that I only implemented cache tokens for BagUserState for now, but it can easily be added for side inputs as well.

https://github.com/apache/beam/pull/9374

-Max
