Now I see what you mean.

On Mon, Aug 5, 2019 at 5:42 PM Thomas Weise <t...@apache.org> wrote:

> Hi Luke,
>
> I guess the answer is that it depends on the state backend. If the state
> backend offers a set operation that is more efficient than clear+append,
> then it would be beneficial to have a dedicated Fn API operation to allow
> for such an optimization. That's something that needs to be determined with
> a profiler :)
>
> But the low-hanging fruit is cross-bundle caching.
>
> Thomas
>
> On Mon, Aug 5, 2019 at 2:06 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Thomas, why do you think a single round trip is needed?
>>
>> clear + append can be done blindly from the SDK side, which then has total
>> knowledge of the state from that point until the end of the bundle. At the
>> end of the bundle you wait for the runner to return the cache token for the
>> append call, so that the next bundle can reuse the state if the key wasn't
>> processed elsewhere.
>>
>> Also, all state calls are "streamed" over gRPC so you don't need to wait
>> for clear to complete before being able to send append.
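
To make the clear + append point concrete, here is a minimal sketch in plain
Python. It is not Beam code: FakeRunnerState and BundleBagState are
hypothetical names, the runner is simulated with an in-memory dict, and the
cache token handshake is left out. It only illustrates that once the SDK has
cleared a bag it knows the full contents, so a "set" needs no read and the
clear and append can be sent back to back at the end of the bundle.

```python
class FakeRunnerState:
    """In-memory stand-in for the runner's state backend (hypothetical)."""

    def __init__(self):
        self.bags = {}
        self.requests = []  # log of state requests, in arrival order

    def read(self, key):
        self.requests.append(("read", key))
        return list(self.bags.get(key, []))

    def clear(self, key):
        self.requests.append(("clear", key))
        self.bags[key] = []

    def append(self, key, values):
        self.requests.append(("append", key))
        self.bags.setdefault(key, []).extend(values)


class BundleBagState:
    """Per-bundle view of one bag; buffers writes until commit()."""

    def __init__(self, runner, key):
        self.runner = runner
        self.key = key
        self.cleared = False  # True once the SDK knows the full contents
        self.base = None      # values fetched from the runner, if any
        self.added = []       # values appended during this bundle

    def read(self):
        if not self.cleared and self.base is None:
            self.base = self.runner.read(self.key)  # the only remote read
        return ([] if self.cleared else self.base) + self.added

    def add(self, value):
        self.added.append(value)  # blind append, no read required

    def clear(self):
        self.cleared = True
        self.base = None
        self.added = []

    def set(self, values):
        """'Set' expressed as clear + append."""
        self.clear()
        self.added = list(values)

    def commit(self):
        """Flush at end of bundle: clear (if requested), then one append."""
        if self.cleared:
            self.runner.clear(self.key)
        if self.added:
            self.runner.append(self.key, self.added)


runner = FakeRunnerState()
runner.bags["k"] = [1, 2, 3]
state = BundleBagState(runner, "k")
state.set([10])
state.add(20)
print(state.read())     # [10, 20], served locally
state.commit()
print(runner.requests)  # [('clear', 'k'), ('append', 'k')]
```

In this toy version the runner never sees a read for the key, which is the
property the blind clear + append relies on.
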
>>
>> On Tue, Jul 30, 2019 at 12:58 AM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>>
>>> Hi Rakesh,
>>>
>>> Glad to see you point this problem out!
>>> +1 for adding this implementation. Managing state with a write-through
>>> cache is pretty important for streaming jobs!
>>>
>>> Best, Jincheng
>>>
>>> On Mon, Jul 29, 2019 at 8:54 PM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> FYI, a basic test appears to confirm the importance of cross-bundle
>>>> caching: I found that throughput can be increased by adjusting the
>>>> bundle size in the Flink runner. By default a bundle is capped at 1000
>>>> elements (or 1 second), so on a high-throughput stream the bundles are
>>>> capped by the count limit. Bumping the count limit increases throughput by
>>>> reducing the chatter over the state plane (more cache hits due to the
>>>> larger bundles).
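
A back-of-the-envelope model of why a larger bundle reduces state traffic,
as a rough sketch only. The function name state_reads and the assumptions
are mine, not measurements: elements are spread evenly over keys, and with
the per-bundle cache each bundle fetches each key it touches from the runner
exactly once. If I recall correctly, the Flink runner exposes the two limits
as the max_bundle_size and max_bundle_time_mills pipeline options; treat
those names as something to check against the runner documentation.

```python
import math


def state_reads(num_elements, num_keys, bundle_size):
    """Estimate remote state reads with a per-bundle cache.

    Assumes every bundle reads each key it touches from the runner once,
    and that a bundle touches at most min(num_keys, bundle_size) keys.
    """
    bundles = math.ceil(num_elements / bundle_size)
    keys_per_bundle = min(num_keys, bundle_size)
    return bundles * keys_per_bundle


# One million elements over 100 keys, under the assumptions above.
print(state_reads(1_000_000, 100, 1_000))   # 100000
print(state_reads(1_000_000, 100, 10_000))  # 10000
```

Under this model a 10x larger bundle means 10x fewer remote reads, while a
cross-bundle cache would bring the count down to roughly one read per key.
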
>>>>
>>>> The next level of investigation would involve profiling. But just by
>>>> looking at metrics, the CPU utilization on the Python worker side dropped
>>>> significantly, while on the Flink side it remained nearly the same. There
>>>> are no metrics for state operations on either side; I think it would be
>>>> very helpful to get these in place as well.
>>>>
>>>> Below is the stateful processing code for reference.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> import logging
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.transforms import userstate
>>>>
>>>>
>>>> class StatefulFn(beam.DoFn):
>>>>     count_state_spec = userstate.CombiningValueStateSpec(
>>>>         'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
>>>>     timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
>>>>
>>>>     def process(self, kv, count=beam.DoFn.StateParam(count_state_spec),
>>>>                 timer=beam.DoFn.TimerParam(timer_spec),
>>>>                 window=beam.DoFn.WindowParam):
>>>>         count.add(1)
>>>>         # Fire one second before the end of the window.
>>>>         timer_seconds = (window.end.micros // 1000000) - 1
>>>>         timer.set(timer_seconds)
>>>>
>>>>     @userstate.on_timer(timer_spec)
>>>>     def process_timer(self, count=beam.DoFn.StateParam(count_state_spec),
>>>>                       window=beam.DoFn.WindowParam):
>>>>         # Read once so the check and the log message use the same value.
>>>>         current = count.read()
>>>>         if current == 0:
>>>>             logging.warning(
>>>>                 "###timer fired with count %d, window %s", current, window)
>>>>
>>>>
>>>>
>>>> On Thu, Jul 25, 2019 at 5:09 AM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> On Wed, Jul 24, 2019 at 6:21 AM Rakesh Kumar <rakeshku...@lyft.com>
>>>>> wrote:
>>>>> >
>>>>> > Thanks Robert,
>>>>> >
>>>>> >  I stumbled on the JIRA that you created some time ago:
>>>>> > https://jira.apache.org/jira/browse/BEAM-5428
>>>>> >
>>>>> > You also marked the places where code changes are required:
>>>>> >
>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L291
>>>>> >
>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>>>>> >
>>>>> > https://github.com/apache/beam/blob/7688bcfc8ebb4bedf26c5c3b3fe0e13c0ec2aa6d/sdks/python/apache_beam/runners/worker/bundle_processor.py#L465
>>>>> >
>>>>> > I am willing to help implement this. Let me know how I can help.
>>>>>
>>>>> As far as I'm aware, no one is actively working on it right now.
>>>>> Please feel free to assign yourself the JIRA entry and I'll be happy
>>>>> to answer any questions you might have if (well probably when) these
>>>>> pointers are insufficient.
>>>>>
>>>>> > On Tue, Jul 23, 2019 at 3:47 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>> >>
>>>>> >> This is documented at:
>>>>> >> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
>>>>> >> Note that it requires participation of both the runner and the SDK
>>>>> >> (though there are no correctness issues if one or the other side does
>>>>> >> not understand the protocol; caching just won't be used).
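
A toy illustration of that last property, as a sketch only. It does not
follow the message format in the design doc: CrossBundleCache, read_bag and
the opaque token value are hypothetical. The idea shown is that the SDK
keeps values across bundles only while the runner hands back the same token
for the key, and that when no token is supplied (one side does not take part
in the protocol) every read simply goes to the runner, so the result stays
correct.

```python
class CrossBundleCache:
    """SDK-side cache that outlives a bundle, guarded by a runner token."""

    def __init__(self):
        self._entries = {}  # key -> (token, values)

    def get(self, key, token):
        """Return cached values, or None if there is no valid entry."""
        if token is None:  # no token from the runner: caching is disabled
            return None
        entry = self._entries.get(key)
        if entry is None or entry[0] != token:
            return None    # missing, or the state changed elsewhere
        return list(entry[1])

    def put(self, key, token, values):
        if token is not None:
            self._entries[key] = (token, list(values))


def read_bag(cache, fetch, key, token):
    """Read a bag, going to the runner via fetch() only on a cache miss."""
    values = cache.get(key, token)
    if values is None:
        values = fetch(key)
        cache.put(key, token, values)
    return values


backend = {"k": [1, 2]}
calls = []


def fetch(key):
    calls.append(key)  # count round trips to the (simulated) runner
    return list(backend[key])


cache = CrossBundleCache()
read_bag(cache, fetch, "k", "t1")  # miss: fetched from the runner
read_bag(cache, fetch, "k", "t1")  # hit: same token, no round trip
read_bag(cache, fetch, "k", "t2")  # token changed: fetched again
read_bag(cache, fetch, "k", None)  # no token: always fetched
print(len(calls))                  # 3
```
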
>>>>> >>
>>>>> >> I don't think it's been implemented anywhere, but could be very
>>>>> >> beneficial for performance.
>>>>> >>
>>>>> >> On Wed, Jul 17, 2019 at 6:00 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>> >> >
>>>>> >> > I checked the Python SDK [1] and it has a similar implementation to
>>>>> >> > the Java SDK.
>>>>> >> >
>>>>> >> > I would agree with Thomas. With a high-volume event stream and a
>>>>> >> > bigger cluster, network calls can potentially become a bottleneck.
>>>>> >> >
>>>>> >> > @Robert
>>>>> >> > I am interested in seeing the proposal. Can you provide me with a
>>>>> >> > link to it?
>>>>> >> >
>>>>> >> > [1]: https://github.com/apache/beam/blob/db59a3df665e094f0af17fe4d9df05fe420f3c16/sdks/python/apache_beam/transforms/userstate.py#L295
>>>>> >> >
>>>>> >> >
>>>>> >> > On Tue, Jul 16, 2019 at 9:43 AM Thomas Weise <t...@apache.org> wrote:
>>>>> >> >>
>>>>> >> >> Thanks for the pointer. For streaming, it will be important to
>>>>> >> >> support caching across bundles. It appears that even the Java SDK
>>>>> >> >> doesn't support that yet?
>>>>> >> >>
>>>>> >> >> https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L221
>>>>> >> >>
>>>>> >> >> Regarding clear/append: It would be nice if both could occur within
>>>>> >> >> a single Fn API roundtrip when the state is persisted.
>>>>> >> >>
>>>>> >> >> Thanks,
>>>>> >> >> Thomas
>>>>> >> >>
>>>>> >> >>
>>>>> >> >>
>>>>> >> >> On Tue, Jul 16, 2019 at 6:58 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>> >> >>>
>>>>> >> >>> User state is built on top of read, append and clear, rather than
>>>>> >> >>> on a read and write paradigm, to allow for blind appends.
>>>>> >> >>>
>>>>> >> >>> The optimization you speak of can be done completely inside the
>>>>> >> >>> SDK, without any additional protocol, as long as you clear the
>>>>> >> >>> state first and then append all your new data. The Beam Java SDK
>>>>> >> >>> does this for all runners when executed portably [1]. You could
>>>>> >> >>> port the same logic to the Beam Python SDK as well.
>>>>> >> >>>
>>>>> >> >>> [1]: https://github.com/apache/beam/blob/41478d00d34598e56471d99d0845ac16efa5b8ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BagUserState.java#L84
>>>>> >> >>>
>>>>> >> >>> On Tue, Jul 16, 2019 at 5:54 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>> >> >>>>
>>>>> >> >>>> Python workers also have a per-bundle SDK-side cache. A protocol
>>>>> >> >>>> has been proposed, but hasn't yet been implemented in any SDKs or
>>>>> >> >>>> runners.
>>>>> >> >>>>
>>>>> >> >>>> On Tue, Jul 16, 2019 at 6:02 AM Reuven Lax <re...@google.com> wrote:
>>>>> >> >>>> >
>>>>> >> >>>> > It's runner dependent. Some runners (e.g. the Dataflow runner)
>>>>> >> >>>> > do have such a cache, though I think it currently has a cap for
>>>>> >> >>>> > large bags.
>>>>> >> >>>> >
>>>>> >> >>>> > Reuven
>>>>> >> >>>> >
>>>>> >> >>>> > On Mon, Jul 15, 2019 at 8:48 PM Rakesh Kumar <rakeshku...@lyft.com> wrote:
>>>>> >> >>>> >>
>>>>> >> >>>> >> Hi,
>>>>> >> >>>> >>
>>>>> >> >>>> >> I have been using the Python SDK for our application and am
>>>>> >> >>>> >> also using BagState in production. I was wondering whether the
>>>>> >> >>>> >> state logic has a write-through cache implemented. If we send
>>>>> >> >>>> >> every read and write request over the network, it comes with a
>>>>> >> >>>> >> performance cost. We can avoid the network call for a read
>>>>> >> >>>> >> operation if we have a write-through cache.
>>>>> >> >>>> >> I have looked superficially into the implementation and didn't
>>>>> >> >>>> >> see any cache implementation.
>>>>> >> >>>> >>
>>>>> >> >>>> >> Is it possible to have this cache? Would it cause any issues
>>>>> >> >>>> >> if we had a caching layer?
>>>>> >> >>>> >>
>>>>>
>>>>
