It sounds like the consensus is to do this in finishBundle, but to require specifying output timestamps. I'll alter the PR accordingly.
Reuven

On Mon, May 11, 2020 at 5:06 PM Robert Bradshaw <rober...@google.com> wrote:
> StartBundle pre-dated setUp; now that setUp exists, StartBundle is less useful than before. With DoFn re-use, however, startBundle can be used to ensure the DoFn is initialized to a clean state.
>
> On Thu, May 7, 2020 at 2:03 PM Reuven Lax <re...@google.com> wrote:
>
>> I think startBundle is useful for convenience and performance, but not necessarily needed semantically (as Kenn said, you could write your pipeline without startBundle). finishBundle has a stronger semantic meaning when interpreted as a way of finalizing elements.
>>
>> On Thu, May 7, 2020 at 2:00 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> StartBundle is useful because the framework provides the necessary synchronization, while lazy init requires you to write it yourself and also pay for it on each processElement call.
>>>
>>> On Wed, May 6, 2020 at 8:46 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> This is a great idea. I thought that (long ago) we decided not to execute finishBundle per window for these reasons: (1) performance fears, (2) naming bikeshedding, and (3) backwards compatibility (even though we hadn't stabilized, it is pervasive). That was before the annotation-driven DoFn, I believe, so we didn't have the ability to do it this way. Now this seems like a clear win.
>>>>
>>>> Regarding @StartBundle: I always use, and advise others to use, lazy init instead of @StartBundle, and to think of @FinishBundle as "flush". State-sensitive APIs like "start(); process(); finish()" are usually an anti-pattern, since you can almost always write them in a less dangerous way (try-with-resources, Python context managers, etc.). Conveniently, this eliminates any consideration of symmetry. Can anyone refresh me on when/whether it is important to have @StartBundle instead of running the same code via lazy init?
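The trade-off between lazy init and @StartBundle discussed above can be made concrete with a small plain-Java model of one DoFn instance reused across several bundles. This is a sketch, not the Beam API: `BundleFn`, `LazyInitFn`, `StartBundleFn`, and `BundleDriver` are hypothetical names. The lazy-init variant pays a null check on every element and relies on its finishBundle "flush" to leave the instance clean; the startBundle variant resets its state once per bundle instead.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a DoFn instance that a runner reuses across bundles. */
interface BundleFn {
  void startBundle();
  void processElement(String element);
  void finishBundle();
  List<List<String>> flushed();
}

/** Lazy init on the first element; finishBundle acts as "flush". */
class LazyInitFn implements BundleFn {
  private List<String> buffer; // null means "no open batch"
  private final List<List<String>> flushed = new ArrayList<>();

  public void startBundle() {} // nothing to do; state is created lazily

  public void processElement(String element) {
    if (buffer == null) { // this check runs on every element
      buffer = new ArrayList<>();
    }
    buffer.add(element);
  }

  public void finishBundle() {
    if (buffer != null) {
      flushed.add(buffer);
      buffer = null; // leave the instance clean for the next bundle
    }
  }

  public List<List<String>> flushed() { return flushed; }
}

/** Resets per-bundle state in startBundle, so processElement needs no check. */
class StartBundleFn implements BundleFn {
  private List<String> buffer;
  private final List<List<String>> flushed = new ArrayList<>();

  public void startBundle() { buffer = new ArrayList<>(); } // clean state every bundle

  public void processElement(String element) { buffer.add(element); }

  public void finishBundle() {
    if (!buffer.isEmpty()) {
      flushed.add(buffer);
    }
  }

  public List<List<String>> flushed() { return flushed; }
}

/** Drives one reused instance through several bundles, the way a runner might. */
class BundleDriver {
  static List<List<String>> run(BundleFn fn, List<List<String>> bundles) {
    for (List<String> bundle : bundles) {
      fn.startBundle();
      for (String element : bundle) {
        fn.processElement(element);
      }
      fn.finishBundle();
    }
    return fn.flushed();
  }
}
```

In this happy path both variants flush the same batches. They differ in where per-bundle state is reset: `LazyInitFn` depends on finishBundle having run, while `StartBundleFn` resets at the start of every bundle, which is the clean-state property mentioned above for reused DoFns.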
>>>>
>>>>
>>>> On Tue, May 5, 2020 at 3:16 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> On Tue, May 5, 2020 at 3:08 PM Reuven Lax <re...@google.com> wrote:
>>>>> >
>>>>> > On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>> >>
>>>>> >> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>>>>> >> >
>>>>> >> > This should not affect the ability of the user to specify the output timestamp.
>>>>> >>
>>>>> >> My question was whether we would require it.
>>>>> >
>>>>> >
>>>>> > My current PR does not; it defaults to end-of-window as the timestamp. However, we could also decide to require it.
>>>>>
>>>>> I'd be more comfortable requiring it for the time being.
>>>>>
>>>>
>>>> +1 for requiring it
>>>>
>>>> Kenn
>>>>
>>>>> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>> >> >
>>>>> >> > There was a mention in some other thread that, in order to make the user experience as predictable as possible, we should try to make windows idempotent: once a window is assigned, it should never be changed (and the timestamp should not move outside the scope of the window) unless a different WindowFn is applied. Because all Beam window functions are actually time-based, and the output timestamp is known, what is the issue with applying the WindowFn to elements output from @FinishBundle and assigning the windows automatically?
>>>>> >>
>>>>> >> We used to do exactly this. (I don't recall why it was removed.) If the input element and/or window was queried by the WindowFn (admittedly rare), it would fail at runtime.
>>>>> >
>>>>> > When did we use to do this? We've had users writing WindowFns that queried the input element since long before Beam existed, e.g. a WindowFn that inspected a userId field and created differently sized windows based on the userId.
>>>>>
>>>>> This is how it started.
>>>>> In particular, a WindowFn.AssignContext would be created that threw an exception when the unavailable fields were accessed (which would make finishBundle unsuitable for such WindowFns).
>>>>>
>>>>
>>>> Yeah, this was not great. This also broke the equivalence between WithKeys and AssignWindows. It was really a workaround for the lack of the feature Reuven is proposing.
>>>>
>>>>
>>>>
>>>>> >> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <re...@google.com> wrote:
>>>>> >> >
>>>>> >> > It's a good question about startBundle; it's something I thought about. The problem is that a runner doesn't always know at startBundle what windows are in the bundle, and even if it does know, finding out might require the runner to make two passes over the bundle. Alternatively, the runner could call startBundle the first time it sees each new window in the bundle, but I think that makes things even weirder. It's also worth noting that startBundle is already more limited today: we do not support calling output from startBundle, but we do allow calling output from finishBundle.
>>>>> >> >
>>>>> >> > Reuven
>>>>> >> >
>>>>> >> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>> >> >>
>>>>> >> >> Ah, interesting. That makes the WindowFn non-idempotent by definition, because its first application (e.g. global window -> interval window) _might_ yield a different result than a second application with the interval window already assigned. On the other hand, suppose for a moment we could make the WindowFn idempotent; would that solve the issue of window assignment for elements output from finishBundle? I understand that window assignment is not the only motivation for adding an optional window parameter to @FinishBundle, but users might be confused about why OutputReceiver works only when there is a window parameter.
>>>>> >> >> It would be nice to make this somewhat more "consistent". And a last note: adding the parameter to @FinishBundle alone seems a little imbalanced. Could this be made possible for @StartBundle as well? Should we enforce that @StartBundle and @FinishBundle have the same signature, or should we accept all combinations?
>>>>> >> >>
>>>>> >> >> Jan
>>>>> >> >>
>>>>> >> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
>>>>> >> >>
>>>>> >> >> I assume you are referring to elements output from finishBundle.
>>>>> >> >>
>>>>> >> >> The problem is that the current window is an input to WindowFn.assignWindows. The new window can depend on the timestamp, the element itself, and the original window. I'm not sure how many users rely on this; however, it has been part of our public windowing API for a long time, so I would guess that some users do use this in their WindowFns.
>>>>> >> >>
>>>>> >> >> Reuven
>>>>> >> >>
>>>>> >> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>> >> >>>
>>>>> >> >>> There was a mention in some other thread that, in order to make the user experience as predictable as possible, we should try to make windows idempotent: once a window is assigned, it should never be changed (and the timestamp should not move outside the scope of the window) unless a different WindowFn is applied. Because all Beam window functions are actually time-based, and the output timestamp is known, what is the issue with applying the WindowFn to elements output from @FinishBundle and assigning the windows automatically?
>>>>> >> >>>
>>>>> >> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>>>>> >> >>>
>>>>> >> >>> This should not affect the ability of the user to specify the output timestamp. Today FinishBundleContext.output forces you to specify the window as well as the timestamp, which is a bit awkward.
>>>>> >> >>> (I believe that it also lets you create brand-new windows in finishBundle, which is interesting, but I'm not quite sure of the use case.)
>>>>> >> >>>
>>>>> >> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>> >> >>>>
>>>>> >> >>>> This is a really nice idea. Would the user still need to specify the timestamp of the output? I'm a bit ambivalent about calling it multiple times if OutputReceiver alone is in the parameter list; this might not be obvious and could be surprising behavior.
>>>>> >> >>>>
>>>>> >> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <re...@google.com> wrote:
>>>>> >> >>>> >
>>>>> >> >>>> > I would like to discuss a minor extension to the Beam model.
>>>>> >> >>>> >
>>>>> >> >>>> > Beam places very few restrictions on what can be in a bundle; in particular, a bundle might contain records for many different windows. This was an explicit decision, as bundling primarily exists for performance reasons, and we found that limiting bundling based on windows or timestamps often led to severe performance problems. However, it sometimes makes finishBundle hard to use.
>>>>> >> >>>> >
>>>>> >> >>>> > I've seen multiple cases where users maintain some state in their DoFn that needs finalizing (e.g. writing to an external service) in finishBundle. Often users end up keeping lists of all windows seen in the bundle so they can be processed separately (or sometimes don't realize that there can be multiple windows, and write incorrect code).
>>>>> >> >>>> >
>>>>> >> >>>> > The lack of a window also means that we don't currently support injecting an OutputReceiver into finishBundle, as there's no good way of knowing which window the output should be put into.
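The per-window bookkeeping described above can be sketched in plain Java. This model buffers elements per window and, at the end of the bundle, emits one batched value per window together with an explicit timestamp and window, mirroring the arguments of `FinishBundleContext.output(output, timestamp, window)` in the Beam Java SDK. The `Window`, `Emitted`, and `PerWindowBuffer` types are hypothetical stand-ins, and stamping each output with the window's last instant (end minus 1 ms, as Beam's `IntervalWindow.maxTimestamp()` does) is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical window [start, end) in millis, standing in for IntervalWindow. */
record Window(long start, long end) {
  long maxTimestamp() { return end - 1; } // last instant inside the window
}

/** What a FinishBundleContext.output(value, timestamp, window) call would receive. */
record Emitted(String value, long timestamp, Window window) {}

/** The manual workaround: remember every window seen in the bundle and flush each one. */
class PerWindowBuffer {
  // LinkedHashMap keeps windows in first-seen order, so flush order is deterministic.
  private final Map<Window, List<String>> buffers = new LinkedHashMap<>();

  void processElement(String element, Window window) {
    buffers.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
  }

  /** Emits one batched value per window, then clears the state for the next bundle. */
  List<Emitted> finishBundle() {
    List<Emitted> out = new ArrayList<>();
    for (Map.Entry<Window, List<String>> entry : buffers.entrySet()) {
      Window window = entry.getKey();
      // Assumption: the window's max timestamp is used as the output timestamp.
      out.add(new Emitted(String.join(",", entry.getValue()), window.maxTimestamp(), window));
    }
    buffers.clear();
    return out;
  }
}
```

Forgetting the map and keeping a single buffer would silently merge elements from different windows into one output, which is the kind of incorrect code mentioned above.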
>>>>> >> >>>> >
>>>>> >> >>>> > I would like to propose adding a way for finishBundle to inspect the window, either directly (via a BoundedWindow parameter) or indirectly (via an OutputReceiver parameter). In that case, we will execute finishBundle once per window in the bundle. Otherwise, we will execute finishBundle once at the end of the bundle, as before. This behavior is backwards compatible, as these parameters were previously disallowed in finishBundle.
>>>>> >> >>>> >
>>>>> >> >>>> > Note that this is similar to something Beam already does in processElement. A single element can exist in multiple windows; however, if processElement "observes" the window, then Beam will call processElement once per window.
>>>>> >> >>>> >
>>>>> >> >>>> > In Java, the user code could look like this:
>>>>> >> >>>> >
>>>>> >> >>>> >   class MyDoFn<T> extends DoFn<T, T> {
>>>>> >> >>>> >     ...
>>>>> >> >>>> >     @FinishBundle
>>>>> >> >>>> >     public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
>>>>> >> >>>> >       // This finishBundle will be called once per window in the bundle, since it has
>>>>> >> >>>> >       // a parameter that observes the window.
>>>>> >> >>>> >     }
>>>>> >> >>>> >   }
>>>>> >> >>>> >
>>>>> >> >>>> > This PR shows an implementation of this extension for the Java SDK.
>>>>> >> >>>> >
>>>>> >> >>>> > Thoughts?
>>>>> >> >>>> >
>>>>> >> >>>> > Reuven
>>>>> >>
>>>>
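The proposed semantics, combined with the consensus at the top of this thread that output timestamps must be specified, can be modeled in plain Java. This is a hypothetical simulation, not the Java SDK implementation: `TimestampedReceiver`, `PerWindowFinish`, and `Out` are illustrative names. The model invokes a window-observing `finishBundle` once per distinct window seen in the bundle, and the per-window receiver has no timestamp-free output method. Rejecting timestamps outside the current window is an extra assumption of the model, in the spirit of the point above that timestamps should not leave their window.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical window [start, end) in millis, standing in for IntervalWindow. */
record Window(long start, long end) {
  long maxTimestamp() { return end - 1; } // last instant inside the window
  boolean contains(long ts) { return ts >= start && ts < end; }
}

/** Hypothetical per-window receiver: the only way to output requires a timestamp. */
interface TimestampedReceiver<T> {
  void outputWithTimestamp(T value, long timestamp);
}

/** Models a runner that calls a window-observing finishBundle once per window. */
class PerWindowFinish {
  /** One emitted value, tagged with the window finishBundle was invoked for. */
  record Out<T>(T value, long timestamp, Window window) {}

  static <T> List<Out<T>> finishBundle(
      List<Window> windowsSeen, BiConsumer<Window, TimestampedReceiver<T>> userFinishBundle) {
    List<Out<T>> outputs = new ArrayList<>();
    // A bundle can span many windows; visit each distinct one once, in first-seen order.
    for (Window window : new LinkedHashSet<>(windowsSeen)) {
      TimestampedReceiver<T> receiver = (value, ts) -> {
        // Assumption of this model (not stated in the thread): stay inside the window.
        if (!window.contains(ts)) {
          throw new IllegalArgumentException("timestamp " + ts + " is outside " + window);
        }
        outputs.add(new Out<>(value, ts, window));
      };
      userFinishBundle.accept(window, receiver);
    }
    return outputs;
  }
}
```

Because the user callback receives the window, it can derive a valid timestamp (here `maxTimestamp()`) instead of relying on a runner default, which is the behavior the thread converged on.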