Hi Rakesh, Sorry about the delay; I was away for a while and only just caught up with this. I'm not actively working on this now, so thanks for the contribution!
On Tue, Jul 23, 2019 at 9:48 PM Rakesh Kumar <[email protected]> wrote: > Hi Brian/Robert > > I am moving ahead with implementing ReadModifyWriteState. I will just > repurpose this PR: https://github.com/apache/beam/pull/9067 . > > > > On Mon, Jul 15, 2019 at 7:47 PM Rakesh Kumar <[email protected]> wrote: > >> Brian, >> >> I just want to follow up. Let me know if you are working on this. >> Otherwise, I can implement ReadModifyWriteState. >> >> On Mon, May 6, 2019 at 4:52 PM Reza Rokni <[email protected]> wrote: >> >>> When used as metadata I think the ReadModifyWrite naming is very >>> accurate for the majority of cases. >>> >>> The only case that does not follow that pattern is if it's being used as >>> a Boolean to indicate that something should be done in the OnTimer call >>> based on an event that has been seen in the OnProcess code. But even then >>> calling it ReadModifyWrite would still be ok and easily understood. >>> >>> *From: *Kenneth Knowles <[email protected]> >>> *Date: *Fri, 3 May 2019 at 10:58 >>> *To: *dev >>> >>> Agree with all of your points about the drawbacks of ValueState. It is >>>> definitely a pro/con weighing sort of situation. Considering the number of >>>> users who are new to the orthogonality of event time and processing time, >>>> ValueState could certainly lead to confusion about why things are not in >>>> any particular order. Perhaps a middle ground is to have a bag of "written >>>> values" with sequence numbers, and document some patterns/examples of how a >>>> user may care to resolve these sequence numbers on read, or perhaps some >>>> combiners like you describe. Overall, I'd defer to Reza and Reuven as they >>>> seem very familiar with real-world use cases here.
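Kenn's middle ground (a bag of "written values" tagged with sequence numbers, resolved on read) can be modeled in a few lines of plain Python. This is a toy, in-memory sketch under stated assumptions: `SequencedBag`, `last_written` and `max_value` are hypothetical names, not Beam APIs, and the sequence number is simply the order of writes within a single key and window.

```python
import itertools
from typing import Callable, Generic, List, Optional, Tuple, TypeVar

T = TypeVar("T")


class SequencedBag(Generic[T]):
    """Toy model of a bag of written values tagged with sequence numbers.

    Hypothetical, not a Beam API: each write appends (seq, value), nothing is
    overwritten, and the reader chooses how to resolve the sequence.
    """

    def __init__(self) -> None:
        self._entries: List[Tuple[int, T]] = []
        self._counter = itertools.count()

    def write(self, value: T) -> None:
        # Blind append: writing never requires a read.
        self._entries.append((next(self._counter), value))

    def read(self, resolve: Callable[[List[Tuple[int, T]]], Optional[T]]) -> Optional[T]:
        # The resolution policy is supplied explicitly at read time.
        return resolve(list(self._entries))


def last_written(entries):
    # Policy: the highest sequence number (latest write) wins.
    return max(entries, key=lambda e: e[0])[1] if entries else None


def max_value(entries):
    # Policy: ignore write order entirely and keep the largest value.
    return max((v for _, v in entries), default=None)
```

The design keeps writes as blind appends and pushes the "which value wins" decision to the reader, where it has to be stated explicitly instead of being implied by the state type.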
>>>> >>>> Kenn >>>> >>>> On Thu, May 2, 2019 at 2:53 AM Robert Bradshaw <[email protected]> >>>> wrote: >>>> >>>>> On Wed, May 1, 2019 at 8:09 PM Kenneth Knowles <[email protected]> >>>>> wrote: >>>>> > >>>>> > On Wed, May 1, 2019 at 8:51 AM Reuven Lax <[email protected]> wrote: >>>>> >> >>>>> >> ValueState is not necessarily racy if you're doing a >>>>> read-modify-write. It's only racy if you're doing something like writing >>>>> last element seen. >>>>> > >>>>> > Race conditions are not inherently a problem. They are neither >>>>> necessary nor sufficient for correctness. In this case, it is not the >>>>> classic sense of race condition anyhow, it is simply a nondeterministic >>>>> result, which may often be perfectly fine. >>>>> >>>>> One can write correct code with ValueState, but it's harder to do. >>>>> This is exacerbated by the fact that at first glance it looks easier >>>>> to use. >>>>> >>>>> >>>>> On Wed, May 1, 2019 at 8:30 AM Lukasz Cwik <[email protected]> >>>>> wrote: >>>>> >>>>>> >>>>> >>>>>> Isn't a value state just a bag state with at most one element >>>>> and the usage pattern would be? >>>>> >>>>>> 1) value_state.get == bag_state.read.next() (both have to >>>>> handle the case when neither have been set) >>>>> >>>>>> 2) user logic on what to do with current state + additional >>>>> information to produce new state >>>>> >>>>>> 3) value_state.set == bag_state.clear + bag_state.append? (note >>>>> that Runners should optimize clear + append to become a single >>>>> transaction/write) >>>>> > >>>>> > Your unpacking is accurate, but "X is just a Y" is not accurate. In >>>>> this case you've demonstrated that value state *can be implemented using* >>>>> bag state / has a workaround. But it is not subsumed by bag state. One >>>>> important feature of ValueState is that it is statically determined that >>>>> the transform cannot be used with merging windows. 
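Lukasz's three-step unpacking (get is the first element of the bag, user logic computes the new state, set is clear followed by append) can be sketched with an in-memory stand-in for a bag. `ToyBagState` and `ValueOverBag` are hypothetical helpers for illustration only; they model the semantics, not Beam's `BagState` implementation.

```python
from typing import Generic, Iterator, List, Optional, TypeVar

T = TypeVar("T")


class ToyBagState(Generic[T]):
    """In-memory stand-in for a bag state with add(), read() and clear()."""

    def __init__(self) -> None:
        self._items: List[T] = []

    def add(self, value: T) -> None:
        self._items.append(value)

    def read(self) -> Iterator[T]:
        return iter(list(self._items))

    def clear(self) -> None:
        self._items.clear()


class ValueOverBag(Generic[T]):
    """Value-state-like wrapper over a bag that holds at most one element."""

    def __init__(self, bag: ToyBagState[T]) -> None:
        self._bag = bag

    def read(self, default: Optional[T] = None) -> Optional[T]:
        # Step 1: "never set" and "cleared" both surface as the default.
        return next(self._bag.read(), default)

    def write(self, value: T) -> None:
        # Step 3: a runner could fuse clear + add into a single write.
        self._bag.clear()
        self._bag.add(value)
```

As Kenn notes, this shows value state *can be implemented using* a bag, but nothing in the bag type stops a caller from adding a second element, which is one of the guarantees ValueState gives statically.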
>>>>> >>>>> The flip side is that it makes it easy to write code that cannot be >>>>> used with merging windows. Which hurts composition (especially if >>>>> these operations are used as part of larger composite operations). >>>>> >>>>> > Another feature is that it is impossible to accidentally write more >>>>> than one value. And a third important feature is that it declares what it >>>>> is so that the code is more readable. >>>>> >>>>> +1. Which is why I think CombiningState is a better substitute than >>>>> BagState where it makes sense (and often does, and often can even be >>>>> an improvement over ValueState for performance and readability). >>>>> >>>>> Perhaps instead we could call it ReadModifyWrite state. It could make >>>>> sense, as well as a read() and write() operation, that we even offer a >>>>> modify(I, (I, S) -> S) operation. >>>>> >>>>> (Also, yes, when I said Latest I too meant a hypothetical "throw away >>>>> everything else when a new element is written" one, not the specific >>>>> one in the code. Sorry for the confusion.) 
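Robert's proposed shape, a ReadModifyWrite state with `read()`, `write()` and a `modify(I, (I, S) -> S)` operation, might look like the following toy class. It is a hypothetical sketch of the proposal as described in the thread, not the API that was eventually added to Beam.

```python
from typing import Callable, Generic, Optional, TypeVar

InputT = TypeVar("InputT")
StateT = TypeVar("StateT")


class ReadModifyWriteState(Generic[StateT]):
    """Toy model of the proposed ReadModifyWrite state (hypothetical API)."""

    def __init__(self, initial: Optional[StateT] = None) -> None:
        self._value = initial

    def read(self) -> Optional[StateT]:
        return self._value

    def write(self, value: StateT) -> None:
        # Overwrites unconditionally, exactly like ValueState.
        self._value = value

    def modify(self, element: InputT,
               fn: Callable[[InputT, Optional[StateT]], StateT]) -> None:
        # The read-modify-write cycle spelled out as one call:
        # new_state = fn(input, current_state).
        self._value = fn(element, self._value)
```

For example, counting elements becomes `state.modify(element, lambda _, s: (s or 0) + 1)`, which names the read-modify-write pattern instead of hiding it behind a plain `write()`.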
>>>>> >>>>> >>>>>> For example, the blog post with the counter example would be: >>>>> >>>>>> @StateId("buffer") >>>>> >>>>>> private final StateSpec<BagState<Event>> bufferedEvents = >>>>> StateSpecs.bag(); >>>>> >>>>>> >>>>> >>>>>> @StateId("count") >>>>> >>>>>> private final StateSpec<BagState<Integer>> countState = >>>>> StateSpecs.bag(); >>>>> >>>>>> >>>>> >>>>>> @ProcessElement >>>>> >>>>>> public void process( >>>>> >>>>>> ProcessContext context, >>>>> >>>>>> @StateId("buffer") BagState<Event> bufferState, >>>>> >>>>>> @StateId("count") BagState<Integer> countState) { >>>>> >>>>>> >>>>> >>>>>> int count = Iterables.getFirst(countState.read(), 0); >>>>> >>>>>> count = count + 1; >>>>> >>>>>> countState.clear(); >>>>> >>>>>> countState.add(count); >>>>> >>>>>> bufferState.add(context.element()); >>>>> >>>>>> >>>>> >>>>>> if (count > MAX_BUFFER_SIZE) { >>>>> >>>>>> for (EnrichedEvent enrichedEvent : >>>>> enrichEvents(bufferState.read())) { >>>>> >>>>>> context.output(enrichedEvent); >>>>> >>>>>> } >>>>> >>>>>> bufferState.clear(); >>>>> >>>>>> countState.clear(); >>>>> >>>>>> } >>>>> >>>>>> } >>>>> >>>>>> >>>>> >>>>>> On Tue, Apr 30, 2019 at 5:39 PM Kenneth Knowles < >>>>> [email protected]> wrote: >>>>> >>>>>>> >>>>> >>>>>>> Anything where the state evolves serially but arbitrarily - >>>>> the toy example is the integer counter in my blog post - needs ValueState. >>>>> You can't do it with AnyCombineFn. And I think LatestCombineFn is >>>>> dangerous, especially when it comes to CombiningState. ValueState is more >>>>> explicit, and I still maintain that it is status quo, modulo unimplemented >>>>> features in the Python SDK. The reads and writes are explicitly serial per >>>>> (key, window), unlike for a CombiningState.
Because of a CombineFn's >>>>> requirement to be associative and commutative, I would interpret it that >>>>> the runner is allowed to reorder inputs even after they are "written" by >>>>> the user's DoFn - for example doing blind writes to an unordered bag, and >>>>> only later reading the elements out and combining them in arbitrary order. >>>>> Or any other strategy mixing addInput and mergeAccumulators. It would be a >>>>> violation of the meaning of CombineFn to overspecify CombiningState >>>>> further >>>>> than this. So you cannot actually implement a consistent >>>>> CombiningState(LatestCombineFn). >>>>> >>>>>>> >>>>> >>>>>>> Kenn >>>>> >>>>>>> >>>>> >>>>>>> On Tue, Apr 30, 2019 at 5:19 PM Robert Bradshaw < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >>>>> >>>>>>>> On Wed, May 1, 2019 at 1:55 AM Brian Hulette < >>>>> [email protected]> wrote: >>>>> >>>>>>>> > >>>>> >>>>>>>> > Reza - you're definitely not derailing, that's exactly what >>>>> I was looking for! >>>>> >>>>>>>> > >>>>> >>>>>>>> > I've actually recently encountered an additional use case >>>>> where I'd like to use ValueState in the Python SDK. I'm experimenting with >>>>> an ArrowBatchingDoFn that uses state and timers to batch up python >>>>> dictionaries into arrow record batches (actually my entire purpose for >>>>> jumping down this python state rabbit hole). >>>>> >>>>>>>> > >>>>> >>>>>>>> > At first blush it seems like the best way to do this would >>>>> be to just replicate the batching approach in the timely processing post >>>>> [1], but when the bag is full combine the elements into an arrow record >>>>> batch, rather than enriching all of the elements and writing them out >>>>> separately. However, if possible I'd like to pre-allocate buffers for each >>>>> column and populate them as elements arrive (at least for columns with a >>>>> fixed size type), so a bag state wouldn't be ideal. 
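Kenn's argument, that a CombineFn may see its inputs in any order and so "latest written" cannot be made consistent, can be checked with a toy experiment that feeds the same inputs in every order. The two functions below are hypothetical stand-ins, not Beam's `LatestCombineFn`: one keeps the last input it was given, the other keeps the input with the greatest event timestamp.

```python
import itertools
from typing import List, Optional, Tuple

Event = Tuple[int, str]  # (event_timestamp, payload)


def last_added(inputs: List[Event]) -> Optional[Event]:
    """'Latest' meaning last written: the result depends on input order."""
    acc = None
    for e in inputs:
        acc = e
    return acc


def latest_by_timestamp(inputs: List[Event]) -> Optional[Event]:
    """'Latest' meaning max event time: max is associative and commutative."""
    # Ties on timestamp fall back to comparing payloads, keeping it order-free.
    return max(inputs, default=None)


events = [(3, "c"), (1, "a"), (2, "b")]
# A runner may legally feed a CombineFn its inputs in any of these orders.
orders = list(itertools.permutations(events))
print({last_added(list(o)) for o in orders})           # several distinct results
print({latest_by_timestamp(list(o)) for o in orders})  # exactly one result
```

Only the timestamp-based version is a well-behaved combine; the "last written" one silently depends on whatever order the runner chose.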
>>>>> >>>>>>>> >>>>> >>>>>>>> It seems it'd be preferable to do the conversion from a bag of >>>>> >>>>>>>> elements to a single arrow frame all at once, when emitting, >>>>> rather >>>>> >>>>>>>> than repeatedly reading and writing the partial batch to and >>>>> from >>>>> >>>>>>>> state with every element that comes in. (Bag state has blind >>>>> append.) >>>>> >>>>>>>> >>>>> >>>>>>>> > Also, a CombiningValueState is not ideal because I'd need >>>>> to implement a merge_accumulators function that combines several >>>>> in-progress batches. I could certainly implement that, but I'd prefer that >>>>> it never be called unless absolutely necessary, which doesn't seem to be >>>>> the case for CombiningValueState. (As an aside, maybe there's some room >>>>> there for a middle ground between ValueState and CombiningValueState >>>>> >>>>>>>> >>>>> >>>>>>>> This does actually feel natural (to me), because you're >>>>> repeatedly >>>>> >>>>>>>> adding elements to build something up. merge_accumulators >>>>> would >>>>> >>>>>>>> probably be pretty easy (concatenation) but unless your >>>>> windows are >>>>> >>>>>>>> merging could just throw a not implemented error to really >>>>> guard >>>>> >>>>>>>> against it being used. >>>>> >>>>>>>> >>>>> >>>>>>>> > I suppose you could argue that this is a pretty low-level >>>>> optimization we should be able to shield our users from, but right now I >>>>> just wish I had ValueState in python so I didn't have to hack it up with a >>>>> BagState :) >>>>> >>>>>>>> > >>>>> >>>>>>>> > Anyway, in light of this and all the other use-cases >>>>> mentioned here, I think the resolution is to just implement ValueState in >>>>> python, and document the danger with ValueState in both Python and Java. 
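Robert's point that `merge_accumulators` for a batching accumulator would probably just be concatenation can be sketched with a CombineFn-shaped class. This is an assumption-laden toy: `ColumnBatchingCombine` is hypothetical, mirrors only the method names of a CombineFn (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`) without subclassing it, and uses plain Python lists in place of pre-allocated Arrow buffers.

```python
from typing import Any, Dict, Iterable, List

# Column name -> values; a stand-in for an Arrow record batch.
Batch = Dict[str, List[Any]]


class ColumnBatchingCombine:
    """CombineFn-shaped toy that builds column-wise batches (hypothetical)."""

    def __init__(self, columns: List[str]) -> None:
        self.columns = columns

    def create_accumulator(self) -> Batch:
        return {c: [] for c in self.columns}

    def add_input(self, acc: Batch, row: Dict[str, Any]) -> Batch:
        # Append each field to its column as the element arrives;
        # missing fields become None.
        for c in self.columns:
            acc[c].append(row.get(c))
        return acc

    def merge_accumulators(self, accs: Iterable[Batch]) -> Batch:
        # Merging in-progress batches is column-wise concatenation. If windows
        # never merge, this could instead raise NotImplementedError.
        merged = self.create_accumulator()
        for acc in accs:
            for c in self.columns:
                merged[c].extend(acc[c])
        return merged

    def extract_output(self, acc: Batch) -> Batch:
        return acc
```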
>>>>> Just to be clear, the danger I'm referring to is that users might easily >>>>> forget that data can be out of order, and use ValueState in a way that >>>>> assumes it's been populated with data from the most recent element in >>>>> event >>>>> time, then in practice out of order data clobbers their state. I'm happy >>>>> to >>>>> write up a PR for this - are there any objections to that? >>>>> >>>>>>>> >>>>> >>>>>>>> I still haven't seen a good case for it (though I haven't >>>>> looked at >>>>> >>>>>>>> Reza's BiTemporalStream yet). Much harder to remove things >>>>> once >>>>> >>>>>>>> they're in. Can we just add an Any and/or LatestCombineFn and >>>>> use (and >>>>> >>>>>>>> point to) that instead? With the comment that if you're doing >>>>> >>>>>>>> read-modify-write, an add_input may be better. >>>>> >>>>>>>> >>>>> >>>>>>>> > [1] >>>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html >>>>> >>>>>>>> > >>>>> >>>>>>>> > On Mon, Apr 29, 2019 at 12:23 AM Robert Bradshaw < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> On Mon, Apr 29, 2019 at 3:43 AM Reza Rokni <[email protected]> >>>>> wrote: >>>>> >>>>>>>> >> > >>>>> >>>>>>>> >> > @Robert Bradshaw Some examples, mostly built out from >>>>> cases around Timeseries data, don't want to derail this thread so at a high >>>>> level: >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> Thanks. Perfectly on-topic for the thread. >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> > Looping timers, a timer which allows for creation of a >>>>> value within a window when no external input has been seen. Requires >>>>> metadata like "is timer set". >>>>> >>>>>>>> >> > >>>>> >>>>>>>> >> > BiTemporalStream join, where we need to match >>>>> leftCol.timestamp to a value == (max(rightCol.timestamp) where >>>>> rightCol.timestamp <= leftCol.timestamp)), this is for an application >>>>> matching trades to quotes. >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> I'd be interested in seeing the code here.
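Reza's matching rule for the BiTemporalStream join (pair each left timestamp with the right value whose timestamp is the maximum one that is still `<=` the left timestamp) is an "as-of" join. The batch sketch below shows only the matching rule; `as_of_join` is a hypothetical helper, and it deliberately ignores streaming concerns such as state, watermarks and late quotes.

```python
import bisect
from typing import List, Optional, Tuple

Quote = Tuple[int, float]  # (timestamp, price)


def as_of_join(trade_times: List[int],
               quotes: List[Quote]) -> List[Tuple[int, Optional[Quote]]]:
    """For each trade, pick the quote with the largest timestamp <= the trade's."""
    quotes = sorted(quotes)
    quote_times = [t for t, _ in quotes]
    result = []
    for t in trade_times:
        # bisect_right returns how many quotes have timestamp <= t.
        i = bisect.bisect_right(quote_times, t)
        result.append((t, quotes[i - 1] if i > 0 else None))
    return result
```

The `max(...)` in the rule is what prompts the question of whether combining would apply: per left element the answer is a max over a filtered set, not an arbitrary overwrite.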
The fact that >>>>> you have a >>>>> >>>>>>>> >> max here makes me wonder if combining would be applicable. >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> (FWIW, I've long thought it would be useful to do this >>>>> kind of thing >>>>> >>>>>>>> >> with Windows. Basically, it'd be like session windows with >>>>> one side >>>>> >>>>>>>> >> being the window from the timestamp forward into the >>>>> future, and the >>>>> >>>>>>>> >> other side being from the timestamp back a certain amount >>>>> in the past. >>>>> >>>>>>>> >> This seems a common join pattern.) >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> > Metadata is used for >>>>> >>>>>>>> >> > >>>>> >>>>>>>> >> > Taking the Key from the KV for use within the OnTimer >>>>> call. >>>>> >>>>>>>> >> > Knowing where we are in watermarks for GC of objects in >>>>> state. >>>>> >>>>>>>> >> > More timer metadata (min timer ..) >>>>> >>>>>>>> >> > >>>>> >>>>>>>> >> > It could be argued that what we are using state for is >>>>> mostly workarounds for things that could eventually end up in the API >>>>> itself. For example >>>>> >>>>>>>> >> > >>>>> >>>>>>>> >> > There is a Jira for OnTimer Context to have Key. >>>>> >>>>>>>> >> > The GC needs are mostly due to not having a Map State >>>>> object in all runners yet. >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> Yeah. GC could probably be done with a max combine. The >>>>> Key (which >>>>> >>>>>>>> >> should be in the API) could be an AnyCombine for now (safe >>>>> to >>>>> >>>>>>>> >> overwrite because it's always the same). >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> > However I think as folks explore Beam there will always >>>>> be little things that require Metadata and so having access to something >>>>> which gives us fine-grained control (as Kenneth mentioned) is useful. >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> Likely. I guess in line with making easy things easy, I'd >>>>> like to make >>>>> >>>>>>>> >> dangerous things hard(er).
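Robert's two substitutions (a max combine for GC bookkeeping and an "any" combine for the key, which is safe because every input is the same) can be written as CombineFn-shaped toys. `MaxCombine` and `AnyCombine` are hypothetical classes that only mirror the CombineFn method names; they are not the combiners shipped with Beam.

```python
class MaxCombine:
    """Toy max combine, e.g. for the highest timestamp seen (for GC)."""

    def create_accumulator(self):
        return None

    def add_input(self, acc, value):
        return value if acc is None else max(acc, value)

    def merge_accumulators(self, accs):
        present = [a for a in accs if a is not None]
        return max(present) if present else None

    def extract_output(self, acc):
        return acc


class AnyCombine:
    """Toy 'keep any one value' combine.

    Only safe when every input is equal, e.g. the key of a keyed stateful DoFn;
    with differing inputs the result would depend on processing order.
    """

    def create_accumulator(self):
        return None

    def add_input(self, acc, value):
        return value if acc is None else acc

    def merge_accumulators(self, accs):
        return next((a for a in accs if a is not None), None)

    def extract_output(self, acc):
        return acc
```

Both are order-independent for their intended inputs, which is exactly the property a plain overwrite lacks.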
As Kenn says, we'll probably >>>>> need some kind >>>>> >>>>>>>> >> of lower-level thing, especially if we introduce OnMerge. >>>>> >>>>>>>> >> >>>>> >>>>>>>> >> > Cheers >>>>> >>>>>>>> >> > >>>>> >>>>>>>> >> > Reza >>>>> >>>>>>>> >> > >>>>> >>>>>>>> >> > On Sat, 27 Apr 2019 at 02:59, Kenneth Knowles < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >> >>>>> >>>>>>>> >> >> To be clear, the intent was always that ValueState >>>>> would not be usable in merging pipelines. So no danger of clobbering, but >>>>> also limited functionality. Is there a runner that accepts it and >>>>> clobbers? >>>>> The whole idea of the new DoFn is that it is easy to do the >>>>> construction-time analysis and reject the invalid pipeline. It is actually >>>>> runner independent and I think already implemented in ParDo's validation, >>>>> no? >>>>> >>>>>>>> >> >> >>>>> >>>>>>>> >> >> Kenn >>>>> >>>>>>>> >> >> >>>>> >>>>>>>> >> >> On Fri, Apr 26, 2019 at 10:14 AM Lukasz Cwik < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >>> >>>>> >>>>>>>> >> >>> I am in the camp where we should only support merging >>>>> state (either naturally via things like bags or via combiners). I believe >>>>> that having the wrapper that Brian suggests is useful for users. As for >>>>> the >>>>> @OnMerge method, I believe combiners should have the ability to look at >>>>> the >>>>> window information and we should treat @OnMerge as syntactic sugar over a >>>>> combiner if the combiner API is too cumbersome. >>>>> >>>>>>>> >> >>> >>>>> >>>>>>>> >> >>> I believe using combiners can also extend to side >>>>> inputs and help us deal with singleton and map-like side inputs when >>>>> multiple firings occur. I also like treating everything like a combiner >>>>> because it will give us a lot of reuse of combiner implementations across all >>>>> the places they could be used and will be especially useful when we start >>>>> exposing APIs related to retractions on combiners.
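Why merging windows single out ValueState can be seen in a toy model of what happens when two session windows merge: a bag merges by concatenation and a combining state merges through its CombineFn, but two differing values have no defined merge. The functions are hypothetical illustrations; raising `ValueError` is this sketch's own choice, whereas Kenn's message suggests Beam rejects such pipelines at construction time instead.

```python
from typing import Callable, List, Optional


def merge_bag_states(states: List[List[int]]) -> List[int]:
    # Bag state merges naturally: the concatenation of all windows' elements.
    return [x for s in states for x in s]


def merge_combining_states(accs: List[int],
                           merge: Callable[[List[int]], int]) -> int:
    # Combining state merges via the CombineFn's merge_accumulators.
    return merge(accs)


def merge_value_states(values: List[Optional[int]]) -> Optional[int]:
    # Value state has no merge function: which window's value should win?
    present = [v for v in values if v is not None]
    if len(set(present)) > 1:
        raise ValueError("no defined way to merge differing values: %r" % present)
    return present[0] if present else None
```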
>>>>> >>>>>>>> >> >>> >>>>> >>>>>>>> >> >>> On Fri, Apr 26, 2019 at 9:43 AM Brian Hulette < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >>>> >>>>> >>>>>>>> >> >>>> Yeah the danger with out of order processing concerns >>>>> me more than the merging as well. As a new Beam user, I immediately >>>>> gravitated towards ValueState since it was easy to think about and I just >>>>> assumed there wasn't anything to be concerned about. So it was shocking to >>>>> learn that there is this dangerous edge-case. >>>>> >>>>>>>> >> >>>> >>>>> >>>>>>>> >> >>>> What if ValueState were just implemented as a wrapper >>>>> of CombiningState with a LatestCombineFn and documented as such (and >>>>> perhaps we encourage users to consider using a CombiningState explicitly >>>>> if >>>>> at all possible)? >>>>> >>>>>>>> >> >>>> >>>>> >>>>>>>> >> >>>> Brian >>>>> >>>>>>>> >> >>>> >>>>> >>>>>>>> >> >>>> >>>>> >>>>>>>> >> >>>> >>>>> >>>>>>>> >> >>>> On Fri, Apr 26, 2019 at 2:29 AM Robert Bradshaw < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>>> >> >>>>> On Fri, Apr 26, 2019 at 6:40 AM Kenneth Knowles < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >>>>> > >>>>> >>>>>>>> >> >>>>> > You could use a CombiningState with a CombineFn >>>>> that returns the minimum for this case. >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>>> >> >>>>> We've also wanted to be able to set data when >>>>> setting a timer that >>>>> >>>>>>>> >> >>>>> would be returned when the timer fires. (It's in the >>>>> FnAPI, but not >>>>> >>>>>>>> >> >>>>> the SDKs yet.) >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>>> >> >>>>> The metadata is an interesting usecase, do you have >>>>> some more specific >>>>> >>>>>>>> >> >>>>> examples? Might boil down to not having a rich >>>>> enough (single) state >>>>> >>>>>>>> >> >>>>> type. >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>>> >> >>>>> > But I've come to feel there is a mismatch. 
On the >>>>> one hand, ParDo(<stateful DoFn>) is a way to drop to a lower level and >>>>> write logic that does not fit a more general computational pattern, really >>>>> taking fine control. On the other hand, automatically merging state via >>>>> CombiningState or BagState is more of a no-knobs higher level of >>>>> programming. To me there seems to be a bit of a philosophical conflict. >>>>> >>>>>>>> >> >>>>> > >>>>> >>>>>>>> >> >>>>> > These days, I feel like an @OnMerge method would >>>>> be more natural. If you are using state and timers, you probably often >>>>> want >>>>> more direct control over how state from windows gets merged. And of course >>>>> we don't even have a design for timers - you would need some kind of >>>>> timestamp CombineFn but I think setting/unsetting timers manually makes >>>>> more sense. Especially considering the trickiness around merging windows >>>>> in >>>>> the absence of retractions, you really need this callback, so you can >>>>> issue >>>>> retractions manually for any output your stateful DoFn emitted in windows >>>>> that no longer exist. >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>>> >> >>>>> I agree we'll probably need an @OnMerge. On the >>>>> other hand, I like >>>>> >>>>>>>> >> >>>>> being able to have good defaults. The high/low level >>>>> thing is a >>>>> >>>>>>>> >> >>>>> continuum (the indexing example falling towards the >>>>> high end). >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>>> >> >>>>> Actually, the merging questions bother me less than >>>>> how easy it is to >>>>> >>>>>>>> >> >>>>> accidentally clobber previous values. It looks so >>>>> easy (like the >>>>> >>>>>>>> >> >>>>> easiest state to use) but is actually the most >>>>> dangerous. If one wants >>>>> >>>>>>>> >> >>>>> this behavior, I would rather have an explicit >>>>> AnyCombineFn or >>>>> >>>>>>>> >> >>>>> LatestCombineFn which makes you think about the >>>>> semantics.
>>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>>> >> >>>>> - Robert >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>>> >> >>>>> > On Thu, Apr 25, 2019 at 5:49 PM Reza Rokni < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >> >>>>> >>>>>>>> >> >>>>> >> +1 on the metadata use case. >>>>> >>>>>>>> >> >>>>> >> >>>>> >>>>>>>> >> >>>>> >> For performance reasons the Timer API does not >>>>> support a read() operation, which for the vast majority of use cases is >>>>> not a required feature. In the small set of use cases where it is needed, >>>>> for example when you need to set a Timer in EventTime based on the >>>>> smallest >>>>> timestamp seen in the elements within a DoFn, we can make use of a >>>>> ValueState object to keep track of the value. >>>>> >>>>>>>> >> >>>>> >> >>>>> >>>>>>>> >> >>>>> >> On Fri, 26 Apr 2019 at 00:38, Reuven Lax < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>> >>>>> >>>>>>>> >> >>>>> >>> I see examples of people using ValueState that I >>>>> think are not captured CombiningState. For example, one common one is >>>>> users >>>>> who set a timer and then record the timestamp of that timer in a >>>>> ValueState. In general when you store state that is metadata about other >>>>> state you store, then ValueState will usually make more sense than >>>>> CombiningState. >>>>> >>>>>>>> >> >>>>> >>> >>>>> >>>>>>>> >> >>>>> >>> On Thu, Apr 25, 2019 at 9:32 AM Brian Hulette < >>>>> [email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> Currently the Python SDK does not make >>>>> ValueState available to users. My initial inclination was to go ahead and >>>>> implement it there to be consistent with Java, but Robert brings up a >>>>> great >>>>> point here that ValueState has an inherent race condition for out of order >>>>> data, and a lot of it's use cases can actually be implemented with a >>>>> CombiningState instead. 
>>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> It seems to me that at the very least we should >>>>> discourage the use of ValueState by noting the danger in the documentation >>>>> and preferring CombiningState in examples, and perhaps we should go >>>>> further >>>>> and deprecate it in Java and not implement it in python. Either way I >>>>> think >>>>> we should be consistent between Java and Python. >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> I'm curious what people think about this, are >>>>> there use cases that we really need to keep ValueState around for? >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> Brian >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> ---------- Forwarded message --------- >>>>> >>>>>>>> >> >>>>> >>>> From: Robert Bradshaw <[email protected]> >>>>> >>>>>>>> >> >>>>> >>>> Date: Thu, Apr 25, 2019, 08:31 >>>>> >>>>>>>> >> >>>>> >>>> Subject: Re: [docs] Python State & Timers >>>>> >>>>>>>> >> >>>>> >>>> To: dev <[email protected]> >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> On Thu, Apr 25, 2019, 5:26 PM Maximilian >>>>> Michels <[email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>>> >>>>>>>> >> >>>>> >>>>> Completely agree that CombiningState is nicer >>>>> in this example. Users may >>>>> >>>>>>>> >> >>>>> >>>>> still want to use ValueState when there is >>>>> nothing to combine. >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> I've always had trouble coming up with any good >>>>> examples of this. >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>>> Also, >>>>> >>>>>>>> >> >>>>> >>>>> users already know ValueState from the Java >>>>> SDK. 
>>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> Maybe we should deprecate that :) >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>>> On 25.04.19 17:12, Robert Bradshaw wrote: >>>>> >>>>>>>> >> >>>>> >>>>> > On Thu, Apr 25, 2019 at 4:58 PM Maximilian >>>>> Michels <[email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>> >> I forgot to give an example, just to >>>>> clarify for others: >>>>> >>>>>>>> >> >>>>> >>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>> >>> What was the specific example that was >>>>> less natural? >>>>> >>>>>>>> >> >>>>> >>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>> >> Basically every time we use ListState to >>>>> express ValueState, e.g. >>>>> >>>>>>>> >> >>>>> >>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>> >> next_index, = list(state.read()) or [0] >>>>> >>>>>>>> >> >>>>> >>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>> >> Taken from: >>>>> >>>>>>>> >> >>>>> >>>>> >> >>>>> https://github.com/apache/beam/pull/8363/files#diff-ba1a2aed98079ccce869cd660ca9d97dR301 >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > Yes, ListState is much less natural here. I >>>>> think generally >>>>> >>>>>>>> >> >>>>> >>>>> > CombiningValue is often a better >>>>> replacement. E.g. 
the Java example >>>>> >>>>>>>> >> >>>>> >>>>> > reads >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > public void processElement( >>>>> >>>>>>>> >> >>>>> >>>>> > ProcessContext context, >>>>> @StateId("index") ValueState<Integer> index) { >>>>> >>>>>>>> >> >>>>> >>>>> > int current = >>>>> firstNonNull(index.read(), 0); >>>>> >>>>>>>> >> >>>>> >>>>> > context.output(KV.of(current, >>>>> context.element())); >>>>> >>>>>>>> >> >>>>> >>>>> > index.write(current+1); >>>>> >>>>>>>> >> >>>>> >>>>> > } >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > which is replaced with bag state >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > def process(self, element, >>>>> state=DoFn.StateParam(INDEX_STATE)): >>>>> >>>>>>>> >> >>>>> >>>>> > next_index, = list(state.read()) or [0] >>>>> >>>>>>>> >> >>>>> >>>>> > yield (element, next_index) >>>>> >>>>>>>> >> >>>>> >>>>> > state.clear() >>>>> >>>>>>>> >> >>>>> >>>>> > state.add(next_index + 1) >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > whereas CombiningState would be more natural >>>>> (than ListState, and >>>>> >>>>>>>> >> >>>>> >>>>> > arguably than even ValueState), giving >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > def process(self, element, >>>>> index=DoFn.StateParam(INDEX_STATE)): >>>>> >>>>>>>> >> >>>>> >>>>> > yield element, index.read() >>>>> >>>>>>>> >> >>>>> >>>>> > index.add(1) >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>> >> -Max >>>>> >>>>>>>> >> >>>>> >>>>> >> >>>>> >>>>>>>> >> >>>>> >>>>> >> On 25.04.19 16:40, Robert Bradshaw wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>> https://github.com/apache/beam/pull/8402 >>>>> >>>>>>>> >> >>>>> 
>>>>> >>> >>>>>>>> >> >>>>> >>>>> >>> On Thu, Apr 25, 2019 at 4:26 PM Robert >>>>> Bradshaw <[email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>> Oh, this is for the indexing example. >>>>> >>>>>>>> >> >>>>> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>> I actually think using CombiningState is >>>>> cleaner than ValueState. >>>>> >>>>>>>> >> >>>>> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>> >>>>> https://github.com/apache/beam/blob/release-2.12.0/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py#L262 >>>>> >>>>>>>> >> >>>>> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>> (The fact that one must specify the >>>>> accumulator coder is, however, >>>>> >>>>>>>> >> >>>>> >>>>> >>>> unfortunate. We should probably infer >>>>> that if we can.) >>>>> >>>>>>>> >> >>>>> >>>>> >>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>> On Thu, Apr 25, 2019 at 4:19 PM Robert >>>>> Bradshaw <[email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>> The desire was to avoid the implicit >>>>> disallowed combination wart in >>>>> >>>>>>>> >> >>>>> >>>>> >>>>> Python (until we could make sense of >>>>> it), and also ValueState could be >>>>> >>>>>>>> >> >>>>> >>>>> >>>>> surprising with respect to older values >>>>> overwriting newer ones. What >>>>> >>>>>>>> >> >>>>> >>>>> >>>>> was the specific example that was less >>>>> natural? >>>>> >>>>>>>> >> >>>>> >>>>> >>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>> On Thu, Apr 25, 2019 at 3:01 PM >>>>> Maximilian Michels <[email protected]> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> @Pablo: Thanks for following up with >>>>> the PR! :) >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> @Brian: I was wondering about this as >>>>> well. It makes the Python state >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> code a bit unnatural.
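The three versions of the indexing example in this thread (Java ValueState, the Python BagState workaround, and a CombiningState that sums) can be compared side by side with plain-Python simulations of a single key and window. The function names are hypothetical, each local variable stands in for the corresponding state cell, and the tuple order of each output follows the snippet it mimics.

```python
from typing import Iterable, List, Tuple


def index_with_value_state(elements: Iterable[str]) -> List[Tuple[int, str]]:
    # Java ValueState style: read (default 0), emit, write current + 1.
    index = None
    out = []
    for e in elements:
        current = index if index is not None else 0
        out.append((current, e))
        index = current + 1
    return out


def index_with_bag_state(elements: Iterable[str]) -> List[Tuple[str, int]]:
    # Python BagState workaround: the bag holds at most one element.
    bag: List[int] = []
    out = []
    for e in elements:
        next_index, = list(bag) or [0]
        out.append((e, next_index))
        bag.clear()
        bag.append(next_index + 1)
    return out


def index_with_combining_state(elements: Iterable[str]) -> List[Tuple[str, int]]:
    # CombiningState(sum) style: read the running total, then blindly add 1.
    total = 0  # the empty accumulator of a sum combine
    out = []
    for e in elements:
        out.append((e, total))
        total += 1
    return out
```

The combining version needs no clear-then-write step: each element contributes a blind `add(1)`, which is why Robert calls it more natural than both alternatives.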
I'd suggest >>>>> adding a ValueState wrapper around >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> ListState/CombiningState. >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> @Robert: Like Reuven pointed out, we >>>>> can disallow ValueState for merging >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> windows with state. >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> @Reza: Great. Let's make sure it has >>>>> Python examples out of the box. >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> Either Pablo or I could help there. >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> Thanks, >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> Max >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> On 25.04.19 04:14, Reza Ardeshir Rokni >>>>> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Pablo, Kenneth and I have a new blog >>>>> ready for publication which covers >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> how to create a "looping timer", which >>>>> allows default values to be >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> created in a window when no incoming >>>>> elements exist. We just need to >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> clear a few bits before publication, >>>>> but it would be great to have it >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> also include a Python example; I wrote >>>>> it in Java... >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Cheers >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Reza >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> On Thu, 25 Apr 2019 at 04:34, Reuven >>>>> Lax <[email protected] >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> <mailto:[email protected]>> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Well state is still not >>>>> implemented for merging windows even for >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Java (though I believe the idea >>>>> was to disallow ValueState there).
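The looping-timer idea (emitting a default value for windows that received no elements) can be approximated in batch form to make the output shape concrete. `fill_empty_windows` is a hypothetical helper: it counts elements per fixed window and loops over a known range, where each loop iteration stands in for one firing of a timer that, in a streaming pipeline, would re-arm itself at the end of each window. It assumes `start` is aligned to a window boundary.

```python
from typing import Dict, List, Tuple


def fill_empty_windows(timestamps: List[int], window_size: int,
                       start: int, end: int,
                       default: int = 0) -> List[Tuple[int, int]]:
    """Count elements per fixed window, emitting `default` for empty windows."""
    counts: Dict[int, int] = {}
    for ts in timestamps:
        window_start = ts - (ts % window_size)
        counts[window_start] = counts.get(window_start, 0) + 1
    out = []
    window_start = start
    while window_start < end:
        # One iteration per window: the "timer firing" that emits a value
        # even when no element arrived in that window.
        out.append((window_start, counts.get(window_start, default)))
        window_start += window_size
    return out
```

In the streaming version the "is timer set" flag Reza mentions is the per-key metadata that keeps the loop from being started twice.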
>>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> On Wed, Apr 24, 2019 at 1:11 PM >>>>> Robert Bradshaw <[email protected] >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> <mailto:[email protected]>> >>>>> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> It was unclear what the >>>>> semantics were for ValueState for merging >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> windows. (It's also a bit >>>>> weird as it's inherently a race condition >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> wrt element ordering, unlike >>>>> Bag and CombineState, though you can >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> always implement it as a >>>>> CombineState that always returns the latest >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> value which is a bit more >>>>> explicit about the dangers here.) >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> On Wed, Apr 24, 2019 at >>>>> 10:08 PM Brian Hulette >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> <[email protected] >>>>> <mailto:[email protected]>> wrote: >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > That's a great idea! I >>>>> thought about this too after those >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> posts came up on the list >>>>> recently. I started to look into it, >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> but I noticed that there's >>>>> actually no implementation of >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> ValueState in userstate. Is >>>>> there a reason for that? I started >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> to work on a patch to add it >>>>> but I was just curious if there was >>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> some reason it was omitted >>>>> that I should be aware of. 
We could certainly replicate the example without ValueState by using BagState and clearing it before each write, but it would be nice if we could draw a direct parallel.

Brian

On Fri, Apr 12, 2019 at 7:05 AM Maximilian Michels <[email protected]> wrote:
> It would probably be pretty easy to add the corresponding code snippets to the docs as well.

It's probably a bit more work, because there is no section dedicated to state/timers in the documentation yet. Tracked here: https://jira.apache.org/jira/browse/BEAM-2472

> I've been going over this topic a bit. I'll add the snippets next week, if that's fine by y'all.

That would be great. The blog posts are a great way to get started with state/timers.
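The BagState workaround Brian mentions can be sketched as a small ValueState-like wrapper that clears the bag before every write. Both classes are hypothetical: InMemoryBag stands in for a runner-provided bag state, with read/add/clear method names chosen to resemble Beam Python's bag state, and BagBackedValue is not a Beam API.

```python
# Sketch: emulate ValueState on top of bag state by clearing the bag before
# every write. InMemoryBag is a hypothetical stand-in for a runner's bag.
from typing import Generic, Iterable, List, Optional, TypeVar

T = TypeVar("T")


class InMemoryBag(Generic[T]):
    def __init__(self) -> None:
        self._items: List[T] = []

    def read(self) -> Iterable[T]:
        return list(self._items)

    def add(self, value: T) -> None:
        self._items.append(value)

    def clear(self) -> None:
        self._items = []


class BagBackedValue(Generic[T]):
    """Hypothetical ValueState-like wrapper; the bag holds at most one element."""

    def __init__(self, bag: InMemoryBag[T]) -> None:
        self._bag = bag

    def read(self) -> Optional[T]:
        # Return the single stored element, or None if never written.
        return next(iter(self._bag.read()), None)

    def write(self, value: T) -> None:
        # clear + add; a runner could fuse these into a single write.
        self._bag.clear()
        self._bag.add(value)

    def clear(self) -> None:
        self._bag.clear()


# Read-modify-write: count the elements seen for one key.
counter = BagBackedValue(InMemoryBag[int]())
for _ in range(3):
    counter.write((counter.read() or 0) + 1)
print(counter.read())  # 3
```

The counter loop is the read-modify-write pattern discussed earlier in the thread: read the current value, compute a new one, then clear and add.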
Thanks,
Max

On 11.04.19 20:21, Pablo Estrada wrote:
I've been going over this topic a bit. I'll add the snippets next week, if that's fine by y'all.
Best
-P.

On Thu, Apr 11, 2019 at 5:27 AM Robert Bradshaw <[email protected]> wrote:
That's a great idea! It would probably be pretty easy to add the corresponding code snippets to the docs as well.

On Thu, Apr 11, 2019 at 2:00 PM Maximilian Michels <[email protected]> wrote:
Hi everyone,

The Python SDK still lacks documentation on state and timers.
As a first step, what do you think about updating these two blog posts with the corresponding Python code?

https://beam.apache.org/blog/2017/02/13/stateful-processing.html
https://beam.apache.org/blog/2017/08/28/timely-processing.html

Thanks,
Max

--
This email may be confidential and privileged. If you received this communication by mistake, please don't forward it to anyone else, please erase all copies and attachments, and please let me know that it has gone to the wrong person.

The above terms reflect a potential business arrangement, are provided solely as a basis for further discussion, and are not intended to be and do not constitute a legally binding obligation. No legally binding obligations will be created, implied, or inferred until an agreement in final form is executed in writing by all parties involved.
