The bundle's view of side inputs should never change during
processing and should be a point-in-time snapshot.

I was just trying to say that deferring the cache token for side
inputs until side input request time simplified the runner's
implementation, since that is conclusively when the runner would need
to take a look at the side input. Putting them as part of the
ProcessBundleRequest complicates that but does make the SDK
implementation significantly simpler, which is a win.
On Tue, Aug 27, 2019 at 9:14 AM Maximilian Michels <m...@apache.org> wrote:
Thanks for the quick response.

Just to clarify, the issue with versioning side input is also present
when supplying the cache tokens on a request basis instead of per
bundle. The SDK never knows when the Runner receives a new version of
the side input. Like you pointed out, the Runner needs to mark side
inputs as stale and generate new cache tokens for the stale side inputs.

The difference between per-request tokens and per-bundle tokens would be
that the side input can only change after a bundle completes vs. during
the bundle. Side inputs are always fuzzy in that regard because there is
no precise instant where side inputs are atomically updated, other than
the assumption that they eventually will be updated. In that regard
per-bundle tokens for side input seem to be fine.

All of the above is not an issue for user state, as its cache can remain
valid for the lifetime of a Runner<=>SDK Harness connection. A simple
solution would be to not cache side input because there are many cases
where the caching just adds additional overhead. However, I can also
imagine cases where side input is valid forever and caching would be
very beneficial.

For the first version I want to focus on user state because that's where
I see the most benefit for caching. I don't see a problem though for the
Runner to detect new side input and reflect that in the cache tokens
supplied for a new bundle.
-Max
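
As a rough illustration of the point above (the Runner detecting a new side input version and reflecting it in the tokens supplied for the next bundle), here is a minimal Python sketch. The class and method names (CacheTokenProvider, observe_side_input, reset_user_state, tokens_for_new_bundle) are hypothetical and not part of Beam; the internal "version" is whatever identifier the Runner already uses for a side input.

```python
import uuid


class CacheTokenProvider:
    """Hypothetical runner-side helper; not part of Beam's API."""

    def __init__(self):
        # One token for all user state on this Runner<=>SDK Harness connection.
        self._user_state_token = uuid.uuid4().bytes
        # side_input_id -> (internal version, cache token)
        self._side_inputs = {}

    def observe_side_input(self, side_input_id, version):
        """Record the Runner's current internal version of a side input.

        A new token is generated only when the version changes.
        """
        known = self._side_inputs.get(side_input_id)
        if known is None or known[0] != version:
            self._side_inputs[side_input_id] = (version, uuid.uuid4().bytes)

    def reset_user_state(self):
        """Assumed to be called when the connection or key space changes."""
        self._user_state_token = uuid.uuid4().bytes

    def tokens_for_new_bundle(self):
        """Snapshot of the tokens to attach to the next bundle request."""
        return {
            "user_state": self._user_state_token,
            "side_inputs": {
                sid: token for sid, (_, token) in self._side_inputs.items()
            },
        }
```

Because the snapshot is taken once per bundle, a side input that changes mid-bundle only shows up as a new token on the following bundle, which matches the per-bundle behaviour discussed above.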
On 26.08.19 22:27, Lukasz Cwik wrote:
> Your summary below makes sense to me. I can see that recovery from
> rolling back doesn't need to be a priority and simplifies the solution
> for user state caching down to one token.
>
> Providing cache tokens upfront does require the Runner to know what
> "version" of everything it may supply to the SDK upfront (instead of on
> request) which would mean that the Runner may need to have a mapping
> from cache token to internal version identifier for things like side
> inputs which are typically broadcast. The Runner would also need to poll
> to see if the side input has changed in the background to not block
> processing bundles with "stale" side input data.
>
> Ping me once you have the Runner PR updated and I'll take a look again.
>
> On Mon, Aug 26, 2019 at 12:20 PM Maximilian Michels <m...@apache.org> wrote:
>
> Thank you for the summary Luke. I really appreciate the effort you put
> into this!
>
> > Based upon your discussion you seem to want option #1
>
> I'm actually for option #2. The option to cache/invalidate side inputs
> is important, and we should incorporate this in the design. That's why
> option #1 is not flexible enough. However, a first implementation could
> defer caching of side inputs.
>
> Option #3 was my initial thinking and the first version of the PR, but I
> think we agreed that there wouldn't be much gain from keeping a cache
> token per state id.
>
> Option #4 is what is specifically documented in the reference doc and
> already part of the Proto, where valid tokens are provided for each new
> bundle and also as part of the response of a get/put/clear. We mentioned
> that the reply does not have to be waited on synchronously (I mentioned
> it even), but it complicates the implementation. The idea Thomas and I
> expressed was that a response is not even necessary if we assume
> validity of the upfront provided cache tokens for the lifetime of a
> bundle and that cache tokens will be invalidated as soon as the Runner
> fails in any way. This is naturally the case for Flink because it will
> simply "forget" its current cache tokens.
>
> I currently envision the following scheme:
>
> Runner
> ======
>
> - Runner generates a globally unique cache token, one for user state and
> one for each side input
>
> - The token is supplied to the SDK Harness for each bundle request
>
> - For the lifetime of a Runner<=>SDK Harness connection this cache token
> will not change
> - Runner will generate a new token if the connection/key space changes
> between Runner and SDK Harness
>
>
> SDK
> ===
>
> - For each bundle the SDK worker stores the list of valid cache tokens
> - The SDK Harness keeps a global cache across all its (local) workers
> which is an LRU cache: state_key => (cache_token, value)
> - get: Look up the cache using the valid cache token for the state. If no
> match, then fetch from the Runner and use the already available token for
> caching
> - put: Put the value in the cache with a valid cache token, and add the
> value to the pending writes, which will be flushed out at the latest when
> the bundle ends
> - clear: same as put but clear cache
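
The SDK-side list above could be sketched roughly as follows. This is only an illustration under assumptions: the names (StateCache, fetch_from_runner, pending_writes) are hypothetical and not taken from the draft PRs, and "clear" is interpreted here as dropping the cached entry and queueing a clear marker (None) for the Runner.

```python
from collections import OrderedDict


class StateCache:
    """LRU cache: state_key -> (cache_token, value). Hypothetical sketch."""

    def __init__(self, max_entries=100):
        self._max_entries = max_entries
        self._entries = OrderedDict()

    def _store(self, state_key, cache_token, value):
        self._entries[state_key] = (cache_token, value)
        self._entries.move_to_end(state_key)
        # Evict least recently used entries once the cache is over capacity.
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)

    def get(self, state_key, cache_token, fetch_from_runner):
        entry = self._entries.get(state_key)
        if entry is not None and entry[0] == cache_token:
            self._entries.move_to_end(state_key)
            return entry[1]
        # No entry, or an entry stored under a token that is no longer valid:
        # fetch from the Runner and cache under the already available token.
        value = fetch_from_runner(state_key)
        self._store(state_key, cache_token, value)
        return value

    def put(self, state_key, cache_token, value, pending_writes):
        self._store(state_key, cache_token, value)
        # Flushed to the Runner at the latest when the bundle ends.
        pending_writes.append((state_key, value))

    def clear(self, state_key, cache_token, pending_writes):
        # Assumption: drop the cached entry and queue a clear marker.
        self._entries.pop(state_key, None)
        pending_writes.append((state_key, None))
```

Since a lookup compares the stored token with the token supplied for the current bundle, handing out a new token is enough to invalidate old entries; they are simply overwritten or evicted over time.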
>
> It does look like this is not too far off from what you were describing.
> The main difference is that we just work with a single cache token. In
> my opinion we do not need the second cache token for writes, as long as
> we ensure that we generate a new cache token if the
> bundle/checkpoint fails.
>
> I have a draft PR
> for the Runner: https://github.com/apache/beam/pull/9374
> for the SDK: https://github.com/apache/beam/pull/9418
>
> Note that the Runner PR needs to be updated to fully reflect the above
> scheme. The SDK implementation is WIP. I want to make sure that we
> clarify the design before this gets finalized.
>
> Thanks again for all your comments. Much appreciated!
>
> Cheers,
> Max
>
> On 26.08.19 19:58, Lukasz Cwik wrote:
> > There were originally a couple of ideas around how caching could work:
> > 1) One cache token for the entire bundle that is supplied up front. The
> > SDK caches everything using the given token. All reads/clear/append for
> > all types of state happen under this token. Anytime a side input
> > changes, key processing partition range changes or a bundle fails to
> > process, the runner chooses a new cache token effectively invalidating
> > everything in the past.
> >
> > 2) One cache token per type of state that is supplied up front.
> > The SDK caches all requests for a given type using the given cache
> > token. The runner can selectively choose which type to keep and which to
> > invalidate. Bundle failure and key processing partition changes
> > invalidate all user state, side input change invalidates all side inputs.
> >
> > 3) One cache token per state id that is supplied up front.
> > The SDK caches all requests for the given state id using the given cache
> > token. The runner can selectively choose which to invalidate and which
> > to keep. Bundle failure and key processing partition changes invalidate
> > all user state, side input changes only invalidate the side input that
> > changed.
> >
> > 4) A cache token on each read/clear/append that is supplied on the
> > response of the call with an initial valid set that is supplied at
> > start. The runner can selectively choose which to keep on start. Bundle
> > failure allows runners to "roll back" to a known good state by selecting
> > the previous valid cache token as part of the initial set. Key
> > processing partition changes allow runners to keep cached state that
> > hasn't changed since it can be tied to a version number of the state
> > itself as part of the initial set. Side input changes only invalidate
> > the side input that changed.
> >
> > Based upon your discussion you seem to want option #1 which doesn't work
> > well with side inputs clearing cached state. If we want to have user
> > state survive a changing side input, we would want one of the other
> > options. I do agree that supplying the cache token upfront is
> > significantly simpler. Currently the protos are set up for #4 since it
> > was the most flexible and at the time the pros outweighed the cons.
> >
> > I don't understand why you think you need to wait for a response for the
> > append/clear to get its cache token since the only reason you need the
> > cache token is that you want to use that cached data when processing a
> > different bundle. I was thinking that the flow on the SDK side would be
> > something like (assuming there is a global cache of cache token -> (map
> > of state key -> data))
> > 1) Create a local cache of (map of state key -> data) using the initial
> > set of valid cache tokens
> > 2) Make all mutations in place on local cache without waiting for
> > response.
> > 3) When response comes back, update global cache with new cache token ->
> > (map of state key -> data) (this is when the data becomes visible to
> > other bundles that start processing)
> > 4) Before the bundle finishes processing, wait for all outstanding state
> > calls to finish.
> >
> > To implement caching on the runner side, you would keep track of at most
> > 2 cache tokens per state key, one cache token represents the initial
> > value when the bundle started while the second represents the modified
> > state. If the bundle succeeds the runner passes in the set of tokens
> > which represent the new state, if the bundle fails you process using the
> > original ones.
> >
> > After thinking through the implementation again, we could supply two
> > cache tokens for each state id, the first being the set of initial
> > tokens if no writes happen while the second represents the token to use
> > if the SDK changes the state. This gives us the simplification where we
> > don't need to wait for the response before we update the global cache
> > making a typical blocking cache much easier to do. We also get the
> > benefit that runners can supply either the same cache token for a state
> > id or different ones. If the runner supplies the same one then it's
> > telling the SDK to make modifications in place without any rollback
> > (which is good on memory since we are reducing copies of stuff) or if
> > the runner supplies two different ones then it's telling the SDK to keep
> > the old data around. If we went through with this new option the SDK
> > side logic would be (assuming there is a global cache of cache token ->
> > (map of state key -> data)):
> >
> > 1) Create an empty local set of state ids that are dirty when starting a
> > new bundle (dirty set)
> >
> > For reads/gets:
> > 2A) If the request is a read (get), use dirty set to choose which cache
> > token to look up and use in the global cache. If the global cache is
> > missing data issue the appropriate request providing the result.
> >
> > For writes/appends/clear:
> > 2B) if the cache tokens are different for the state id, add the state id
> > to the dirty set if it isn't there and perform the appropriate
> > modification to convert the old cached state data to the new state data
> > 3B) modify the global cache's data
> > 4B) issue the request to the runner
> > 5B*) add this request to the set of requests to block on before
> > completing the bundle.
> >
> > (* Note, there was another idea to update the process bundle response to
> > contain the id of the last state request which would allow the runner to
> > know when it has seen the last state request allowing the SDK to not
> > block at all when finishing the bundle)
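
Steps 1 through 5B above could look roughly like the following Python sketch. Everything named here is an assumption made for illustration: BundleStateView, the (read_token, write_token) pair per state id, and a send_to_runner callable that returns a concurrent.futures.Future are hypothetical, and only get and append are shown.

```python
class BundleStateView:
    """Hypothetical per-bundle view over a global cache:
    cache_token -> {state_key: data}."""

    def __init__(self, global_cache, tokens, send_to_runner):
        self._global = global_cache   # shared across bundles
        self._tokens = tokens         # state_id -> (read_token, write_token)
        self._send = send_to_runner   # assumed to return a Future
        self._dirty = set()           # step 1: empty dirty set
        self._outstanding = []        # step 5B: requests to block on

    def _token(self, state_id):
        read_token, write_token = self._tokens[state_id]
        return write_token if state_id in self._dirty else read_token

    def get(self, state_id, state_key, fetch):
        # Step 2A: the dirty set decides which token to look up.
        bucket = self._global.setdefault(self._token(state_id), {})
        if state_key not in bucket:
            bucket[state_key] = fetch(state_key)
        return bucket[state_key]

    def append(self, state_id, state_key, items):
        read_token, write_token = self._tokens[state_id]
        if read_token != write_token and state_id not in self._dirty:
            # Step 2B: copy the old data under the new token so the old
            # version stays around for a possible rollback.
            old = self._global.get(read_token, {})
            self._global[write_token] = {k: list(v) for k, v in old.items()}
            self._dirty.add(state_id)
        bucket = self._global.setdefault(write_token, {})
        if state_key in bucket:
            bucket[state_key].extend(items)  # step 3B
        request = ("append", state_key, list(items))
        self._outstanding.append(self._send(request))  # steps 4B and 5B

    def finish(self):
        # Block on all outstanding state requests before completing.
        for future in self._outstanding:
            future.result()
```

When the Runner supplies the same token twice, the copy in step 2B is skipped and the data is modified in place, which is the memory-saving case described above.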
> >
> > On Thu, Aug 22, 2019 at 10:11 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > Just to give a quick update here. Rakesh, Thomas, and I had a discussion
> > about async writes from the Python SDK to the Runner. Robert was also
> > present for some parts of the discussion.
> >
> > We concluded that blocking writes with the need to refresh the cache
> > token each time are not going to provide enough throughput/latency.
> >
> > We figured that it will be enough to use a single cache token per
> > Runner<=>SDK Harness connection. This cache token will be provided by
> > the Runner in the ProcessBundleRequest. Writes will not yield a new
> > cache token. The advantage is that we can use one cache token for the
> > lifetime of the bundle and also across bundles, unless the Runner
> > switches to a new Runner<=>SDK Harness connection; then the Runner would
> > have to generate a new cache token.
> >
> > We might require additional cache tokens for the side inputs. For now,
> > I'm planning to only tackle user state which seems to be the area where
> > users have expressed the most need for caching.
> >
> > -Max
> >
> > On 21.08.19 20:05, Maximilian Michels wrote:
> > >> There is probably a misunderstanding here: I'm suggesting to use
> > >> a worker ID instead of cache tokens, not additionally.
> > >
> > > Ah! Misread that. We need a changing token to indicate that the cache is
> > > stale, e.g. checkpoint has failed / restoring from an old checkpoint. If
> > > the _Runner_ generates a new unique token/id for workers which outlast
> > > the Runner, then this should work fine. I don't think it is safe for the
> > > worker to supply the id. The Runner should be in control of cache tokens
> > > to avoid invalid tokens.
> > >
> > >> In the PR the token is modified as part of updating the state.
> > >> Doesn't the SDK need the new token to update its cache entry also?
> > >> That's where it would help the SDK to know the new token upfront.
> > >
> > > If the state is updated in the Runner, a new token has to be generated.
> > > The old one is not valid anymore. The SDK will use the updated token to
> > > store the new value in the cache. I understand that it would be nice to
> > > know the token upfront. That could be possible with some token
> > > generation scheme. On the other hand, writes can be asynchronous and
> > > thus not block the UDF.
> > >
> > >> But I believe there is no need to change the token in the first
> > >> place, unless bundles for the same key (ranges) can be processed by
> > >> different workers.
> > >
> > > That's certainly possible, e.g. two workers A and B take turns
> > > processing a certain key range, one bundle after another:
> > >
> > > You process a bundle with a token T with A, then worker B takes over.
> > > Both have an entry with cache token T. So B goes on to modify the state
> > > and uses the same cache token T. Then A takes over again. A would have a
> > > stale cache entry but T would still be a valid cache token.
> > >
> > >> Indeed the fact that Dataflow can dynamically split and merge
> > >> these ranges is what makes it trickier. If Flink does not
> > >> repartition the ranges, then things are much easier.
> > >
> > > Flink does not dynamically repartition key ranges (yet). If it started
> > > to support that, we would invalidate the cache tokens for the changed
> > > partitions.
> > >
> > >
> > > I'd suggest the following cache token generation scheme:
> > >
> > > One cache token per key range for user state and one cache token for
> > > each side input. On writes to user state or changing side input, the
> > > associated cache token will be renewed.
> > >
> > > On the SDK side, it should be sufficient to let the SDK re-associate all
> > > its cached data belonging to a valid cache token with a new cache token
> > > returned by a successful write. This has to happen in the active scope
> > > (i.e. user state, or a particular side input).
> > >
> > > If the key range changes, new cache tokens have to be generated. This
> > > should happen automatically because the Runner does not checkpoint cache
> > > tokens and will generate new ones when it restarts from an earlier
> > > checkpoint.
> > >
> > > The current PR needs to be changed to (1) only keep a single cache token
> > > per user state and key range (2) add support for cache tokens for each
> > > side input.
> > >
> > > Hope that makes sense.
> > >
> > > -Max
> > >
> > > On 21.08.19 17:27, Reuven Lax wrote:
> > >>
> > >>
> > >> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels <m...@apache.org> wrote:
> > >>
> > >> Appreciate all your comments! Replying below.
> > >>
> > >>
> > >> @Luke:
> > >>
> > >> > Having cache tokens per key would be very expensive indeed and I
> > >> > believe we should go with a single cache token "per" bundle.
> > >>
> > >> Thanks for your comments on the PR. I was thinking to propose something
> > >> along these lines of having cache tokens valid for a particular
> > >> checkpointing "epoch". That would require even less token renewal than
> > >> the per-bundle approach.
> > >>
> > >>
> > >> @Thomas, thanks for the input. Some remarks:
> > >>
> > >> > Wouldn't it be simpler to have the runner just track a unique ID
> > >> > for each worker and use that to communicate if the cache is valid or
> > >> > not?
> > >>
> > >> We do not need a unique id per worker. If a cache token is valid for a
> > >> particular worker, it is also valid for another worker. That is with the
> > >> assumption that key ranges are always disjoint between the workers.
> > >>
> > >> > * When the bundle is started, the runner tells the worker if the
> > >> > cache has become invalid (since it knows if another worker has
> > >> > mutated state)
> > >>
> > >> This is simply done by not transferring the particular cache token. No
> > >> need to declare it invalid explicitly.
> > >>
> > >> > * When the worker sends mutation requests to the runner, it
> > >> > includes its own ID (or the runner already has it as contextual
> > >> > information). No need to wait for a response.
> > >>
> > >> Mutations of cached values can be freely done as long as the cache token
> > >> associated with the state is valid for a particular bundle. Only the
> > >> first time, the Runner needs to wait on the response to store the cache
> > >> token. This can also be done asynchronously.
> > >>
> > >> > * When the bundle is finished, the runner records the last writer
> > >> > (only if a change occurred)
> > >>
> > >> I believe this is not necessary because there will only be one writer at
> > >> a time for a particular bundle and key range, hence only one writer
> > >> holds a valid cache token for a particular state and key range.
> > >>
> > >>
> > >> @Reuven:
> > >>
> > >> > Dataflow divides the keyspace up into lexicographic ranges, and
> > >> > creates a cache token per range.
> > >>
> > >> State is always processed partitioned by the Flink workers (hash-based,
> > >> not lexicographical). I don't think that matters though because the key
> > >> ranges do not overlap between the workers. Flink does not support
> > >> dynamically repartitioning the key ranges. Even in case of fine-grained
> > >> recovery of workers and their key ranges, we would simply generate new
> > >> cache tokens for a particular worker.
> > >>
> > >>
> > >> Dataflow's ranges are also hash based. When I said lexicographical, I
> > >> meant lexicographical based on the hexadecimal hash value.
> > >>
> > >> Indeed the fact that Dataflow can dynamically split and merge these
> > >> ranges is what makes it trickier. If Flink does not repartition the
> > >> ranges, then things are much easier.
> > >>
> > >>
> > >>
> > >> Thanks,
> > >> Max
> > >>
> > >> On 21.08.19 09:33, Reuven Lax wrote:
> > >> > Dataflow does something like this, however since work is
> > >> > load balanced across workers a per-worker id doesn't work very well.
> > >> > Dataflow divides the keyspace up into lexicographic ranges, and
> > >> > creates a cache token per range.
> > >> >
> > >> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise <t...@apache.org> wrote:
> > >> >
> > >> > Commenting here vs. on the PR since related to the overall approach.
> > >> >
> > >> > Wouldn't it be simpler to have the runner just track a unique ID for
> > >> > each worker and use that to communicate if the cache is valid or not?
> > >> >
> > >> > * When the bundle is started, the runner tells the worker if the
> > >> > cache has become invalid (since it knows if another worker has
> > >> > mutated state)
> > >> > * When the worker sends mutation requests to the runner, it includes
> > >> > its own ID (or the runner already has it as contextual information).
> > >> > No need to wait for a response.
> > >> > * When the bundle is finished, the runner records the last writer
> > >> > (only if a change occurred)
> > >> >
> > >> > Whenever current worker ID and last writer ID don't match, cache
> > >> > is invalid.
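
The worker-ID proposal above can be sketched in a few lines of Python. This is a literal reading of the rule, with hypothetical names (LastWriterTracker, key_range, worker_id) that do not come from Beam; tracking per key range rather than globally is an assumption made here for illustration.

```python
class LastWriterTracker:
    """Hypothetical runner-side bookkeeping for the worker-ID scheme."""

    def __init__(self):
        # key_range -> ID of the worker that last mutated state in it
        self._last_writer = {}

    def cache_valid_at_bundle_start(self, key_range, worker_id):
        # The cache is treated as valid only when this worker was also
        # the last writer; with no recorded writer it counts as invalid.
        return self._last_writer.get(key_range) == worker_id

    def record_bundle_finished(self, key_range, worker_id, mutated):
        # Record the last writer only if a change occurred.
        if mutated:
            self._last_writer[key_range] = worker_id
```

With two workers A and B alternating on the same key range, a write by B makes A's cache invalid on A's next bundle, while a read-only bundle by B leaves A's cache valid.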
> > >> >
> > >> > Thomas
> > >> >
> > >> >
> > >> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik <lc...@google.com> wrote:
> > >> >
> > >> > Having cache tokens per key would be very expensive indeed and I
> > >> > believe we should go with a single cache token "per" bundle.
> > >> >
> > >> > On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels <m...@apache.org> wrote:
> > >> >
> > >> > Maybe a Beam Python expert can chime in for Rakesh's question?
> > >> >
> > >> > Luke, I was assuming cache tokens to be per key and state id. While
> > >> > implementing initial support on the Runner side, I realized that we
> > >> > probably want cache tokens to only be per state id. Note that if we
> > >> > had per-key cache tokens, the number of cache tokens would approach
> > >> > the total number of keys in an application.
> > >> >
> > >> > If anyone wants to have a look, here is a first version of the Runner
> > >> > side for cache tokens. Note that I only implemented cache tokens for
> > >> > BagUserState for now, but it can be easily added for side
> > >> > inputs as well.
> > >> >
> > >> > https://github.com/apache/beam/pull/9374
> > >> >
> > >> > -Max
> > >> >
> > >> >
> > >>
> >
>