Yes, that makes sense. What do you think about creating a document to
summarize the ideas presented here? Also, it would be good to capture
the status quo regarding caching in the Python SDK.

-Max

On 13.08.19 22:44, Thomas Weise wrote:
> The token would be needed in general to invalidate the cache when
> bundles are processed by different workers.
> 
> In the case of the Flink runner we don't have a scenario of an SDK worker
> surviving the runner in the case of a failure, so there is no
> possibility of inconsistent state as a result of a checkpoint failure.
> 
> --
> sent from mobile
> 
> On Tue, Aug 13, 2019, 1:18 PM Maximilian Michels <m...@apache.org> wrote:
> 
>     Thanks for clarifying. Cache-invalidation for side inputs makes sense.
> 
>     In case the Runner fails to checkpoint, could it not re-attempt the
>     checkpoint? At least in the case of Flink, the cache would still be
>     valid until another checkpoint is attempted. For other Runners that may
>     not be the case. Also, rolling back state while keeping the SDK Harness
>     running requires invalidating the cache.
> 
>     -Max
> 
>     On 13.08.19 18:09, Lukasz Cwik wrote:
>     >
>     >
>     > On Tue, Aug 13, 2019 at 4:36 AM Maximilian Michels <m...@apache.org> wrote:
>     >
>     >     Agree that we have to be able to flush before a checkpoint to avoid
>     >     caching too many elements. Also good point about checkpoint costs
>     >     increasing with flushing the cache on checkpoints. An LRU cache
>     >     policy in the SDK seems desirable.
>     >
>     >     What is the role of the cache token in the design document[1]? It
>     >     looks to me that the token is used to give the Runner control over
>     >     which and how many elements can be cached by the SDK. Why is that
>     >     necessary? Shouldn't this be up to the SDK?
>     >
>     >  
>     > We want to be able to handle the case where the SDK completes the bundle
>     > successfully but the runner fails to checkpoint the information.
>     > We also want the runner to be able to pass in cache tokens for things
>     > like side inputs which may change over time (and the SDK would not know
>     > that this happened).
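To make the role of the token concrete, here is a minimal sketch of an SDK-side LRU cache whose entries are only valid under the cache token they were stored with. The class and method names are hypothetical and not part of the Beam SDK; the only assumption is that the runner hands the SDK an opaque token with each bundle and issues a new one whenever cached state may be stale (failed checkpoint, changed side input, key processed elsewhere).

```python
from collections import OrderedDict


class TokenScopedCache:
    """Hypothetical LRU cache keyed by (cache token, state key)."""

    def __init__(self, max_entries):
        self._max_entries = max_entries
        self._entries = OrderedDict()  # (token, state_key) -> value

    def get(self, token, state_key):
        key = (token, state_key)
        if key not in self._entries:
            return None  # miss: the caller has to read from the runner
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, token, state_key, value):
        key = (token, state_key)
        self._entries[key] = value
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict least recently used


cache = TokenScopedCache(max_entries=2)
cache.put("token-1", "count", 41)
hit = cache.get("token-1", "count")   # same token: served from the cache
miss = cache.get("token-2", "count")  # new token: old entry is ignored
```

Entries stored under an old token are never returned and simply age out through the LRU policy, so the runner can invalidate the cache by issuing a new token without any extra message to the SDK.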
>     >  
>     >
>     >     -Max
>     >
>     >     [1] https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>     >
>     >     Is it simply to
>     >     On 12.08.19 19:55, Lukasz Cwik wrote:
>     >     >
>     >     >
>     >     > On Mon, Aug 12, 2019 at 10:09 AM Thomas Weise <t...@apache.org> wrote:
>     >     >
>     >     >
>     >     >     On Mon, Aug 12, 2019 at 8:53 AM Maximilian Michels <m...@apache.org> wrote:
>     >     >
>     >     >         Thanks for starting this discussion Rakesh. An efficient cache
>     >     >         layer is one of the missing pieces for good performance in
>     >     >         stateful pipelines. The good news is that there is a level of
>     >     >         caching already present in Python which batches append requests
>     >     >         until the bundle is finished.
>     >     >
>     >     >         Thomas, in your example indeed we would have to profile to see
>     >     >         why CPU utilization is high on the Flink side but not in the
>     >     >         Python SDK harness. For example, older versions of Flink (<=1.5)
>     >     >         have a high cost of deleting existing instances of a timer when
>     >     >         setting a timer. Nevertheless, cross-bundle caching would likely
>     >     >         result in increased performance.
>     >     >
>     >     >
>     >     >     CPU on the Flink side was unchanged, and that's important. The
>     >     >     throughput improvement comes from the extended bundle caching on
>     >     >     the SDK side. That's what tells me that cross-bundle caching is
>     >     >     needed. Of course, it will require a good solution for the write
>     >     >     also and I like your idea of using the checkpoint boundary for
>     >     >     that, especially since that already aligns with the bundle
>     >     >     boundary and is under runner control. Of course we also want to
>     >     >     be careful to not cause overly bursty writes.
>     >     >
>     >     >     Profiling will be useful for the timer processing, that is also
>     >     >     on my list of suspects.
>     >     >
>     >     >
>     >     >         Luke, I think the idea to merge pending state requests could be
>     >     >         complementary to caching across bundles.
>     >     >
>     >     >         Question: Couldn't we defer flushing back state from the SDK to
>     >     >         the Runner indefinitely, provided that we add a way to flush the
>     >     >         state in case of a checkpoint?
>     >     >
>     >     >
>     >     > Flushing is needed to prevent the SDK from running out of memory.
>     >     > Having a fixed budget for state inside the SDK would have flushing
>     >     > happen under certain state usage scenarios.
>     >     > I could also see that only flushing at checkpoint may lead to slow
>     >     > checkpoint performance so we may want to flush state that hasn't been
>     >     > used in a while as well.
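The flushing behaviour described here could look roughly like the following sketch: pending appends are buffered per state key, the least recently used key is written back once a fixed budget is exceeded, and everything is written back at a checkpoint. All names are hypothetical, `flush_fn` stands in for whatever sends appends to the runner, and the budget is counted in keys rather than bytes purely to keep the example short.

```python
from collections import OrderedDict


class WriteBackStateCache:
    """Hypothetical write-back buffer with a fixed budget of pending keys."""

    def __init__(self, max_pending, flush_fn):
        self._max_pending = max_pending
        self._flush_fn = flush_fn  # assumed callback: (state_key, values) -> None
        self._pending = OrderedDict()  # state_key -> list of appended values

    def append(self, state_key, value):
        self._pending.setdefault(state_key, []).append(value)
        self._pending.move_to_end(state_key)  # mark as most recently used
        while len(self._pending) > self._max_pending:
            # Over budget: write back the key that was used least recently.
            old_key, old_values = self._pending.popitem(last=False)
            self._flush_fn(old_key, old_values)

    def flush_all(self):
        # Called before a checkpoint so the runner sees every pending write.
        while self._pending:
            key, values = self._pending.popitem(last=False)
            self._flush_fn(key, values)


flushed = []
cache = WriteBackStateCache(max_pending=2,
                            flush_fn=lambda k, v: flushed.append((k, v)))
cache.append("a", 1)
cache.append("b", 2)
cache.append("a", 3)  # "a" becomes the most recently used key
cache.append("c", 4)  # over budget: "b" is written back
cache.flush_all()     # checkpoint: "a" and "c" are written back
```

Writing back cold keys early keeps the amount of work left for `flush_all` bounded, which is the point about slow checkpoints made above.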
>     >     >  
>     >     >
>     >     >         Another performance improvement would be caching read requests
>     >     >         because these first go to the Runner regardless of already
>     >     >         cached appends.
>     >     >
>     >     >         -Max
>     >     >
>     >     >         On 09.08.19 17:12, Lukasz Cwik wrote:
>     >     >         >
>     >     >         >
>     >     >         > On Fri, Aug 9, 2019 at 2:32 AM Robert Bradshaw <rober...@google.com> wrote:
>     >     >         >
>     >     >         >     The question is whether the SDK needs to wait for the
>     >     >         >     StateResponse to come back before declaring the bundle done.
>     >     >         >     The proposal was to not send the cache token back as part of
>     >     >         >     an append StateResponse [1], but pre-provide it as part of
>     >     >         >     the bundle request.
>     >     >         >
>     >     >         >
>     >     >         > Agree, the purpose of the I'm Blocked message is to occur during
>     >     >         > bundle processing.
>     >     >         >  
>     >     >         >
>     >     >         >     Thinking about this some more, if we assume the state response
>     >     >         >     was successfully applied, there's no reason for the SDK to
>     >     >         >     block the bundle until it has its hands on the cache token--we
>     >     >         >     can update the cache once the StateResponse comes back whether
>     >     >         >     or not the bundle is still active. On the other hand, the
>     >     >         >     runner needs a way to assert it has received and processed all
>     >     >         >     StateRequests from the SDK associated with a bundle before it
>     >     >         >     can declare the bundle complete (regardless of the cache
>     >     >         >     tokens), so this might not be safe without some extra
>     >     >         >     coordination (e.g. the ProcessBundleResponse indicating the
>     >     >         >     number of state requests associated with a bundle).
>     >     >         >
>     >     >         >  
>     >     >         > Since the state request stream is ordered, we can add the id of the
>     >     >         > last state request as part of the ProcessBundleResponse.
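As a rough illustration of that coordination, the runner could track the id of the last state request it has processed and compare it with the id reported in the ProcessBundleResponse. The sketch below uses hypothetical names and assumes, for simplicity, that request ids are increasing integers starting at 1; it is not how the Fn API represents ids.

```python
class BundleStateTracker:
    """Hypothetical runner-side bookkeeping for a single bundle."""

    def __init__(self):
        self._last_processed_id = 0     # no state request processed yet
        self._expected_last_id = None   # unknown until the bundle response arrives

    def on_state_request_processed(self, request_id):
        # The state stream is ordered, so ids are seen in increasing order.
        self._last_processed_id = request_id

    def on_process_bundle_response(self, last_state_request_id):
        # Assumed field: id of the last state request the SDK sent for the bundle.
        self._expected_last_id = last_state_request_id

    def bundle_complete(self):
        return (self._expected_last_id is not None
                and self._last_processed_id >= self._expected_last_id)


tracker = BundleStateTracker()
tracker.on_state_request_processed(1)
tracker.on_process_bundle_response(last_state_request_id=2)
complete_before = tracker.bundle_complete()  # request 2 is still in flight
tracker.on_state_request_processed(2)
complete_after = tracker.bundle_complete()
```

With this check the SDK does not have to wait for every StateResponse before finishing the bundle; the runner alone decides when all writes have landed.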
>     >     >         >  
>     >     >         >
>     >     >         >     [1] https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L627
>     >     >         >
>     >     >         >     On Thu, Aug 8, 2019 at 6:57 PM Lukasz Cwik <lc...@google.com> wrote:
>     >     >         >     >
>     >     >         >     > The purpose of the new state API call in BEAM-7000 is to tell
>     >     >         >     > the runner that the SDK is now blocked waiting for the result
>     >     >         >     > of a specific state request. It should be used for fetches (not
>     >     >         >     > updates) and is there to allow SDKs to differentiate readLater
>     >     >         >     > (I will need this data at some point in time in the future)
>     >     >         >     > from read (I need this data now). This comes up commonly where
>     >     >         >     > the user prefetches multiple state cells and then looks at
>     >     >         >     > their content, allowing the runner to batch up those calls on
>     >     >         >     > its end.
>     >     >         >     >
>     >     >         >     > The way it can be used for clear+append is that the runner can
>     >     >         >     > store requests in memory up until some time/memory limit or
>     >     >         >     > until it gets its first "blocked" call and then issue all the
>     >     >         >     > requests together.
>     >     >         >     >
>     >     >         >     >
>     >     >         >     > On Thu, Aug 8, 2019 at 9:42 AM Robert Bradshaw <rober...@google.com> wrote:
>     >     >         >     >>
>     >     >         >     >> On Tue, Aug 6, 2019 at 12:07 AM Thomas Weise <t...@apache.org> wrote:
>     >     >         >     >> >
>     >     >         >     >> > That would add a synchronization point that forces extra
>     >     >         >     >> > latency especially in streaming mode.
>     >     >         >     >> >
>     >     >         >     >> > Wouldn't it be possible for the runner to assign the token
>     >     >         >     >> > when starting the bundle and for the SDK to pass it along the
>     >     >         >     >> > state requests? That way, there would be no need to batch and
>     >     >         >     >> > wait for a flush.
>     >     >         >     >>
>     >     >         >     >> I think it makes sense to let the runner pre-assign these state
>     >     >         >     >> update tokens rather than forcing a synchronization point.
>     >     >         >     >>
>     >     >         >     >> Here are some pointers for the Python implementation:
>     >     >         >     >>
>     >     >         >     >> Currently, when a DoFn needs UserState, a StateContext object is
>     >     >         >     >> used that converts from a StateSpec to the actual value. When
>     >     >         >     >> running portably, this is FnApiUserStateContext [1]. The state
>     >     >         >     >> handles themselves are cached at [2] but this context only lives
>     >     >         >     >> for the lifetime of a single bundle. Logic could be added here to
>     >     >         >     >> use the token to share these across bundles.
>     >     >         >     >>
>     >     >         >     >> Each of these handles in turn invokes state_handler.get* methods
>     >     >         >     >> when its read is called (here state_handler is a thin wrapper
>     >     >         >     >> around the service itself) and constructs the appropriate result
>     >     >         >     >> from the StateResponse. We would need to implement caching at
>     >     >         >     >> this level as well, including the deserialization. This will
>     >     >         >     >> probably require some restructuring of how _StateBackedIterable
>     >     >         >     >> is implemented (or, possibly, making that class itself cache
>     >     >         >     >> aware). Hopefully that's enough to get started.
>     >     >         >     >>
>     >     >         >     >> [1] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L402
>     >     >         >     >> [2] https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L436
>     >     >         >     >>
>     >     >         >     >> > On Mon, Aug 5, 2019 at 2:49 PM Lukasz Cwik <lc...@google.com> wrote:
>     >     >         >     >> >>
>     >     >         >     >> >> I believe the intent is to add a new state API call telling the
>     >     >         >     >> >> runner that it is blocked waiting for a response (BEAM-7000).
>     >     >         >     >> >>
>     >     >         >     >> >> This should allow the runner to wait till it sees one of these
>     >     >         >     >> >> "I'm blocked" requests and then merge + batch any state calls
>     >     >         >     >> >> it may have at that point in time, allowing it to convert
>     >     >         >     >> >> clear + appends into set calls and do any other optimizations
>     >     >         >     >> >> as well. By default, the runner would have a time and space
>     >     >         >     >> >> based limit on how many outstanding state calls there are
>     >     >         >     >> >> before choosing to resolve them.
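The merge step can be sketched as a small function that collapses the pending requests for each key. The tuple layout and the "set" operation are illustrative assumptions only (the thread treats a dedicated set operation as a possible optimization, not an existing Fn API call), and the function name is hypothetical.

```python
def coalesce(requests):
    """Merge pending (op, key, values) tuples per key, keeping first-seen key order.

    Illustrative only: "set" stands for a hypothetical backend operation that
    replaces the whole state cell in one call.
    """
    merged = {}  # key -> [op, values]
    for op, key, values in requests:
        if op == "clear":
            # A clear discards anything buffered for this key so far.
            merged[key] = ["clear", []]
        elif op == "append":
            if key not in merged:
                merged[key] = ["append", list(values)]
            else:
                prev_op, prev_values = merged[key]
                # Appends after a clear replace the whole cell, i.e. a set.
                new_op = "append" if prev_op == "append" else "set"
                merged[key] = [new_op, prev_values + list(values)]
        else:
            raise ValueError("unknown op: %r" % (op,))
    return [(op, key, values) for key, (op, values) in merged.items()]


pending = [
    ("clear", "k", []),
    ("append", "k", [1]),
    ("append", "k", [2]),
    ("append", "j", [7]),
]
result = coalesce(pending)
```

Here the three requests for key "k" collapse into a single set of [1, 2], while the lone append for "j" passes through unchanged.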
>     >     >         >     >> >>
>     >     >         >     >> >> On Mon, Aug 5, 2019 at 5:43 PM Lukasz Cwik <lc...@google.com> wrote:
>     >     >         >     >> >>>
>     >     >         >     >> >>> Now I see what you mean.
>     >     >         >     >> >>>
>     >     >         >     >> >>> On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:
>     >     >         >     >> >>>>
>     >     >         >     >> >>>> Hi Luke,
>     >     >         >     >> >>>>
>     >     >         >     >> >>>> I guess the answer is that it depends on the state backend.
>     >     >         >     >> >>>> If a set operation in the state backend is available that is
>     >     >         >     >> >>>> more efficient than clear+append, then it would be beneficial
>     >     >         >     >> >>>> to have a dedicated fn api operation to allow for such
>     >     >         >     >> >>>> optimization. That's something that needs to be determined
>     >     >         >     >> >>>> with a profiler :)
>     >     >         >     >> >>>>
>     >     >         >     >> >>>> But the low hanging fruit is cross-bundle caching.
>     >     >         >     >> >>>>
>     >     >         >     >> >>>> Thomas
>     >     >         >     >> >>>>
>     >     >         >     >> >>>> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com> wrote:
>     >     >         >     >> >>>>>
>     >     >         >     >> >>>>> Thomas, why do you think a single round trip is needed?
>     >     >         >     >> >>>>>
>     >     >         >     >> >>>>> clear + append can be done blindly from the SDK side and it
>     >     >         >     >> >>>>> has total knowledge of the state at that point in time till
>     >     >         >     >> >>>>> the end of the bundle, at which point you want to wait to get
>     >     >         >     >> >>>>> the cache token back from the runner for the append call so
>     >     >         >     >> >>>>> that for the next bundle you can reuse the state if the key
>     >     >         >     >> >>>>> wasn't processed elsewhere.
>     >     >         >     >> >>>>>
>     >     >         >     >> >>>>> Also, all state calls are "streamed" over gRPC so you don't
>     >     >         >     >> >>>>> need to wait for clear to complete before being able to send
>     >     >         >     >> >>>>> append.
>     >     >         >     >> >>>>>
>     >     >         >     >> >>>>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>     >     >         >     >> >>>>>>
>     >     >         >     >> >>>>>> Hi Rakesh,
>     >     >         >     >> >>>>>>
>     >     >         >     >> >>>>>> Glad to see you point this problem out!
>     >     >         >     >> >>>>>> +1 for adding this implementation. Managing state with a
>     >     >         >     >> >>>>>> write-through cache is pretty important for streaming jobs!
>     >     >         >     >> >>>>>>
>     >     >         >     >> >>>>>> Best, Jincheng
>     >     >         >     >> >>>>>>
>     >     >         >     >> >>>>>> On Mon, Jul 29, 2019 at 8:54 PM Thomas Weise <t...@apache.org> wrote:
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>> FYI a basic test appears to confirm the importance of the
>     >     >         >     >> >>>>>>> cross-bundle caching: I found that the throughput can be
>     >     >         >     >> >>>>>>> increased by playing with the bundle size in the Flink
>     >     >         >     >> >>>>>>> runner. Default caps at 1000 elements (or 1 second). So on a
>     >     >         >     >> >>>>>>> high throughput stream the bundles would be capped by the
>     >     >         >     >> >>>>>>> count limit. Bumping the count limit increases the
>     >     >         >     >> >>>>>>> throughput by reducing the chatter over the state plane
>     >     >         >     >> >>>>>>> (more cache hits due to larger bundle).
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>> The next level of investigation would involve profiling. But
>     >     >         >     >> >>>>>>> just by looking at metrics, the CPU utilization on the
>     >     >         >     >> >>>>>>> Python worker side dropped significantly while on the Flink
>     >     >         >     >> >>>>>>> side it remains nearly the same. There are no metrics for
>     >     >         >     >> >>>>>>> state operations on either side. I think it would be very
>     >     >         >     >> >>>>>>> helpful to get these in place also.
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>> Below is the stateful processing code for reference.
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>> Thomas
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>> import logging
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>> import apache_beam as beam
>     >     >         >     >> >>>>>>> from apache_beam.transforms import userstate
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>> class StatefulFn(beam.DoFn):
>     >     >         >     >> >>>>>>>     # Per-key running count, combined with sum.
>     >     >         >     >> >>>>>>>     count_state_spec = userstate.CombiningValueStateSpec(
>     >     >         >     >> >>>>>>>         'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>     >     >         >     >> >>>>>>>     timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>>     def process(self, kv,
>     >     >         >     >> >>>>>>>                 count=beam.DoFn.StateParam(count_state_spec),
>     >     >         >     >> >>>>>>>                 timer=beam.DoFn.TimerParam(timer_spec),
>     >     >         >     >> >>>>>>>                 window=beam.DoFn.WindowParam):
>     >     >         >     >> >>>>>>>         count.add(1)
>     >     >         >     >> >>>>>>>         # Set the timer one second before the end of the window.
>     >     >         >     >> >>>>>>>         timer_seconds = (window.end.micros // 1000000) - 1
>     >     >         >     >> >>>>>>>         timer.set(timer_seconds)
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>>     @userstate.on_timer(timer_spec)
>     >     >         >     >> >>>>>>>     def process_timer(self,
>     >     >         >     >> >>>>>>>                       count=beam.DoFn.StateParam(count_state_spec),
>     >     >         >     >> >>>>>>>                       window=beam.DoFn.WindowParam):
>     >     >         >     >> >>>>>>>         if count.read() == 0:
>     >     >         >     >> >>>>>>>             logging.warning("###timer fired with count %d, window %s" % (count.read(), window))
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>>
>     >     >         >     >> >>>>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com> wrote:
>     >     >         >     >> >>>>>>>>
>     >     >         >     >> >>>>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>     >     >         >     >> >>>>>>>> >
>     >     >         >     >> >>>>>>>> > Thanks Robert,
>     >     >         >     >> >>>>>>>> >
>     >     >         >     >> >>>>>>>> > I stumbled on the JIRA that you created some time ago:
>     >     >         >     >> >>>>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
>     >     >         >     >> >>>>>>>> >
>     >     >         >     >> >>>>>>>> > You also marked where code changes are required:
>     >     >         >     >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>     >     >         >     >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>     >     >         >     >> >>>>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>     >     >         >     >> >>>>>>>> >
>     >     >         >     >> >>>>>>>> > I am willing to provide help to implement this. Let me know
>     >     >         >     >> >>>>>>>> > how I can help.
>     >     >         >     >> >>>>>>>>
>     >     >         >     >> >>>>>>>> As far as I'm aware, no one is actively working on it right
>     >     >         >     >> >>>>>>>> now. Please feel free to assign yourself the JIRA entry and
>     >     >         >     >> >>>>>>>> I'll be happy to answer any questions you might have if
>     >     >         >     >> >>>>>>>> (well probably when) these pointers are insufficient.
>     >     >         >     >> >>>>>>>>
>     >     >         >     >> >>>>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <rober...@google.com> wrote:
>     >     >         >     >> >>>>>>>> >>
>     >     >         >     >> >>>>>>>> >> This is documented at
>     >     >         >     >> >>>>>>>> >> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>     >     >         >     >> >>>>>>>> >> . Note that it requires participation of both the runner and the SDK
>     >     >         >     >> >>>>>>>> >> (though there are no correctness issues if one or the other side does
>     >     >         >     >> >>>>>>>> >> not understand the protocol, caching just won't be used).
>     >     >         >     >> >>>>>>>> >>
>     >     >         >     >> >>>>>>>> >> I don't think it's been implemented anywhere, but it could be very
>     >     >         >     >> >>>>>>>> >> beneficial for performance.
>     >     >         >     >> >>>>>>>> >>
>     >     >         >     >> >>>>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>     >     >         >     >> >>>>>>>> >> >
>     >     >         >     >> >>>>>>>> >> > I checked the Python SDK [1] and it has a similar
>     >     >         >     >> >>>>>>>> >> > implementation to the Java SDK.
>     >     >         >     >> >>>>>>>> >> >
>     >     >         >     >> >>>>>>>> >> > I would agree with Thomas. With a high-volume event stream
>     >     >         >     >> >>>>>>>> >> > and a bigger cluster, network calls can potentially become
>     >     >         >     >> >>>>>>>> >> > a bottleneck.
>     >     >         >     >> >>>>>>>> >> >
>     >     >         >     >> >>>>>>>> >> > @Robert
>     >     >         >     >> >>>>>>>> >> > I am interested in seeing the proposal. Can you send me
>     >     >         >     >> >>>>>>>> >> > a link to it?
>     >     >         >     >> >>>>>>>> >> >
>     >     >         >     >> >>>>>>>> >> > [1]: https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>     >     >         >     >> >>>>>>>> >> >
>     >     >         >     >> >>>>>>>> >> >
>     >     >         >     >> >>>>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote:
>     >     >         >     >> >>>>>>>> >> >>
>     >     >         >     >> >>>>>>>> >> >> Thanks for the pointer. For streaming, it will be important
>     >     >         >     >> >>>>>>>> >> >> to support caching across bundles. It appears that even the
>     >     >         >     >> >>>>>>>> >> >> Java SDK doesn't support that yet?
>     >     >         >     >> >>>>>>>> >> >>
>     >     >         >     >> >>>>>>>> >> >>
>     >     >         >     >> >>>>>>>> >> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>     >     >         >     >> >>>>>>>> >> >>
>     >     >         >     >> >>>>>>>> >> >> Regarding clear/append: It would be nice if both could occur
>     >     >         >     >> >>>>>>>> >> >> within a single Fn API roundtrip when the state is persisted.
>     >     >         >     >> >>>>>>>> >> >>
>     >     >         >     >> >>>>>>>> >> >> Thanks,
>     >     >         >     >> >>>>>>>> >> >> Thomas
>     >     >         >     >> >>>>>>>> >> >>
>     >     >         >     >> >>>>>>>> >> >>
>     >     >         >     >> >>>>>>>> >> >>
>     >     >         >     >> >>>>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> wrote:
>     >     >         >     >> >>>>>>>> >> >>>
>     >     >         >     >> >>>>>>>> >> >>> User state is built on top of read, append and clear,
>     >     >         >     >> >>>>>>>> >> >>> rather than a read and write paradigm, to allow for
>     >     >         >     >> >>>>>>>> >> >>> blind appends.
>     >     >         >     >> >>>>>>>> >> >>>
>     >     >         >     >> >>>>>>>> >> >>> The optimization you speak of can be done completely inside
>     >     >         >     >> >>>>>>>> >> >>> the SDK without any additional protocol being required as
>     >     >         >     >> >>>>>>>> >> >>> long as you clear the state first and then append all your
>     >     >         >     >> >>>>>>>> >> >>> new data. The Beam Java SDK does this for all runners when
>     >     >         >     >> >>>>>>>> >> >>> executed portably[1]. You could port the same logic to the
>     >     >         >     >> >>>>>>>> >> >>> Beam Python SDK as well.
>     >     >         >     >> >>>>>>>> >> >>>
>     >     >         >     >> >>>>>>>> >> >>> 1: https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>     >     >         >     >> >>>>>>>> >> >>>
>     >     >         >     >> >>>>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <rober...@google.com> wrote:
>     >     >         >     >> >>>>>>>> >> >>>>
>     >     >         >     >> >>>>>>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A protocol has
>     >     >         >     >> >>>>>>>> >> >>>> been proposed, but hasn't yet been implemented in any SDKs or runners.
>     >     >         >     >> >>>>>>>> >> >>>>
>     >     >         >     >> >>>>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> wrote:
>     >     >         >     >> >>>>>>>> >> >>>> >
>     >     >         >     >> >>>>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow
>     >     >         >     >> >>>>>>>> >> >>>> > runner) do have such a cache, though I think it currently
>     >     >         >     >> >>>>>>>> >> >>>> > has a cap for large bags.
>     >     >         >     >> >>>>>>>> >> >>>> >
>     >     >         >     >> >>>>>>>> >> >>>> > Reuven
>     >     >         >     >> >>>>>>>> >> >>>> >
>     >     >         >     >> >>>>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>     >     >         >     >> >>>>>>>> >> >>>> >>
>     >     >         >     >> >>>>>>>> >> >>>> >> Hi,
>     >     >         >     >> >>>>>>>> >> >>>> >>
>     >     >         >     >> >>>>>>>> >> >>>> >> I have been using the Python SDK for an application, with
>     >     >         >     >> >>>>>>>> >> >>>> >> BagState in production. I was wondering whether the state
>     >     >         >     >> >>>>>>>> >> >>>> >> logic has any write-through cache implemented. If we send
>     >     >         >     >> >>>>>>>> >> >>>> >> every read and write request over the network, it comes
>     >     >         >     >> >>>>>>>> >> >>>> >> with a performance cost. We can avoid the network call for
>     >     >         >     >> >>>>>>>> >> >>>> >> a read operation if we have a write-through cache.
>     >     >         >     >> >>>>>>>> >> >>>> >> I have superficially looked into the implementation and I
>     >     >         >     >> >>>>>>>> >> >>>> >> didn't see any cache implementation.
>     >     >         >     >> >>>>>>>> >> >>>> >>
>     >     >         >     >> >>>>>>>> >> >>>> >> Is it possible to have this cache? Would it cause any
>     >     >         >     >> >>>>>>>> >> >>>> >> issues if we had a caching layer?
>     >     >         >     >> >>>>>>>> >> >>>> >>
>     >     >         >
>     >     >
>     >
> 
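
For reference, the per-bundle approach discussed in the quoted thread
(buffer blind appends locally, read from the runner at most once, and on
commit send a clear followed by a single append) might look roughly like
the sketch below. This is only an illustration and not the Beam SDK code:
FakeStateBackend and CachedBagState are hypothetical names, and the
backend merely stands in for the runner's state service so that round
trips can be counted.

```python
class FakeStateBackend:
    """Hypothetical stand-in for the runner's state service (not a Beam API)."""

    def __init__(self):
        self.store = {}
        self.calls = 0  # number of simulated network round trips

    def read(self, key):
        self.calls += 1
        return list(self.store.get(key, []))

    def append(self, key, items):
        self.calls += 1
        self.store.setdefault(key, []).extend(items)

    def clear(self, key):
        self.calls += 1
        self.store.pop(key, None)


class CachedBagState:
    """Hypothetical per-bundle bag state that defers writes until commit()."""

    def __init__(self, backend, key):
        self._backend = backend
        self._key = key
        self._persisted = None  # fetched lazily, at most once per bundle
        self._added = []        # blind appends buffered locally
        self._cleared = False

    def read(self):
        if self._persisted is None:
            self._persisted = self._backend.read(self._key)
        return self._persisted + self._added

    def add(self, item):
        # Blind append: no read from the runner is needed.
        self._added.append(item)

    def clear(self):
        # After a clear the persisted contents are known to be empty,
        # so later reads never need to go to the runner.
        self._cleared = True
        self._persisted = []
        self._added = []

    def commit(self):
        # Clear first, then append all new data, as described in the thread.
        if self._cleared:
            self._backend.clear(self._key)
        if self._added:
            self._backend.append(self._key, self._added)


backend = FakeStateBackend()
backend.store["k"] = [1, 2]
state = CachedBagState(backend, "k")
state.add(3)                 # buffered, no round trip
first = state.read()         # one round trip
second = state.read()        # served from the local copy
state.clear()
state.add(9)
state.commit()               # one clear plus one append
```

Such an object would be discarded at the end of the bundle; keeping it
alive across bundles is what would need the cache-token protocol from the
design document.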
