Hi Brian/Robert,

I am moving ahead with implementing ReadModifyWriteState. I will just repurpose this PR: https://github.com/apache/beam/pull/9067
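For readers skimming the quoted thread, the semantics under discussion (a single value per key and window with read() and write(), plus the modify(I, (I, S) -> S) operation floated in the thread) can be sketched in plain Python. This is only an illustrative in-memory stand-in, not Beam's API and not the contents of the PR: the class name InMemoryReadModifyWriteState, its methods, and the helper assign_index are hypothetical names chosen for this sketch.

```python
from typing import Callable, Generic, Optional, TypeVar

S = TypeVar("S")  # type of the stored value
I = TypeVar("I")  # type of the input handed to modify()


class InMemoryReadModifyWriteState(Generic[S]):
    """Hypothetical single-value state cell; NOT a Beam class."""

    def __init__(self) -> None:
        self._value: Optional[S] = None

    def read(self) -> Optional[S]:
        # Returns None when nothing has been written yet.
        return self._value

    def write(self, value: S) -> None:
        # Overwrites the stored value; the previous value is lost.
        self._value = value

    def clear(self) -> None:
        self._value = None

    def modify(self, element: I, fn: Callable[[I, Optional[S]], S]) -> S:
        # Read the current value, apply fn(input, current), store the result.
        new_value = fn(element, self._value)
        self._value = new_value
        return new_value


def assign_index(state: InMemoryReadModifyWriteState, element):
    """Indexing example from the thread: pair the element with a running index."""
    current = state.read() or 0
    state.write(current + 1)
    return (element, current)
```

The point of the name is visible in assign_index: the value is read, changed, and written back in one serial step, which is the usage pattern the thread considers safe. Writing "the last element seen" without reading first is the pattern that out-of-order data can clobber.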
On Mon, Jul 15, 2019 at 7:47 PM Rakesh Kumar <[email protected]> wrote:

> Brian,
>
> I just want to follow up. Let me know if you are working on this.
> Otherwise, I can implement ReadModifyWriteState.
>
> On Mon, May 6, 2019 at 4:52 PM Reza Rokni <[email protected]> wrote:
>
>> When used as metadata I think the ReadModifyWrite naming is very accurate
>> for the majority of cases.
>>
>> The only case that does not follow that pattern is if it's being used as a
>> Boolean to indicate that something should be done in the OnTimer call based
>> on an event that has been seen in the OnProcess code. But even then calling
>> it ReadModifyWrite would still be ok and easily understood.
>>
>> *From: *Kenneth Knowles <[email protected]>
>> *Date: *Fri, 3 May 2019 at 10:58
>> *To: *dev
>>
>>> Agree with all of your points about the drawbacks of ValueState. It is
>>> definitely a pro/con weighing sort of situation. Considering the number of
>>> users who are new to the orthogonality of event time and processing time,
>>> ValueState could certainly lead to confusion about why things are not in
>>> any particular order. Perhaps a middle ground is to have a bag of "written
>>> values" with sequence numbers, and document some patterns/examples of how a
>>> user may care to resolve these sequence numbers on read, or perhaps some
>>> combiners like you describe. Overall, I'd defer to Reza and Reuven as they
>>> seem very familiar with real-world use cases here.
>>>
>>> Kenn
>>>
>>> On Thu, May 2, 2019 at 2:53 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> On Wed, May 1, 2019 at 8:09 PM Kenneth Knowles <[email protected]> wrote:
>>>> >
>>>> > On Wed, May 1, 2019 at 8:51 AM Reuven Lax <[email protected]> wrote:
>>>> >>
>>>> >> ValueState is not necessarily racy if you're doing a
>>>> >> read-modify-write. It's only racy if you're doing something like writing
>>>> >> last element seen.
>>>> >
>>>> > Race conditions are not inherently a problem.
They are neither >>>> necessary nor sufficient for correctness. In this case, it is not the >>>> classic sense of race condition anyhow, it is simply a nondeterministic >>>> result, which may often be perfectly fine. >>>> >>>> One can write correct code with ValueState, but it's harder to do. >>>> This is exacerbated by the fact that at first glance it looks easier >>>> to use. >>>> >>>> >>>>> On Wed, May 1, 2019 at 8:30 AM Lukasz Cwik <[email protected]> >>>> wrote: >>>> >>>>>> >>>> >>>>>> Isn't a value state just a bag state with at most one element >>>> and the usage pattern would be? >>>> >>>>>> 1) value_state.get == bag_state.read.next() (both have to handle >>>> the case when neither have been set) >>>> >>>>>> 2) user logic on what to do with current state + additional >>>> information to produce new state >>>> >>>>>> 3) value_state.set == bag_state.clear + bag_state.append? (note >>>> that Runners should optimize clear + append to become a single >>>> transaction/write) >>>> > >>>> > Your unpacking is accurate, but "X is just a Y" is not accurate. In >>>> this case you've demonstrated that value state *can be implemented using* >>>> bag state / has a workaround. But it is not subsumed by bag state. One >>>> important feature of ValueState is that it is statically determined that >>>> the transform cannot be used with merging windows. >>>> >>>> The flip side is that it makes it easy to write code that cannot be >>>> used with merging windows. Which hurts composition (especially if >>>> these operations are used as part of larger composite operations). >>>> >>>> > Another feature is that it is impossible to accidentally write more >>>> than one value. And a third important feature is that it declares what it >>>> is so that the code is more readable. >>>> >>>> +1. 
Which is why I think CombiningState is a better substitute than
>>>> BagState where it makes sense (and often does, and often can even be
>>>> an improvement over ValueState for performance and readability).
>>>>
>>>> Perhaps instead we could call it ReadModifyWrite state. It could make
>>>> sense, as well as a read() and write() operation, that we even offer a
>>>> modify(I, (I, S) -> S) operation.
>>>>
>>>> (Also, yes, when I said Latest I too meant a hypothetical "throw away
>>>> everything else when a new element is written" one, not the specific
>>>> one in the code. Sorry for the confusion.)
>>>>
>>>> >>>>>> For example, the blog post with the counter example would be:
>>>> >>>>>> @StateId("buffer")
>>>> >>>>>> private final StateSpec<BagState<Event>> bufferedEvents = StateSpecs.bag();
>>>> >>>>>>
>>>> >>>>>> @StateId("count")
>>>> >>>>>> private final StateSpec<BagState<Integer>> countState = StateSpecs.bag();
>>>> >>>>>>
>>>> >>>>>> @ProcessElement
>>>> >>>>>> public void process(
>>>> >>>>>>     ProcessContext context,
>>>> >>>>>>     @StateId("buffer") BagState<Event> bufferState,
>>>> >>>>>>     @StateId("count") BagState<Integer> countState) {
>>>> >>>>>>
>>>> >>>>>>   int count = Iterables.getFirst(countState.read(), 0);
>>>> >>>>>>   count = count + 1;
>>>> >>>>>>   countState.clear();
>>>> >>>>>>   countState.add(count);
>>>> >>>>>>   bufferState.add(context.element());
>>>> >>>>>>
>>>> >>>>>>   if (count > MAX_BUFFER_SIZE) {
>>>> >>>>>>     for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
>>>> >>>>>>       context.output(enrichedEvent);
>>>> >>>>>>     }
>>>> >>>>>>     bufferState.clear();
>>>> >>>>>>     countState.clear();
>>>> >>>>>>   }
>>>> >>>>>> }
>>>> >>>>>>
>>>> >>>>>> On Tue, Apr 30, 2019 at 5:39 PM Kenneth Knowles <[email protected]> wrote:
>>>> >>>>>>>
>>>> >>>>>>> Anything where the state evolves serially but arbitrarily - the
>>>> >>>>>>> toy example is the integer counter in my blog post - needs ValueState.
You >>>> can't do it with AnyCombineFn. And I think LatestCombineFn is dangerous, >>>> especially when it comes to CombingState. ValueState is more explicit, and >>>> I still maintain that it is status quo, modulo unimplemented features in >>>> the Python SDK. The reads and writes are explicitly serial per (key, >>>> window), unlike for a CombiningState. Because of a CombineFn's requirement >>>> to be associative and commutative, I would interpret it that the runner is >>>> allowed to reorder inputs even after they are "written" by the user's DoFn >>>> - for example doing blind writes to an unordered bag, and only later >>>> reading the elements out and combining them in arbitrary order. Or any >>>> other strategy mixing addInput and mergeAccumulators. It would be a >>>> violation of the meaning of CombineFn to overspecify CombiningState further >>>> than this. So you cannot actually implement a consistent >>>> CombiningState(LatestCombineFn). >>>> >>>>>>> >>>> >>>>>>> Kenn >>>> >>>>>>> >>>> >>>>>>> On Tue, Apr 30, 2019 at 5:19 PM Robert Bradshaw < >>>> [email protected]> wrote: >>>> >>>>>>>> >>>> >>>>>>>> On Wed, May 1, 2019 at 1:55 AM Brian Hulette < >>>> [email protected]> wrote: >>>> >>>>>>>> > >>>> >>>>>>>> > Reza - you're definitely not derailing, that's exactly what >>>> I was looking for! >>>> >>>>>>>> > >>>> >>>>>>>> > I've actually recently encountered an additional use case >>>> where I'd like to use ValueState in the Python SDK. I'm experimenting with >>>> an ArrowBatchingDoFn that uses state and timers to batch up python >>>> dictionaries into arrow record batches (actually my entire purpose for >>>> jumping down this python state rabbit hole). 
>>>> >>>>>>>> > >>>> >>>>>>>> > At first blush it seems like the best way to do this would >>>> be to just replicate the batching approach in the timely processing post >>>> [1], but when the bag is full combine the elements into an arrow record >>>> batch, rather than enriching all of the elements and writing them out >>>> separately. However, if possible I'd like to pre-allocate buffers for each >>>> column and populate them as elements arrive (at least for columns with a >>>> fixed size type), so a bag state wouldn't be ideal. >>>> >>>>>>>> >>>> >>>>>>>> It seems it'd be preferable to do the conversion from a bag of >>>> >>>>>>>> elements to a single arrow frame all at once, when emitting, >>>> rather >>>> >>>>>>>> than repeatedly reading and writing the partial batch to and >>>> from >>>> >>>>>>>> state with every element that comes in. (Bag state has blind >>>> append.) >>>> >>>>>>>> >>>> >>>>>>>> > Also, a CombiningValueState is not ideal because I'd need to >>>> implement a merge_accumulators function that combines several in-progress >>>> batches. I could certainly implement that, but I'd prefer that it never be >>>> called unless absolutely necessary, which doesn't seem to be the case for >>>> CombiningValueState. (As an aside, maybe there's some room there for a >>>> middle ground between ValueState and CombiningValueState >>>> >>>>>>>> >>>> >>>>>>>> This does actually feel natural (to me), because you're >>>> repeatedly >>>> >>>>>>>> adding elements to build something up. merge_accumulators would >>>> >>>>>>>> probably be pretty easy (concatenation) but unless your >>>> windows are >>>> >>>>>>>> merging could just throw a not implemented error to really >>>> guard >>>> >>>>>>>> against it being used. 
>>>> >>>>>>>> >>>> >>>>>>>> > I suppose you could argue that this is a pretty low-level >>>> optimization we should be able to shield our users from, but right now I >>>> just wish I had ValueState in python so I didn't have to hack it up with a >>>> BagState :) >>>> >>>>>>>> > >>>> >>>>>>>> > Anyway, in light of this and all the other use-cases >>>> mentioned here, I think the resolution is to just implement ValueState in >>>> python, and document the danger with ValueState in both Python and Java. >>>> Just to be clear, the danger I'm referring to is that users might easily >>>> forget that data can be out of order, and use ValueState in a way that >>>> assumes it's been populated with data from the most recent element in event >>>> time, then in practice out of order data clobbers their state. I'm happy to >>>> write up a PR for this - are there any objections to that? >>>> >>>>>>>> >>>> >>>>>>>> I still haven't seen a good case for it (though I haven't >>>> looked at >>>> >>>>>>>> Reza's BiTemporalStream yet). Much harder to remove things once >>>> >>>>>>>> they're in. Can we just add a Any and/or LatestCombineFn and >>>> use (and >>>> >>>>>>>> point to) that instead? With the comment that if you're doing >>>> >>>>>>>> read-modify-write, an add_input may be better. >>>> >>>>>>>> >>>> >>>>>>>> > [1] >>>> https://beam.apache.org/blog/2017/08/28/timely-processing.html >>>> >>>>>>>> > >>>> >>>>>>>> > On Mon, Apr 29, 2019 at 12:23 AM Robert Bradshaw < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >>>> >>>>>>>> >> On Mon, Apr 29, 2019 at 3:43 AM Reza Rokni <[email protected]> >>>> wrote: >>>> >>>>>>>> >> > >>>> >>>>>>>> >> > @Robert Bradshaw Some examples, mostly built out from >>>> cases around Timeseries data, don't want to derail this thread so at a hi >>>> level : >>>> >>>>>>>> >> >>>> >>>>>>>> >> Thanks. Perfectly on-topic for the thread. 
>>>> >>>>>>>> >> >>>> >>>>>>>> >> > Looping timers, a timer which allows for creation of a >>>> value within a window when no external input has been seen. Requires >>>> metadata like "is timer set". >>>> >>>>>>>> >> > >>>> >>>>>>>> >> > BiTemporalStream join, where we need to match >>>> leftCol.timestamp to a value == (max(rightCol.timestamp) where >>>> rightCol.timestamp <= leftCol.timestamp)) , this if for a application >>>> matching trades to quotes. >>>> >>>>>>>> >> >>>> >>>>>>>> >> I'd be interested in seeing the code here. The fact that >>>> you have a >>>> >>>>>>>> >> max here makes me wonder if combining would be applicable. >>>> >>>>>>>> >> >>>> >>>>>>>> >> (FWIW, I've long thought it would be useful to do this kind >>>> of thing >>>> >>>>>>>> >> with Windows. Basically, it'd be like session windows with >>>> one side >>>> >>>>>>>> >> being the window from the timestamp forward into the >>>> future, and the >>>> >>>>>>>> >> other side being from the timestamp back a certain amount >>>> in the past. >>>> >>>>>>>> >> This seems a common join pattern.) >>>> >>>>>>>> >> >>>> >>>>>>>> >> > Metadata is used for >>>> >>>>>>>> >> > >>>> >>>>>>>> >> > Taking the Key from the KV for use within the OnTimer >>>> call. >>>> >>>>>>>> >> > Knowing where we are in watermarks for GC of objects in >>>> state. >>>> >>>>>>>> >> > More timer metadata (min timer ..) >>>> >>>>>>>> >> > >>>> >>>>>>>> >> > It could be argued that what we are using state for >>>> mostly workarounds for things that could eventually end up in the API >>>> itself. For example >>>> >>>>>>>> >> > >>>> >>>>>>>> >> > There is a Jira for OnTimer Context to have Key. >>>> >>>>>>>> >> > The GC needs are mostly due to not having a Map State >>>> object in all runners yet. >>>> >>>>>>>> >> >>>> >>>>>>>> >> Yeah. GC could probably be done with a max combine. 
The Key >>>> (which >>>> >>>>>>>> >> should be in the API) could be an AnyCombine for now (safe >>>> to >>>> >>>>>>>> >> overwrite because it's always the same). >>>> >>>>>>>> >> >>>> >>>>>>>> >> > However I think as folks explore Beam there will always >>>> be little things that require Metadata and so having access to something >>>> which gives us fine grain control ( as Kenneth mentioned) is useful. >>>> >>>>>>>> >> >>>> >>>>>>>> >> Likely. I guess in line with making easy things easy, I'd >>>> like to make >>>> >>>>>>>> >> dangerous things hard(er). As Kenn says, we'll probably >>>> need some kind >>>> >>>>>>>> >> of lower-level thing, especially if we introduce OnMerge. >>>> >>>>>>>> >> >>>> >>>>>>>> >> > Cheers >>>> >>>>>>>> >> > >>>> >>>>>>>> >> > Reza >>>> >>>>>>>> >> > >>>> >>>>>>>> >> > On Sat, 27 Apr 2019 at 02:59, Kenneth Knowles < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >> >>>> >>>>>>>> >> >> To be clear, the intent was always that ValueState would >>>> be not usable in merging pipelines. So no danger of clobbering, but also >>>> limited functionality. Is there a runner than accepts it and clobbers? The >>>> whole idea of the new DoFn is that it is easy to do the construction-time >>>> analysis and reject the invalid pipeline. It is actually runner independent >>>> and I think already implemented in ParDo's validation, no? >>>> >>>>>>>> >> >> >>>> >>>>>>>> >> >> Kenn >>>> >>>>>>>> >> >> >>>> >>>>>>>> >> >> On Fri, Apr 26, 2019 at 10:14 AM Lukasz Cwik < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >>> >>>> >>>>>>>> >> >>> I am in the camp where we should only support merging >>>> state (either naturally via things like bags or via combiners). I believe >>>> that having the wrapper that Brian suggests is useful for users. As for the >>>> @OnMerge method, I believe combiners should have the ability to look at the >>>> window information and we should treat @OnMerge as syntactic sugar over a >>>> combiner if the combiner API is too cumbersome. 
>>>> >>>>>>>> >> >>> >>>> >>>>>>>> >> >>> I believe using combiners can also extend to side >>>> inputs and help us deal with singleton and map like side inputs when >>>> multiple firings occur. I also like treating everything like a combiner >>>> because it will give us a lot reuse of combiner implementations across all >>>> the places they could be used and will be especially useful when we start >>>> exposing APIs related to retractions on combiners. >>>> >>>>>>>> >> >>> >>>> >>>>>>>> >> >>> On Fri, Apr 26, 2019 at 9:43 AM Brian Hulette < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >>>> >>>> >>>>>>>> >> >>>> Yeah the danger with out of order processing concerns >>>> me more than the merging as well. As a new Beam user, I immediately >>>> gravitated towards ValueState since it was easy to think about and I just >>>> assumed there wasn't anything to be concerned about. So it was shocking to >>>> learn that there is this dangerous edge-case. >>>> >>>>>>>> >> >>>> >>>> >>>>>>>> >> >>>> What if ValueState were just implemented as a wrapper >>>> of CombiningState with a LatestCombineFn and documented as such (and >>>> perhaps we encourage users to consider using a CombiningState explicitly if >>>> at all possible)? >>>> >>>>>>>> >> >>>> >>>> >>>>>>>> >> >>>> Brian >>>> >>>>>>>> >> >>>> >>>> >>>>>>>> >> >>>> >>>> >>>>>>>> >> >>>> >>>> >>>>>>>> >> >>>> On Fri, Apr 26, 2019 at 2:29 AM Robert Bradshaw < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >>>>> >>>> >>>>>>>> >> >>>>> On Fri, Apr 26, 2019 at 6:40 AM Kenneth Knowles < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >>>>> > >>>> >>>>>>>> >> >>>>> > You could use a CombiningState with a CombineFn >>>> that returns the minimum for this case. >>>> >>>>>>>> >> >>>>> >>>> >>>>>>>> >> >>>>> We've also wanted to be able to set data when setting >>>> a timer that >>>> >>>>>>>> >> >>>>> would be returned when the timer fires. (It's in the >>>> FnAPI, but not >>>> >>>>>>>> >> >>>>> the SDKs yet.) 
>>>> >>>>>>>> >> >>>>> >>>> >>>>>>>> >> >>>>> The metadata is an interesting usecase, do you have >>>> some more specific >>>> >>>>>>>> >> >>>>> examples? Might boil down to not having a rich enough >>>> (single) state >>>> >>>>>>>> >> >>>>> type. >>>> >>>>>>>> >> >>>>> >>>> >>>>>>>> >> >>>>> > But I've come to feel there is a mismatch. On the >>>> one hand, ParDo(<stateful DoFn>) is a way to drop to a lower level and >>>> write logic that does not fit a more general computational pattern, really >>>> taking fine control. On the other hand, automatically merging state via >>>> CombiningState or BagState is more of a no-knobs higher level of >>>> programming. To me there seems to be a bit of a philosophical conflict. >>>> >>>>>>>> >> >>>>> > >>>> >>>>>>>> >> >>>>> > These days, I feel like an @OnMerge method would be >>>> more natural. If you are using state and timers, you probably often want >>>> more direct control over how state from windows gets merged. An of course >>>> we don't even have a design for timers - you would need some kind of >>>> timestamp CombineFn but I think setting/unsetting timers manually makes >>>> more sense. Especially considering the trickiness around merging windows in >>>> the absence of retractions, you really need this callback, so you can issue >>>> retractions manually for any output your stateful DoFn emitted in windows >>>> that no longer exist. >>>> >>>>>>>> >> >>>>> >>>> >>>>>>>> >> >>>>> I agree we'll probably need an @OnMerge. On the other >>>> hand, I like >>>> >>>>>>>> >> >>>>> being able to have good defaults. The high/low level >>>> thing is a >>>> >>>>>>>> >> >>>>> continuum (the indexing example falling towards the >>>> high end). >>>> >>>>>>>> >> >>>>> >>>> >>>>>>>> >> >>>>> Actually, the merging questions bother me less than >>>> how easy it is to >>>> >>>>>>>> >> >>>>> accidentally clobber previous values. 
It looks so >>>> easy (like the >>>> >>>>>>>> >> >>>>> easiest state to use) but is actually the most >>>> dangerous. If one wants >>>> >>>>>>>> >> >>>>> this behavior, I would rather an explicit >>>> AnyCombineFn or >>>> >>>>>>>> >> >>>>> LatestCombineFn which makes you think about the >>>> semantics. >>>> >>>>>>>> >> >>>>> >>>> >>>>>>>> >> >>>>> - Robert >>>> >>>>>>>> >> >>>>> >>>> >>>>>>>> >> >>>>> > On Thu, Apr 25, 2019 at 5:49 PM Reza Rokni < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >>>>> >> >>>> >>>>>>>> >> >>>>> >> +1 on the metadata use case. >>>> >>>>>>>> >> >>>>> >> >>>> >>>>>>>> >> >>>>> >> For performance reasons the Timer API does not >>>> support a read() operation, which for the vast majority of use cases is >>>> not a required feature. In the small set of use cases where it is needed, >>>> for example when you need to set a Timer in EventTime based on the smallest >>>> timestamp seen in the elements within a DoFn, we can make use of a >>>> ValueState object to keep track of the value. >>>> >>>>>>>> >> >>>>> >> >>>> >>>>>>>> >> >>>>> >> On Fri, 26 Apr 2019 at 00:38, Reuven Lax < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >>>>> >>> >>>> >>>>>>>> >> >>>>> >>> I see examples of people using ValueState that I >>>> think are not captured CombiningState. For example, one common one is users >>>> who set a timer and then record the timestamp of that timer in a >>>> ValueState. In general when you store state that is metadata about other >>>> state you store, then ValueState will usually make more sense than >>>> CombiningState. >>>> >>>>>>>> >> >>>>> >>> >>>> >>>>>>>> >> >>>>> >>> On Thu, Apr 25, 2019 at 9:32 AM Brian Hulette < >>>> [email protected]> wrote: >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> Currently the Python SDK does not make >>>> ValueState available to users. 
My initial inclination was to go ahead and >>>> implement it there to be consistent with Java, but Robert brings up a great >>>> point here that ValueState has an inherent race condition for out of order >>>> data, and a lot of it's use cases can actually be implemented with a >>>> CombiningState instead. >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> It seems to me that at the very least we should >>>> discourage the use of ValueState by noting the danger in the documentation >>>> and preferring CombiningState in examples, and perhaps we should go further >>>> and deprecate it in Java and not implement it in python. Either way I think >>>> we should be consistent between Java and Python. >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> I'm curious what people think about this, are >>>> there use cases that we really need to keep ValueState around for? >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> Brian >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> ---------- Forwarded message --------- >>>> >>>>>>>> >> >>>>> >>>> From: Robert Bradshaw <[email protected]> >>>> >>>>>>>> >> >>>>> >>>> Date: Thu, Apr 25, 2019, 08:31 >>>> >>>>>>>> >> >>>>> >>>> Subject: Re: [docs] Python State & Timers >>>> >>>>>>>> >> >>>>> >>>> To: dev <[email protected]> >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> On Thu, Apr 25, 2019, 5:26 PM Maximilian Michels >>>> <[email protected]> wrote: >>>> >>>>>>>> >> >>>>> >>>>> >>>> >>>>>>>> >> >>>>> >>>>> Completely agree that CombiningState is nicer >>>> in this example. Users may >>>> >>>>>>>> >> >>>>> >>>>> still want to use ValueState when there is >>>> nothing to combine. >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> I've always had trouble coming up with any good >>>> examples of this. 
>>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>>> Also, >>>> >>>>>>>> >> >>>>> >>>>> users already know ValueState from the Java SDK. >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> Maybe we should deprecate that :) >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>> >>>> >>>>>>>> >> >>>>> >>>>> On 25.04.19 17:12, Robert Bradshaw wrote: >>>> >>>>>>>> >> >>>>> >>>>> > On Thu, Apr 25, 2019 at 4:58 PM Maximilian >>>> Michels <[email protected]> wrote: >>>> >>>>>>>> >> >>>>> >>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >> I forgot to give an example, just to clarify >>>> for others: >>>> >>>>>>>> >> >>>>> >>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >>> What was the specific example that was less >>>> natural? >>>> >>>>>>>> >> >>>>> >>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >> Basically every time we use ListState to >>>> express ValueState, e.g. >>>> >>>>>>>> >> >>>>> >>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >> next_index, = list(state.read()) or [0] >>>> >>>>>>>> >> >>>>> >>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >> Taken from: >>>> >>>>>>>> >> >>>>> >>>>> >> >>>> https://github.com/apache/beam/pull/8363/files#diff-ba1a2aed98079ccce869cd660ca9d97dR301 >>>> >>>>>>>> >> >>>>> >>>>> > >>>> >>>>>>>> >> >>>>> >>>>> > Yes, ListState is much less natural here. I >>>> think generally >>>> >>>>>>>> >> >>>>> >>>>> > CombiningValue is often a better replacement. >>>> E.g. 
the Java example
>>>> >>>>>>>> >> >>>>> >>>>> > reads
>>>> >>>>>>>> >> >>>>> >>>>> >
>>>> >>>>>>>> >> >>>>> >>>>> >     public void processElement(
>>>> >>>>>>>> >> >>>>> >>>>> >         ProcessContext context, @StateId("index") ValueState<Integer> index) {
>>>> >>>>>>>> >> >>>>> >>>>> >       int current = firstNonNull(index.read(), 0);
>>>> >>>>>>>> >> >>>>> >>>>> >       context.output(KV.of(current, context.element()));
>>>> >>>>>>>> >> >>>>> >>>>> >       index.write(current+1);
>>>> >>>>>>>> >> >>>>> >>>>> >     }
>>>> >>>>>>>> >> >>>>> >>>>> >
>>>> >>>>>>>> >> >>>>> >>>>> > which is replaced with bag state
>>>> >>>>>>>> >> >>>>> >>>>> >
>>>> >>>>>>>> >> >>>>> >>>>> >     def process(self, element, state=DoFn.StateParam(INDEX_STATE)):
>>>> >>>>>>>> >> >>>>> >>>>> >       next_index, = list(state.read()) or [0]
>>>> >>>>>>>> >> >>>>> >>>>> >       yield (element, next_index)
>>>> >>>>>>>> >> >>>>> >>>>> >       state.clear()
>>>> >>>>>>>> >> >>>>> >>>>> >       state.add(next_index + 1)
>>>> >>>>>>>> >> >>>>> >>>>> >
>>>> >>>>>>>> >> >>>>> >>>>> > whereas CombiningState would be more natural (than ListState, and
>>>> >>>>>>>> >> >>>>> >>>>> > arguably than even ValueState), giving
>>>> >>>>>>>> >> >>>>> >>>>> >
>>>> >>>>>>>> >> >>>>> >>>>> >     def process(self, element, index=DoFn.StateParam(INDEX_STATE)):
>>>> >>>>>>>> >> >>>>> >>>>> >       yield element, index.read()
>>>> >>>>>>>> >> >>>>> >>>>> >       index.add(1)
>>>> >>>>>>>> >> >>>>> >>>>> >
>>>> >>>>>>>> >> >>>>> >>>>> >>
>>>> >>>>>>>> >> >>>>> >>>>> >> -Max
>>>> >>>>>>>> >> >>>>> >>>>> >>
>>>> >>>>>>>> >> >>>>> >>>>> >> On 25.04.19 16:40, Robert Bradshaw wrote:
>>>> >>>>>>>> >> >>>>> >>>>> >>> https://github.com/apache/beam/pull/8402
>>>> >>>>>>>> >> >>>>> >>>>> >>>
>>>> >>>>>>>> >> >>>>> >>>>> >>> On
Thu, Apr 25, 2019 at 4:26 PM Robert Bradshaw <[email protected]> wrote:
>>>> >>>>>>>> >> >>>>> >>>>> >>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>> Oh, this is for the indexing example.
>>>> >>>>>>>> >> >>>>> >>>>> >>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>> I actually think using CombiningState is cleaner than ValueState.
>>>> >>>>>>>> >> >>>>> >>>>> >>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>> https://github.com/apache/beam/blob/release-2.12.0/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py#L262
>>>> >>>>>>>> >> >>>>> >>>>> >>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>> (The fact that one must specify the accumulator coder is, however,
>>>> >>>>>>>> >> >>>>> >>>>> >>>> unfortunate. We should probably infer that if we can.)
>>>> >>>>>>>> >> >>>>> >>>>> >>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>> On Thu, Apr 25, 2019 at 4:19 PM Robert Bradshaw <[email protected]> wrote:
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>>> The desire was to avoid the implicit disallowed combination wart in
>>>> >>>>>>>> >> >>>>> >>>>> >>>>> Python (until we could make sense of it), and also ValueState could be
>>>> >>>>>>>> >> >>>>> >>>>> >>>>> surprising with respect to older values overwriting newer ones. What
>>>> >>>>>>>> >> >>>>> >>>>> >>>>> was the specific example that was less natural?
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>>> On Thu, Apr 25, 2019 at 3:01 PM Maximilian Michels <[email protected]> wrote:
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> @Pablo: Thanks for following up with the PR! :)
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> @Brian: I was wondering about this as well. It makes the Python state
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> code a bit unnatural. I'd suggest to add a ValueState wrapper around
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> ListState/CombiningState.
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> @Robert: Like Reuven pointed out, we can >>>> disallow ValueState for merging >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> windows with state. >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> @Reza: Great. Let's make sure it has >>>> Python examples out of the box. >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> Either Pablo or me could help there. >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> Thanks, >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> Max >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>> On 25.04.19 04:14, Reza Ardeshir Rokni >>>> wrote: >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Pablo, Kenneth and I have a new blog >>>> ready for publication which covers >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> how to create a "looping timer" it >>>> allows for default values to be >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> created in a window when no incoming >>>> elements exists. We just need to >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> clear a few bits before publication, >>>> but would be great to have that >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> also include a python example, I wrote >>>> it in java... >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Cheers >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Reza >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> On Thu, 25 Apr 2019 at 04:34, Reuven >>>> Lax <[email protected] >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> <mailto:[email protected]>> wrote: >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Well state is still not >>>> implemented for merging windows even for >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> Java (though I believe the idea >>>> was to disallow ValueState there). 
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> On Wed, Apr 24, 2019 at 1:11 PM >>>> Robert Bradshaw <[email protected] >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> <mailto:[email protected]>> >>>> wrote: >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> It was unclear what the >>>> semantics were for ValueState for merging >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> windows. (It's also a bit >>>> weird as it's inherently a race condition >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> wrt element ordering, unlike >>>> Bag and CombineState, though you can >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> always implement it as a >>>> CombineState that always returns the latest >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> value which is a bit more >>>> explicit about the dangers here.) >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> On Wed, Apr 24, 2019 at 10:08 >>>> PM Brian Hulette >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> <[email protected] <mailto: >>>> [email protected]>> wrote: >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > That's a great idea! I >>>> thought about this too after those >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> posts came up on the list >>>> recently. I started to look into it, >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> but I noticed that there's >>>> actually no implementation of >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> ValueState in userstate. Is >>>> there a reason for that? I started >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> to work on a patch to add it >>>> but I was just curious if there was >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> some reason it was omitted >>>> that I should be aware of. 
>>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > We could certainly >>>> replicate the example without ValueState >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> by using BagState and >>>> clearing it before each write, but it >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> would be nice if we could >>>> draw a direct parallel. >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > Brian >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> > On Fri, Apr 12, 2019 at >>>> 7:05 AM Maximilian Michels >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> <[email protected] <mailto: >>>> [email protected]>> wrote: >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> > It would probably be >>>> pretty easy to add the corresponding >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> code snippets to the docs as >>>> well. >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> It's probably a bit more >>>> work because there is no section >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> dedicated to >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> state/timer yet in the >>>> documentation. Tracked here: >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> >>>> https://jira.apache.org/jira/browse/BEAM-2472 >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> > I've been going over >>>> this topic a bit. I'll add the >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> snippets next week, if that's >>>> fine by y'all. >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> That would be great. The >>>> blog posts are a great way to get >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> started with >>>> >>>>>>>> >> >>>>> >>>>> >>>>>>> >> state/timers. 
>>
>> Thanks,
>> Max
>>
>> On 11.04.19 20:21, Pablo Estrada wrote:
>> > I've been going over this topic a bit. I'll add the snippets next week,
>> > if that's fine by y'all.
>> > Best
>> > -P.
>> >
>> > On Thu, Apr 11, 2019 at 5:27 AM Robert Bradshaw <[email protected]> wrote:
>> >
>> > That's a great idea! It would probably be pretty easy to add the
>> > corresponding code snippets to the docs as well.
>> >
>> > On Thu, Apr 11, 2019 at 2:00 PM Maximilian Michels <[email protected]> wrote:
>> > >
>> > > Hi everyone,
>> > >
>> > > The Python SDK still lacks documentation on state and timers.
>> > >
>> > > As a first step, what do you think about updating these two blog
>> > > posts with the corresponding Python code?
>> > >
>> > > https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>> > > https://beam.apache.org/blog/2017/08/28/timely-processing.html
>> > >
>> > > Thanks,
>> > > Max
>> >

--

This email may be confidential and privileged. If you received this
communication by mistake, please don't forward it to anyone else, please
erase all copies and attachments, and please let me know that it has gone
to the wrong person.

The above terms reflect a potential business arrangement, are provided
solely as a basis for further discussion, and are not intended to be and do
not constitute a legally binding obligation. No legally binding obligations
will be created, implied, or inferred until an agreement in final form is
executed in writing by all parties involved.
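
Two workarounds for the missing ValueState come up in this thread: a CombineState that always returns the latest value (Robert), and a BagState that is cleared before each write (Brian). Below is a minimal Python sketch of both ideas. It is not code from the Beam SDK or from the PR. The names LatestValueCombineFn, InMemoryBag, write_single_value and read_single_value are hypothetical. The combiner only mirrors the method shape of Beam's CombineFn without importing it, the bag is a plain in-memory stand-in assumed to offer add/read/clear, and the (sequence, value) input format is an assumption made here so that "latest" is well defined regardless of arrival order.

```python
class LatestValueCombineFn:
    """Hypothetical combiner that keeps the value with the largest sequence key.

    The four methods follow the shape of Beam's CombineFn, but this class
    deliberately does not import or subclass anything from apache_beam.
    """

    def create_accumulator(self):
        # None means "no value has been written yet".
        return None

    def add_input(self, accumulator, element):
        # element is assumed to be a (sequence, value) pair, where sequence
        # could be an event timestamp or a user-supplied counter.
        return self.merge_accumulators([accumulator, element])

    def merge_accumulators(self, accumulators):
        # Keep whichever pair carries the largest sequence key.
        seen = [acc for acc in accumulators if acc is not None]
        return max(seen, key=lambda pair: pair[0]) if seen else None

    def extract_output(self, accumulator):
        return None if accumulator is None else accumulator[1]


class InMemoryBag:
    """Hypothetical stand-in for a bag-style state cell (add/read/clear)."""

    def __init__(self):
        self._items = []

    def add(self, value):
        self._items.append(value)

    def read(self):
        return list(self._items)

    def clear(self):
        self._items.clear()


def write_single_value(bag, value):
    # Emulate a single-value write: drop whatever was there, then add.
    bag.clear()
    bag.add(value)


def read_single_value(bag, default=None):
    # After write_single_value the bag holds at most one item.
    items = bag.read()
    return items[-1] if items else default
```

The combiner makes the ordering rule explicit: the result depends on the sequence key rather than on which element happened to arrive last, which is the danger Robert points out. The clear-then-add variant keeps the "last writer wins" behaviour and is therefore subject to the same element-ordering race as a plain ValueState.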
