I'm glad Robert mentioned the timer payload design. I recently discussed it offline, and the other person did not see the point, since you can do the same thing with state :-). I think timer payloads are elegant and simple, and I am very happy that portability has gone this route. Reza brings up some cases, though, where you need the metadata in @ProcessElement before the timer fires. So both are useful.
One thing that has no design that I know of is timers in merging windows. In today's design, it is feasible to have a combiner for the target timestamp, but this has a lot of drawbacks in the future:

- This is less clear once delivery instructions are separated from the output event-time timestamp / watermark hold.
- If you have some state tracking metadata about the timer, you need to update it. Having two coupled combiners, one for the metadata and one for the timer, seems complex compared to @OnMerge.
- @OnMerge would be allowed to have side effects.
- @OnMerge might need to emit retractions.

Of course, I don't know of a design for @OnMerge either. It could also be complex for a user.

Kenn

On Sun, Apr 28, 2019 at 6:43 PM Reza Rokni <[email protected]> wrote:

> @Robert Bradshaw <[email protected]> Some examples, mostly built out from cases around time-series data. I don't want to derail this thread, so at a high level:
>
> Looping timers: a timer that allows for the creation of a value within a window when no external input has been seen. Requires metadata like "is timer set".
>
> BiTemporalStream join, where we need to match leftCol.timestamp to a value == max(rightCol.timestamp) where rightCol.timestamp <= leftCol.timestamp. This is for an application matching trades to quotes. Metadata is used for:
>
> - Taking the Key from the KV for use within the OnTimer call.
> - Knowing where we are in watermarks for GC of objects in state.
> - More timer metadata (min timer ..)
>
> It could be argued that what we are using state for is mostly workarounds for things that could eventually end up in the API itself. For example:
>
> - There is a Jira for OnTimer Context to have Key.
> - The GC needs are mostly due to not having a Map State object in all runners yet.
>
> However, I think as folks explore Beam there will always be little things that require metadata, so having access to something which gives us fine-grained control (as Kenneth mentioned) is useful.
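As a rough illustration of the "combiner for the target timestamp" option for timers in merging windows, here is a toy model in plain Python. It is not the Beam API: `SessionWindow`, `combine_timers`, and `merge_windows` are hypothetical names, and it assumes each session window holds at most one pending event-time timer and that a merged window keeps the earliest pending timer.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SessionWindow:
    """Toy session window [start, end) with at most one pending event-time timer."""
    start: int
    end: int
    timer: Optional[int] = None  # None means "no timer set"


def combine_timers(a: Optional[int], b: Optional[int]) -> Optional[int]:
    """Timestamp combiner: keep the earliest pending timer, ignoring unset ones."""
    if a is None:
        return b
    if b is None:
        return a
    return min(a, b)


def merge_windows(windows: List[SessionWindow]) -> List[SessionWindow]:
    """Merge overlapping windows and combine their pending timers."""
    merged: List[SessionWindow] = []
    for w in sorted(windows, key=lambda win: win.start):
        if merged and w.start < merged[-1].end:
            # Overlap: extend the current merged window and combine the timers.
            last = merged[-1]
            last.end = max(last.end, w.end)
            last.timer = combine_timers(last.timer, w.timer)
        else:
            # No overlap: start a new merged window (a copy, so inputs stay untouched).
            merged.append(SessionWindow(w.start, w.end, w.timer))
    return merged


windows = [SessionWindow(0, 10, timer=8), SessionWindow(5, 15, timer=12), SessionWindow(20, 30)]
for w in merge_windows(windows):
    print(w)
# SessionWindow(start=0, end=15, timer=8)
# SessionWindow(start=20, end=30, timer=None)
```

Any state holding metadata about the timer (for example an "is timer set" flag) would need its own merge rule kept consistent with `combine_timers`; that second, coupled combiner is the complexity an @OnMerge callback would avoid.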
>
> Cheers
>
> Reza
>
> On Sat, 27 Apr 2019 at 02:59, Kenneth Knowles <[email protected]> wrote:
>
>> To be clear, the intent was always that ValueState would not be usable in merging pipelines. So there is no danger of clobbering, but also limited functionality. Is there a runner that accepts it and clobbers? The whole idea of the new DoFn is that it is easy to do the construction-time analysis and reject the invalid pipeline. It is actually runner-independent and, I think, already implemented in ParDo's validation, no?
>>
>> Kenn
>>
>> On Fri, Apr 26, 2019 at 10:14 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> I am in the camp where we should only support merging state (either naturally via things like bags or via combiners). I believe that having the wrapper that Brian suggests is useful for users. As for the @OnMerge method, I believe combiners should have the ability to look at the window information, and we should treat @OnMerge as syntactic sugar over a combiner if the combiner API is too cumbersome.
>>>
>>> I believe using combiners can also extend to side inputs and help us deal with singleton and map-like side inputs when multiple firings occur. I also like treating everything like a combiner because it will give us a lot of reuse of combiner implementations across all the places they could be used, and will be especially useful when we start exposing APIs related to retractions on combiners.
>>>
>>> On Fri, Apr 26, 2019 at 9:43 AM Brian Hulette <[email protected]> wrote:
>>>
>>>> Yeah, the danger with out-of-order processing concerns me more than the merging as well. As a new Beam user, I immediately gravitated towards ValueState since it was easy to think about, and I just assumed there wasn't anything to be concerned about. So it was shocking to learn that there is this dangerous edge case.
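To make the out-of-order edge case concrete, here is a toy model in plain Python (hypothetical classes, not Beam's state API). `ToyValueState` keeps whatever was written last in arrival order, while `ToyLatestByTimestamp` behaves like a CombiningState whose combiner keeps the value with the greatest event timestamp. That is roughly the semantics of Java's `Latest.combineFn()`, but treat the correspondence as an assumption.

```python
from typing import List, Optional, Tuple


class ToyValueState:
    """Last write wins, in arrival order (like a plain ValueState)."""

    def __init__(self) -> None:
        self._value: Optional[str] = None

    def write(self, value: str) -> None:
        self._value = value

    def read(self) -> Optional[str]:
        return self._value


class ToyLatestByTimestamp:
    """Combining-style state that keeps the value with the greatest event timestamp.

    Ties go to the later arrival, so only exact timestamp ties are order-dependent.
    """

    def __init__(self) -> None:
        self._acc: Optional[Tuple[int, str]] = None

    def add(self, timestamp: int, value: str) -> None:
        if self._acc is None or timestamp >= self._acc[0]:
            self._acc = (timestamp, value)

    def read(self) -> Optional[str]:
        return None if self._acc is None else self._acc[1]


def replay(events: List[Tuple[int, str]]) -> Tuple[Optional[str], Optional[str]]:
    """Feed (event_timestamp, value) pairs, in arrival order, to both states."""
    value_state, latest = ToyValueState(), ToyLatestByTimestamp()
    for timestamp, value in events:
        value_state.write(value)
        latest.add(timestamp, value)
    return value_state.read(), latest.read()


print(replay([(5, "a"), (10, "b")]))  # in order:     ('b', 'b')
print(replay([(10, "b"), (5, "a")]))  # out of order: ('a', 'b'), ValueState clobbered
```

Making the "latest" semantics an explicit combiner is what forces the question Robert raises: latest by what, arrival or event time?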
>>>>
>>>> What if ValueState were just implemented as a wrapper of CombiningState with a LatestCombineFn and documented as such (and perhaps we encourage users to consider using a CombiningState explicitly if at all possible)?
>>>>
>>>> Brian
>>>>
>>>> On Fri, Apr 26, 2019 at 2:29 AM Robert Bradshaw <[email protected]> wrote:
>>>>
>>>>> On Fri, Apr 26, 2019 at 6:40 AM Kenneth Knowles <[email protected]> wrote:
>>>>> >
>>>>> > You could use a CombiningState with a CombineFn that returns the minimum for this case.
>>>>>
>>>>> We've also wanted to be able to set data when setting a timer that would be returned when the timer fires. (It's in the FnAPI, but not in the SDKs yet.)
>>>>>
>>>>> The metadata is an interesting use case; do you have some more specific examples? It might boil down to not having a rich enough (single) state type.
>>>>>
>>>>> > But I've come to feel there is a mismatch. On the one hand, ParDo(<stateful DoFn>) is a way to drop to a lower level and write logic that does not fit a more general computational pattern, really taking fine control. On the other hand, automatically merging state via CombiningState or BagState is more of a no-knobs, higher level of programming. To me there seems to be a bit of a philosophical conflict.
>>>>> >
>>>>> > These days, I feel like an @OnMerge method would be more natural. If you are using state and timers, you probably often want more direct control over how state from windows gets merged. And of course we don't even have a design for timers: you would need some kind of timestamp CombineFn, but I think setting/unsetting timers manually makes more sense.
>>>>> > Especially considering the trickiness around merging windows in the absence of retractions, you really need this callback, so you can issue retractions manually for any output your stateful DoFn emitted in windows that no longer exist.
>>>>>
>>>>> I agree we'll probably need an @OnMerge. On the other hand, I like being able to have good defaults. The high/low-level thing is a continuum (the indexing example falling towards the high end).
>>>>>
>>>>> Actually, the merging questions bother me less than how easy it is to accidentally clobber previous values. It looks so easy (like the easiest state to use) but is actually the most dangerous. If one wants this behavior, I would rather have an explicit AnyCombineFn or LatestCombineFn, which makes you think about the semantics.
>>>>>
>>>>> - Robert
>>>>>
>>>>> > On Thu, Apr 25, 2019 at 5:49 PM Reza Rokni <[email protected]> wrote:
>>>>> >>
>>>>> >> +1 on the metadata use case.
>>>>> >>
>>>>> >> For performance reasons, the Timer API does not support a read() operation, which for the vast majority of use cases is not a required feature. In the small set of use cases where it is needed, for example when you need to set a Timer in EventTime based on the smallest timestamp seen in the elements within a DoFn, we can make use of a ValueState object to keep track of the value.
>>>>> >>
>>>>> >> On Fri, 26 Apr 2019 at 00:38, Reuven Lax <[email protected]> wrote:
>>>>> >>>
>>>>> >>> I see examples of people using ValueState that I think are not captured by CombiningState. For example, one common one is users who set a timer and then record the timestamp of that timer in a ValueState. In general, when you store state that is metadata about other state you store, ValueState will usually make more sense than CombiningState.
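Kenn's suggestion of a CombiningState whose CombineFn returns the minimum covers Reza's "smallest timestamp seen" case without a read-modify-write on a ValueState. Below is a plain-Python sketch that only mirrors the method names of Beam's Python `CombineFn` (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`); it does not import Beam, and `MinTimestampFn`, `timer_targets`, and the fixed `delay` are hypothetical.

```python
import math
from typing import Iterable, List


class MinTimestampFn:
    """Min combiner with the same method names as a Beam Python CombineFn."""

    def create_accumulator(self) -> float:
        return math.inf  # identity for min: "no element seen yet"

    def add_input(self, accumulator: float, timestamp: float) -> float:
        return min(accumulator, timestamp)

    def merge_accumulators(self, accumulators: Iterable[float]) -> float:
        # A merge rule is what lets this state survive window merging, unlike ValueState.
        return min(accumulators, default=math.inf)

    def extract_output(self, accumulator: float) -> float:
        return accumulator


def timer_targets(timestamps: Iterable[float], delay: float) -> List[float]:
    """Event-time timer target after each element: earliest timestamp + delay.

    Re-setting an event-time timer replaces its previous target, so only the
    last entry matters in practice; the list just shows how it evolves.
    """
    fn = MinTimestampFn()
    acc = fn.create_accumulator()
    targets = []
    for ts in timestamps:
        acc = fn.add_input(acc, ts)
        targets.append(fn.extract_output(acc) + delay)
    return targets


print(timer_targets([17, 12, 30], delay=5))  # [22, 17, 17]
```

Reuven's counterexample still applies: if what you store is the timestamp a timer was actually set to (metadata about other state, not a function of the elements), there is no natural combiner, and ValueState fits better.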
>>>>> >>>
>>>>> >>> On Thu, Apr 25, 2019 at 9:32 AM Brian Hulette <[email protected]> wrote:
>>>>> >>>>
>>>>> >>>> Currently the Python SDK does not make ValueState available to users. My initial inclination was to go ahead and implement it there to be consistent with Java, but Robert brings up a great point here that ValueState has an inherent race condition for out-of-order data, and a lot of its use cases can actually be implemented with a CombiningState instead.
>>>>> >>>>
>>>>> >>>> It seems to me that at the very least we should discourage the use of ValueState by noting the danger in the documentation and preferring CombiningState in examples, and perhaps we should go further and deprecate it in Java and not implement it in Python. Either way, I think we should be consistent between Java and Python.
>>>>> >>>>
>>>>> >>>> I'm curious what people think about this: are there use cases that we really need to keep ValueState around for?
>>>>> >>>>
>>>>> >>>> Brian
>>>>> >>>>
>>>>> >>>> ---------- Forwarded message ---------
>>>>> >>>> From: Robert Bradshaw <[email protected]>
>>>>> >>>> Date: Thu, Apr 25, 2019, 08:31
>>>>> >>>> Subject: Re: [docs] Python State & Timers
>>>>> >>>> To: dev <[email protected]>
>>>>> >>>>
>>>>> >>>> On Thu, Apr 25, 2019, 5:26 PM Maximilian Michels <[email protected]> wrote:
>>>>> >>>>>
>>>>> >>>>> Completely agree that CombiningState is nicer in this example. Users may still want to use ValueState when there is nothing to combine.
>>>>> >>>>
>>>>> >>>> I've always had trouble coming up with any good examples of this.
>>>>> >>>>
>>>>> >>>>> Also, users already know ValueState from the Java SDK.
>>>>> >>>>
>>>>> >>>> Maybe we should deprecate that :)
>>>>> >>>>
>>>>> >>>>> On 25.04.19 17:12, Robert Bradshaw wrote:
>>>>> >>>>> > On Thu, Apr 25, 2019 at 4:58 PM Maximilian Michels <[email protected]> wrote:
>>>>> >>>>> >>
>>>>> >>>>> >> I forgot to give an example, just to clarify for others:
>>>>> >>>>> >>
>>>>> >>>>> >>> What was the specific example that was less natural?
>>>>> >>>>> >>
>>>>> >>>>> >> Basically every time we use ListState to express ValueState, e.g.
>>>>> >>>>> >>
>>>>> >>>>> >>     next_index, = list(state.read()) or [0]
>>>>> >>>>> >>
>>>>> >>>>> >> Taken from:
>>>>> >>>>> >> https://github.com/apache/beam/pull/8363/files#diff-ba1a2aed98079ccce869cd660ca9d97dR301
>>>>> >>>>> >
>>>>> >>>>> > Yes, ListState is much less natural here. I think CombiningValue is generally a better replacement. E.g. the Java example reads
>>>>> >>>>> >
>>>>> >>>>> >     public void processElement(
>>>>> >>>>> >         ProcessContext context, @StateId("index") ValueState<Integer> index) {
>>>>> >>>>> >       int current = firstNonNull(index.read(), 0);
>>>>> >>>>> >       context.output(KV.of(current, context.element()));
>>>>> >>>>> >       index.write(current + 1);
>>>>> >>>>> >     }
>>>>> >>>>> >
>>>>> >>>>> > which is replaced with bag state
>>>>> >>>>> >
>>>>> >>>>> >     def process(self, element, state=DoFn.StateParam(INDEX_STATE)):
>>>>> >>>>> >       next_index, = list(state.read()) or [0]
>>>>> >>>>> >       yield (element, next_index)
>>>>> >>>>> >       state.clear()
>>>>> >>>>> >       state.add(next_index + 1)
>>>>> >>>>> >
>>>>> >>>>> > whereas CombiningState would be more natural (than ListState, and arguably even than ValueState), giving
>>>>> >>>>> >
>>>>> >>>>> >     def process(self, element, index=DoFn.StateParam(INDEX_STATE)):
>>>>> >>>>> >       yield element, index.read()
>>>>> >>>>> >       index.add(1)
>>>>> >>>>> >
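The bag-state and combining-state versions of the indexing example can be exercised outside a pipeline with stand-in state objects. `ToyBag` and `ToySumCombining` are hypothetical in-memory stand-ins, not Beam classes; in Beam the second version would presumably be backed by a `CombiningValueStateSpec` with a summing combiner. The sketch only shows that both assign the same indices, while the combining version needs no read-clear-write sequence.

```python
from typing import Iterable, Iterator, List, Tuple


class ToyBag:
    """In-memory stand-in for bag/list state."""

    def __init__(self) -> None:
        self._items: List[int] = []

    def read(self) -> Iterator[int]:
        return iter(self._items)

    def add(self, value: int) -> None:
        self._items.append(value)

    def clear(self) -> None:
        self._items = []


class ToySumCombining:
    """In-memory stand-in for combining state with a sum combiner."""

    def __init__(self) -> None:
        self._acc = 0

    def read(self) -> int:
        return self._acc

    def add(self, value: int) -> None:
        self._acc += value


def index_with_bag(elements: Iterable[str]) -> Iterator[Tuple[str, int]]:
    state = ToyBag()
    for element in elements:
        # Bag used as a single value: read, clear, re-add.
        next_index, = list(state.read()) or [0]
        yield element, next_index
        state.clear()
        state.add(next_index + 1)


def index_with_combining(elements: Iterable[str]) -> Iterator[Tuple[str, int]]:
    index = ToySumCombining()
    for element in elements:
        yield element, index.read()
        index.add(1)


print(list(index_with_bag("abc")))        # [('a', 0), ('b', 1), ('c', 2)]
print(list(index_with_combining("abc")))  # [('a', 0), ('b', 1), ('c', 2)]
```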
>>>>> >>>>> >>
>>>>> >>>>> >> -Max
>>>>> >>>>> >>
>>>>> >>>>> >> On 25.04.19 16:40, Robert Bradshaw wrote:
>>>>> >>>>> >>> https://github.com/apache/beam/pull/8402
>>>>> >>>>> >>>
>>>>> >>>>> >>> On Thu, Apr 25, 2019 at 4:26 PM Robert Bradshaw <[email protected]> wrote:
>>>>> >>>>> >>>>
>>>>> >>>>> >>>> Oh, this is for the indexing example.
>>>>> >>>>> >>>>
>>>>> >>>>> >>>> I actually think using CombiningState is cleaner than ValueState.
>>>>> >>>>> >>>>
>>>>> >>>>> >>>> https://github.com/apache/beam/blob/release-2.12.0/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py#L262
>>>>> >>>>> >>>>
>>>>> >>>>> >>>> (The fact that one must specify the accumulator coder is, however, unfortunate. We should probably infer that if we can.)
>>>>> >>>>> >>>>
>>>>> >>>>> >>>> On Thu, Apr 25, 2019 at 4:19 PM Robert Bradshaw <[email protected]> wrote:
>>>>> >>>>> >>>>>
>>>>> >>>>> >>>>> The desire was to avoid the implicit disallowed-combination wart in Python (until we could make sense of it), and also ValueState could be surprising with respect to older values overwriting newer ones. What was the specific example that was less natural?
>>>>> >>>>> >>>>>
>>>>> >>>>> >>>>> On Thu, Apr 25, 2019 at 3:01 PM Maximilian Michels <[email protected]> wrote:
>>>>> >>>>> >>>>>>
>>>>> >>>>> >>>>>> @Pablo: Thanks for following up with the PR! :)
>>>>> >>>>> >>>>>>
>>>>> >>>>> >>>>>> @Brian: I was wondering about this as well. It makes the Python state code a bit unnatural. I'd suggest adding a ValueState wrapper around ListState/CombiningState.
>>>>> >>>>> >>>>>>
>>>>> >>>>> >>>>>> @Robert: Like Reuven pointed out, we can disallow ValueState for merging windows with state.
>>>>> >>>>> >>>>>>
>>>>> >>>>> >>>>>> @Reza: Great.
>>>>> >>>>> >>>>>> Let's make sure it has Python examples out of the box. Either Pablo or I could help there.
>>>>> >>>>> >>>>>>
>>>>> >>>>> >>>>>> Thanks,
>>>>> >>>>> >>>>>> Max
>>>>> >>>>> >>>>>>
>>>>> >>>>> >>>>>> On 25.04.19 04:14, Reza Ardeshir Rokni wrote:
>>>>> >>>>> >>>>>>> Pablo, Kenneth and I have a new blog post ready for publication which covers how to create a "looping timer", which allows for default values to be created in a window when no incoming elements exist. We just need to clear a few bits before publication, but it would be great to have it also include a Python example; I wrote it in Java...
>>>>> >>>>> >>>>>>>
>>>>> >>>>> >>>>>>> Cheers
>>>>> >>>>> >>>>>>>
>>>>> >>>>> >>>>>>> Reza
>>>>> >>>>> >>>>>>>
>>>>> >>>>> >>>>>>> On Thu, 25 Apr 2019 at 04:34, Reuven Lax <[email protected]> wrote:
>>>>> >>>>> >>>>>>>
>>>>> >>>>> >>>>>>> Well, state is still not implemented for merging windows even for Java (though I believe the idea was to disallow ValueState there).
>>>>> >>>>> >>>>>>>
>>>>> >>>>> >>>>>>> On Wed, Apr 24, 2019 at 1:11 PM Robert Bradshaw <[email protected]> wrote:
>>>>> >>>>> >>>>>>>
>>>>> >>>>> >>>>>>> It was unclear what the semantics were for ValueState for merging windows. (It's also a bit weird, as it's inherently a race condition with respect to element ordering, unlike Bag and CombineState, though you can always implement it as a CombineState that always returns the latest value, which is a bit more explicit about the dangers here.)
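Reza's "looping timer" can be simulated in plain Python for a single key. This is a sketch under stated assumptions, not the code from the blog post: fixed windows of `window_size`, a hypothetical `stop` timestamp after which the timer is no longer re-armed, and a per-window count with `default` emitted for windows that received no elements.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def looping_timer_counts(
    timestamps: Iterable[int], window_size: int, stop: int, default: int = 0
) -> List[Tuple[int, int]]:
    """Simulate a looping event-time timer for one key.

    Returns (window_start, count) for each window from the earliest non-empty
    one until `stop`, emitting `default` for windows with no elements.
    """
    counts = Counter(ts - ts % window_size for ts in timestamps)
    if not counts:
        return []  # no element ever arrived, so the timer was never set
    output = []
    # The earliest element arms the timer at the end of its window ("is timer set").
    timer = min(counts) + window_size
    while timer <= stop:
        window_start = timer - window_size
        output.append((window_start, counts.get(window_start, default)))
        timer += window_size  # on firing, re-arm for the next window: the "loop"
    return output


print(looping_timer_counts([1, 3, 12], window_size=5, stop=20))
# [(0, 2), (5, 0), (10, 1), (15, 0)]
```

The windows starting at 5 and 15 had no input but still produce a value, which is the point of the pattern; in a real pipeline the "is timer set" flag would live in state so that only the first element arms the timer.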
>>>>> >>>>> >>>>>>>
>>>>> >>>>> >>>>>>> On Wed, Apr 24, 2019 at 10:08 PM Brian Hulette <[email protected]> wrote:
>>>>> >>>>> >>>>>>> >
>>>>> >>>>> >>>>>>> > That's a great idea! I thought about this too after those posts came up on the list recently. I started to look into it, but I noticed that there's actually no implementation of ValueState in userstate. Is there a reason for that? I started to work on a patch to add it, but I was just curious if there was some reason it was omitted that I should be aware of.
>>>>> >>>>> >>>>>>> >
>>>>> >>>>> >>>>>>> > We could certainly replicate the example without ValueState by using BagState and clearing it before each write, but it would be nice if we could draw a direct parallel.
>>>>> >>>>> >>>>>>> >
>>>>> >>>>> >>>>>>> > Brian
>>>>> >>>>> >>>>>>> >
>>>>> >>>>> >>>>>>> > On Fri, Apr 12, 2019 at 7:05 AM Maximilian Michels <[email protected]> wrote:
>>>>> >>>>> >>>>>>> >>
>>>>> >>>>> >>>>>>> >> > It would probably be pretty easy to add the corresponding code snippets to the docs as well.
>>>>> >>>>> >>>>>>> >>
>>>>> >>>>> >>>>>>> >> It's probably a bit more work because there is no section dedicated to state/timer yet in the documentation. Tracked here:
>>>>> >>>>> >>>>>>> >> https://jira.apache.org/jira/browse/BEAM-2472
>>>>> >>>>> >>>>>>> >>
>>>>> >>>>> >>>>>>> >> > I've been going over this topic a bit. I'll add the snippets next week, if that's fine by y'all.
>>>>> >>>>> >>>>>>> >>
>>>>> >>>>> >>>>>>> >> That would be great.
>>>>> >>>>> >>>>>>> >> The blog posts are a great way to get started with state/timers.
>>>>> >>>>> >>>>>>> >>
>>>>> >>>>> >>>>>>> >> Thanks,
>>>>> >>>>> >>>>>>> >> Max
>>>>> >>>>> >>>>>>> >>
>>>>> >>>>> >>>>>>> >> On 11.04.19 20:21, Pablo Estrada wrote:
>>>>> >>>>> >>>>>>> >> > I've been going over this topic a bit. I'll add the snippets next week, if that's fine by y'all.
>>>>> >>>>> >>>>>>> >> > Best
>>>>> >>>>> >>>>>>> >> > -P.
>>>>> >>>>> >>>>>>> >> >
>>>>> >>>>> >>>>>>> >> > On Thu, Apr 11, 2019 at 5:27 AM Robert Bradshaw <[email protected]> wrote:
>>>>> >>>>> >>>>>>> >> >
>>>>> >>>>> >>>>>>> >> > That's a great idea! It would probably be pretty easy to add the corresponding code snippets to the docs as well.
>>>>> >>>>> >>>>>>> >> >
>>>>> >>>>> >>>>>>> >> > On Thu, Apr 11, 2019 at 2:00 PM Maximilian Michels <[email protected]> wrote:
>>>>> >>>>> >>>>>>> >> > >
>>>>> >>>>> >>>>>>> >> > > Hi everyone,
>>>>> >>>>> >>>>>>> >> > >
>>>>> >>>>> >>>>>>> >> > > The Python SDK still lacks documentation on state and timers.
>>>>> >>>>> >>>>>>> >> > >
>>>>> >>>>> >>>>>>> >> > > As a first step, what do you think about updating these two blog posts with the corresponding Python code?
>>>>> >>>>> >>>>>>> >> > >
>>>>> >>>>> >>>>>>> >> > > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>>> >>>>> >>>>>>> >> > > https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>>>> >>>>> >>>>>>> >> > >
>>>>> >>>>> >>>>>>> >> > > Thanks,
>>>>> >>>>> >>>>>>> >> > > Max
>>>>> >>
>>>>> >> --
>>>>> >>
>>>>> >> This email may be confidential and privileged. If you received this communication by mistake, please don't forward it to anyone else, please erase all copies and attachments, and please let me know that it has gone to the wrong person.
>>>>> >>
>>>>> >> The above terms reflect a potential business arrangement, are provided solely as a basis for further discussion, and are not intended to be and do not constitute a legally binding obligation. No legally binding obligations will be created, implied, or inferred until an agreement in final form is executed in writing by all parties involved.
