Yes, that makes sense. What do you think about creating a document to summarize the ideas presented here? Also, it would be good to capture the status quo regarding caching in the Python SDK.
-Max

On 13.08.19 22:44, Thomas Weise wrote:

The token would be needed in general to invalidate the cache when bundles are processed by different workers.

In the case of the Flink runner we don't have a scenario of the SDK worker surviving the runner in the case of a failure, so there is no possibility of inconsistent state as a result of a checkpoint failure.

--
sent from mobile

On Tue, Aug 13, 2019, 1:18 PM Maximilian Michels <m...@apache.org> wrote:

Thanks for clarifying. Cache invalidation for side inputs makes sense.

In case the Runner fails to checkpoint, could it not re-attempt the checkpoint? At least in the case of Flink, the cache would still be valid until another checkpoint is attempted. For other Runners that may not be the case. Also, rolling back state while keeping the SDK Harness running requires invalidating the cache.

-Max

On 13.08.19 18:09, Lukasz Cwik wrote:

On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels <m...@apache.org> wrote:

> Agree that we have to be able to flush before a checkpoint to avoid caching too many elements. Also good point about checkpoint costs increasing with flushing the cache on checkpoints. An LRU cache policy in the SDK seems desirable.
>
> What is the role of the cache token in the design document [1]? It looks to me that the token is used to give the Runner control over which and how many elements can be cached by the SDK. Why is that necessary? Shouldn't this be up to the SDK?

We want to be able to handle the case where the SDK completes the bundle successfully but the runner fails to checkpoint the information. We also want the runner to be able to pass in cache tokens for things like side inputs, which may change over time (and the SDK would not know that this happened).

> -Max
>
> [1] https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
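To make the role of the token concrete, here is a minimal sketch of how a runner-provided cache token could gate a cross-bundle cache in the SDK harness. The class and method names are invented for illustration and are not an existing Beam API:

class CrossBundleStateCache(object):
  """Illustrative only: cached values survive across bundles while the token is stable."""

  def __init__(self):
    self._token = None
    self._entries = {}  # state key -> decoded value(s)

  def start_bundle(self, cache_token):
    # A missing or changed token means the runner can no longer guarantee that
    # our cached values match its view of the state, so drop everything.
    if cache_token is None or cache_token != self._token:
      self._entries.clear()
    self._token = cache_token

  def get(self, state_key, fetch_fn):
    # fetch_fn falls back to a state read over the Fn API.
    if state_key not in self._entries:
      self._entries[state_key] = fetch_fn()
    return self._entries[state_key]

  def invalidate(self, state_key):
    self._entries.pop(state_key, None)

Under this scheme, a failed checkpoint on the runner side is handled simply by issuing a fresh token for the next bundle, which drops the stale entries.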
On 12.08.19 19:55, Lukasz Cwik wrote:

On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise <t...@apache.org> wrote:

On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels <m...@apache.org> wrote:

>> Thanks for starting this discussion Rakesh. An efficient cache layer is one of the missing pieces for good performance in stateful pipelines. The good news is that there is a level of caching already present in Python which batches append requests until the bundle is finished.
>>
>> Thomas, in your example we would indeed have to profile to see why CPU utilization is high on the Flink side but not in the Python SDK harness. For example, older versions of Flink (<=1.5) have a high cost of deleting existing instances of a timer when setting a timer. Nevertheless, cross-bundle caching would likely result in increased performance.

> CPU on the Flink side was unchanged, and that's important. The throughput improvement comes from the extended bundle caching on the SDK side. That's what tells me that cross-bundle caching is needed. Of course, it will require a good solution for the write side as well, and I like your idea of using the checkpoint boundary for that, especially since that already aligns with the bundle boundary and is under runner control. Of course we also want to be careful not to cause overly bursty writes.
>
> Profiling will be useful for the timer processing, that is also on my list of suspects.

>> Luke, I think the idea to merge pending state requests could be complementary to caching across bundles.
>>
>> Question: Couldn't we defer flushing state back from the SDK to the Runner indefinitely, provided that we add a way to flush the state in case of a checkpoint?

Flushing is needed to prevent the SDK from running out of memory. With a fixed budget for state inside the SDK, flushing would happen once that budget is exceeded. I could also see that flushing only at checkpoints may lead to slow checkpoint performance, so we may want to flush state that hasn't been used in a while as well.
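The flush policy described here could look roughly like the following. This is an illustrative sketch only (none of these names exist in Beam), combining a fixed size budget, LRU eviction, and a flush-everything hook for the checkpoint boundary:

import collections

class WriteBackStateCache(object):
  def __init__(self, flush_fn, max_entries=10000):
    # flush_fn writes a dirty cell back to the runner over the Fn API.
    self._flush_fn = flush_fn
    self._max_entries = max_entries
    self._entries = collections.OrderedDict()  # state key -> (value, dirty)

  def put(self, state_key, value):
    self._entries[state_key] = (value, True)
    self._entries.move_to_end(state_key)
    # Stay within the memory budget by flushing the least recently used cells.
    while len(self._entries) > self._max_entries:
      old_key, (old_value, dirty) = self._entries.popitem(last=False)
      if dirty:
        self._flush_fn(old_key, old_value)

  def flush_all(self):
    # Called at a checkpoint (or bundle) boundary so the runner sees a
    # consistent snapshot before the checkpoint is acknowledged.
    for state_key, (value, dirty) in self._entries.items():
      if dirty:
        self._flush_fn(state_key, value)
        self._entries[state_key] = (value, False)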
>> Another performance improvement would be caching read requests, because these first go to the Runner regardless of already cached appends.
>>
>> -Max

On 09.08.19 17:12, Lukasz Cwik wrote:

On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw <rober...@google.com> wrote:

> The question is whether the SDK needs to wait for the StateResponse to come back before declaring the bundle done. The proposal was to not send the cache token back as part of an append StateResponse [1], but to pre-provide it as part of the bundle request.

Agree, the purpose of the "I'm blocked" message is to occur during bundle processing.

> Thinking about this some more, if we assume the state response was successfully applied, there's no reason for the SDK to block the bundle until it has its hands on the cache token; we can update the cache once the StateResponse comes back, whether or not the bundle is still active. On the other hand, the runner needs a way to assert that it has received and processed all StateRequests from the SDK associated with a bundle before it can declare the bundle complete (regardless of the cache tokens), so this might not be safe without some extra coordination (e.g. the ProcessBundleResponse indicating the number of state requests associated with a bundle).

Since the state request stream is ordered, we can add the id of the last state request as part of the ProcessBundleResponse.

> [1] https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627

On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik <lc...@google.com> wrote:

The purpose of the new state API call in BEAM-7000 is to tell the runner that the SDK is now blocked waiting for the result of a specific state request. It should be used for fetches (not updates) and is there to allow SDKs to differentiate readLater (I will need this data at some point in time in the future) from read (I need this data now). This comes up commonly where the user prefetches multiple state cells and then looks at their content, allowing the runner to batch up those calls on its end.

The way it can be used for clear+append is that the runner can store requests in memory up until some time/memory limit, or until it gets its first "blocked" call, and then issue all the requests together.
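As an illustration of that batching (purely a sketch of the idea; the actual runner-side code is Java and none of these names exist in Beam), buffered state requests for the same key can be collapsed so that a clear followed by appends becomes a single overwrite:

def collapse_requests(pending):
  # pending: (op, state_key, data) tuples in arrival order, where op is
  # 'clear', 'append' or 'get' and data is the encoded payload for appends.
  collapsed = []
  for op, key, data in pending:
    last = collapsed[-1] if collapsed else None
    if (op == 'append' and last is not None
        and last[1] == key and last[0] in ('clear', 'set')):
      # A clear (or an already collapsed set) followed by an append becomes a set.
      collapsed[-1] = ('set', key, (last[2] or b'') + data)
    else:
      collapsed.append((op, key, data))
  return collapsed

# Example: a clear plus two appends for key k turn into a single set request:
#   collapse_requests([('clear', k, None), ('append', k, b'a'), ('append', k, b'b')])
#   == [('set', k, b'ab')]

A read ('get') for the same key arriving in between would stop the collapsing, which is exactly why the runner waits for the "blocked" signal before resolving the buffer.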
On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw <rober...@google.com> wrote:

On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise <t...@apache.org> wrote:

> That would add a synchronization point that forces extra latency, especially in streaming mode.
>
> Wouldn't it be possible for the runner to assign the token when starting the bundle and for the SDK to pass it along with the state requests? That way, there would be no need to batch and wait for a flush.

I think it makes sense to let the runner pre-assign these state update tokens rather than forcing a synchronization point.

Here are some pointers for the Python implementation:

Currently, when a DoFn needs UserState, a StateContext object is used that converts from a StateSpec to the actual value. When running portably, this is FnApiUserStateContext [1]. The state handles themselves are cached at [2], but this context only lives for the lifetime of a single bundle. Logic could be added here to use the token to share these across bundles.

Each of these handles in turn invokes state_handler.get* methods when its read is called (here state_handler is a thin wrapper around the service itself) and constructs the appropriate result from the StateResponse. We would need to implement caching at this level as well, including the deserialization. This will probably require some restructuring of how _StateBackedIterable is implemented (or, possibly, making that class itself cache aware). Hopefully that's enough to get started.

[1] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
[2] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
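A rough sketch of what caching at the state_handler level could look like, following the pointers above. The class name and the calls on the underlying handler (blocking_read, append, clear) are assumptions for illustration, not the actual bundle_processor API:

class CachingStateHandler(object):
  def __init__(self, underlying_handler):
    self._underlying = underlying_handler   # thin wrapper around the Fn API state service
    self._decoded_cache = {}                # (cache token, state key) -> list of decoded values

  def blocking_read(self, cache_token, state_key, coder):
    key = (cache_token, state_key)
    if key not in self._decoded_cache:
      raw_values = self._underlying.blocking_read(state_key)   # assumed call, returns encoded values
      self._decoded_cache[key] = [coder.decode(v) for v in raw_values]
    return self._decoded_cache[key]

  def append(self, cache_token, state_key, coder, values):
    key = (cache_token, state_key)
    if key in self._decoded_cache:
      # Keep the cached copy in sync; only safe because we hold the full value.
      self._decoded_cache[key].extend(values)
    self._underlying.append(state_key, [coder.encode(v) for v in values])  # assumed call

  def clear(self, cache_token, state_key):
    # After a clear the SDK has total knowledge of the cell, so cache an empty list.
    self._decoded_cache[(cache_token, state_key)] = []
    self._underlying.clear(state_key)                           # assumed call

Caching the decoded values rather than the raw StateResponse bytes also addresses the deserialization cost mentioned above.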
On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik <lc...@google.com> wrote:

I believe the intent is to add a new state API call telling the runner that it is blocked waiting for a response (BEAM-7000).

This should allow the runner to wait until it sees one of these "I'm blocked" requests and then merge + batch any state calls it may have at that point in time, allowing it to convert clear + appends into set calls and do any other optimizations as well. By default, the runner would have a time and space based limit on how many outstanding state calls there are before choosing to resolve them.

On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com> wrote:

Now I see what you mean.

On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:

Hi Luke,

I guess the answer is that it depends on the state backend. If a set operation in the state backend is available that is more efficient than clear+append, then it would be beneficial to have a dedicated Fn API operation to allow for such an optimization. That's something that needs to be determined with a profiler :)

But the low hanging fruit is cross-bundle caching.

Thomas

On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com> wrote:

Thomas, why do you think a single round trip is needed?

clear + append can be done blindly from the SDK side, and it has total knowledge of the state at that point in time until the end of the bundle, at which point you want to wait to get the cache token back from the runner for the append call so that for the next bundle you can reuse the state if the key wasn't processed elsewhere.

Also, all state calls are "streamed" over gRPC, so you don't need to wait for clear to complete before being able to send append.

On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Rakesh,

Glad to see you pointed this problem out!
+1 for adding this implementation. Managing state with a write-through cache is pretty important for streaming jobs!

Best,
Jincheng
On Mon, Jul 29, 2019 at 8:54 PM Thomas Weise <t...@apache.org> wrote:

FYI a basic test appears to confirm the importance of the cross-bundle caching: I found that the throughput can be increased by playing with the bundle size in the Flink runner. The default caps at 1000 elements (or 1 second). So on a high throughput stream the bundles would be capped by the count limit. Bumping the count limit increases the throughput by reducing the chatter over the state plane (more cache hits due to larger bundles).

The next level of investigation would involve profiling. But just by looking at metrics, the CPU utilization on the Python worker side dropped significantly while on the Flink side it remains nearly the same. There are no metrics for state operations on either side; I think it would be very helpful to get these in place as well.

Below is the stateful processing code for reference.

Thomas


class StatefulFn(beam.DoFn):
  count_state_spec = userstate.CombiningValueStateSpec(
      'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
  timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

  def process(self, kv, count=beam.DoFn.StateParam(count_state_spec),
              timer=beam.DoFn.TimerParam(timer_spec),
              window=beam.DoFn.WindowParam):
    count.add(1)
    timer_seconds = (window.end.micros // 1000000) - 1
    timer.set(timer_seconds)

  @userstate.on_timer(timer_spec)
  def process_timer(self, count=beam.DoFn.StateParam(count_state_spec),
                    window=beam.DoFn.WindowParam):
    if count.read() == 0:
      logging.warning("###timer fired with count %d, window %s" % (count.read(), window))
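For reference, the cap mentioned here corresponds to the Flink runner's maxBundleSize / maxBundleTimeMills pipeline options on the Java side (both default to 1000). A hedged example of raising it for a portable Python pipeline follows; the exact snake_case spellings accepted on the Python side are an assumption and should be verified against your Beam version:

from apache_beam.options.pipeline_options import PipelineOptions

# Assumed option spellings; the authoritative names live in the Java
# FlinkPipelineOptions (maxBundleSize, maxBundleTimeMills). The endpoint and
# environment values below are placeholders, not recommendations.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=DOCKER',
    '--max_bundle_size=10000',
    '--max_bundle_time_millis=1000',
])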
On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> wrote:

On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com> wrote:

> Thanks Robert,
>
> I stumbled on the JIRA that you created some time ago:
> https://jira.apache.org/jira/browse/BEAM-5428
>
> You also marked the code where changes are required:
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>
> I am willing to provide help to implement this. Let me know how I can help.

As far as I'm aware, no one is actively working on it right now. Please feel free to assign yourself the JIRA entry and I'll be happy to answer any questions you might have if (well, probably when) these pointers are insufficient.

On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <rober...@google.com> wrote:

This is documented at
https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m

Note that it requires participation of both the runner and the SDK (though there are no correctness issues if one or the other side does not understand the protocol, caching just won't be used).

I don't think it's been implemented anywhere, but it could be very beneficial for performance.
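The "caching just won't be used" fallback can be illustrated with a small sketch (names here are invented for illustration): if the runner never hands the SDK a cache token, every read simply falls through to the state service and correctness is unaffected:

def read_state(state_key, coder, cache, cache_token, fetch_raw_fn):
  # fetch_raw_fn performs the Fn API state read and returns encoded values.
  if cache_token is None:
    # Runner does not implement the caching protocol: stay correct, never cache.
    return [coder.decode(v) for v in fetch_raw_fn()]
  key = (cache_token, state_key)
  if key not in cache:
    cache[key] = [coder.decode(v) for v in fetch_raw_fn()]
  return cache[key]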
On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:

I checked the Python SDK [1] and it has a similar implementation to the Java SDK.

I would agree with Thomas. In the case of a high volume event stream and a bigger cluster size, network calls can potentially become a bottleneck.

@Robert
I am interested to see the proposal. Can you provide me the link to the proposal?

[1] https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295

On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote:

Thanks for the pointer. For streaming, it will be important to support caching across bundles. It appears that even the Java SDK doesn't support that yet?

https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221

Regarding clear/append: It would be nice if both could occur within a single Fn API roundtrip when the state is persisted.

Thanks,
Thomas
On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> wrote:

User state is built on top of read, append and clear, and not on a read and write paradigm, to allow for blind appends.

The optimization you speak of can be done completely inside the SDK without any additional protocol being required, as long as you clear the state first and then append all your new data. The Beam Java SDK does this for all runners when executed portably [1]. You could port the same logic to the Beam Python SDK as well.

1: https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
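A sketch of how that clear-then-append pattern could be ported to the Python SDK, mirroring the Java BagUserState linked above. The class name and the calls on state_handler are illustrative assumptions, not the actual harness code:

class CachedBagState(object):
  def __init__(self, state_key, state_handler, coder):
    self._state_key = state_key
    self._state_handler = state_handler   # thin wrapper around the Fn API state service
    self._coder = coder
    self._local = None                    # None means the bag has not been read yet
    self._cleared = False
    self._newly_added = []

  def read(self):
    if self._local is None:
      if self._cleared:
        self._local = list(self._newly_added)
      else:
        # assumed call returning decoded values from the runner
        self._local = list(self._state_handler.read_values(self._state_key, self._coder))
        self._local.extend(self._newly_added)
    return self._local

  def add(self, value):
    self._newly_added.append(value)
    if self._local is not None:
      self._local.append(value)

  def clear(self):
    self._cleared = True
    self._newly_added = []
    self._local = []

  def commit(self):
    # Called at the end of the bundle. Both calls can be issued blindly because
    # the state request stream is ordered.
    if self._cleared:
      self._state_handler.clear(self._state_key)          # assumed call
    if self._newly_added:
      self._state_handler.append_values(                  # assumed call
          self._state_key, self._coder, self._newly_added)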
On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <rober...@google.com> wrote:

Python workers also have a per-bundle SDK-side cache. A protocol has been proposed, but hasn't yet been implemented in any SDKs or runners.

On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> wrote:

It's runner dependent. Some runners (e.g. the Dataflow runner) do have such a cache, though I think it currently has a cap for large bags.

Reuven

On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:

Hi,

I have been using the Python SDK for the application and also using BagState in production. I was wondering whether the state logic has any write-through cache implemented or not. If we are sending every read and write request over the network, it comes with a performance cost. We could avoid the network call for a read operation if we had a write-through cache.

I have superficially looked into the implementation and I didn't see any cache implementation.

Is it possible to have this cache? Would it cause any issue if we have the caching layer?