I think startBundle is useful for convenience and performance, but not necessarily needed semantically (as Kenn said, you could write your pipeline without startBundle). finishBundle has a stronger semantic meaning when interpreted as a way of finalizing elements.
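Kenn's lazy-init alternative can be compared with the explicit start/finish style. Luke's point about per-element cost shows up in the comparison too. The following is a minimal plain-Java sketch, not Beam code. `Sink`, `StartBundleStyle` and `LazyInitStyle` are hypothetical names, and the methods only mimic the roles of `@StartBundle`, `@ProcessElement` and `@FinishBundle`, with no Beam annotations or runner involved. Luke's point that lazy init also leaves any synchronization to the user is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an expensive resource, e.g. a client for an external service. */
class Sink {
  static int created = 0;
  private final List<String> pending = new ArrayList<>();

  Sink() {
    created++;
  }

  void write(String element) {
    pending.add(element);
  }

  /** Returns everything written since the last flush and clears the buffer. */
  List<String> flush() {
    List<String> flushed = new ArrayList<>(pending);
    pending.clear();
    return flushed;
  }
}

/** Mirrors @StartBundle / @ProcessElement / @FinishBundle as explicit calls.
 *  Only correct if the caller invokes them in that order. */
class StartBundleStyle {
  private Sink sink;

  void startBundle() {
    sink = new Sink();
  }

  void processElement(String element) {
    sink.write(element); // NullPointerException if startBundle() was never called
  }

  List<String> finishBundle() {
    return sink.flush();
  }
}

/** Lazy init: no start hook. Every processElement call pays for the null check,
 *  and finishBundle is just a "flush". */
class LazyInitStyle {
  private Sink sink;

  void processElement(String element) {
    if (sink == null) {
      sink = new Sink();
    }
    sink.write(element);
  }

  List<String> finishBundle() {
    return sink == null ? List.of() : sink.flush();
  }
}
```

The lazy version works whether or not a start hook ever runs, at the price of a check on every element. The explicit version depends on the caller honoring the start/process/finish order, which is the kind of state-sensitive API Kenn describes as risky.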
On Thu, May 7, 2020 at 2:00 PM Luke Cwik <lc...@google.com> wrote:
> Start bundle is useful since the framework provides the necessary synchronization while using lazy init requires you to write it yourself and also pay for it on each process element call.
>
> On Wed, May 6, 2020 at 8:46 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> This is a great idea. I thought that (long ago) we decided not to execute finishBundle per window for these reasons: (1) perf fears (2) naming bikeshed (3) backwards compatibility (even though we hadn't stabilized, it is pervasive). That was before the annotation-driven DoFn I believe, so we didn't have the ability to do it this way. Now this seems like a clear win.
>>
>> Regarding @StartBundle: I always use and advise others to use lazy init instead of @StartBundle and to think of @FinishBundle as "flush". State-sensitive APIs like "start(); process(); finish()" are usually an anti-pattern since you can almost always write them in a less dangerous way (try-with-resources, Python context managers, etc.). Conveniently, this eliminates any consideration of symmetry. Can anyone refresh me on when/whether it is important to have @StartBundle instead of running the same code via lazy init?
>>
>>
>> On Tue, May 5, 2020 at 3:16 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> On Tue, May 5, 2020 at 3:08 PM Reuven Lax <re...@google.com> wrote:
>>> >
>>> > On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <rober...@google.com> wrote:
>>> >>
>>> >> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>>> >> >
>>> >> > This should not affect the ability of the user to specify the output timestamp.
>>> >>
>>> >> My question was whether we would require it.
>>> >
>>> >
>>> > My current PR does not - it defaults to end-of-window as the timestamp. However we could also decide to require it.
>>>
>>> I'd be more comfortable requiring it for the time being.
>>>
>>
>> +1 for requiring it
>>
>> Kenn
>>
>>
>> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >> >
>>> >> > There was a mention in some other thread, that in order to make user experience as predictable as possible, we should try to make windows idempotent, and once window is assigned, it should never be changed (and timestamp move outside of the scope of window, unless a different windowfn is applied). Because all Beam window functions are actually time based, and output timestamp is known, what is the issue of applying windowfn to elements output from @FinishBundle and assign the windows automatically?
>>> >>
>>> >> We used to do exactly this. (I don't recall why it was removed.) If the input element and/or window was queried by the WindowFn (admittedly rare), it would fail at runtime.
>>> >
>>> > When did we use to do this? We've had users writing WindowFns that queried the input element since long before Beam existed, e.g. a window fn that inspected a userId field, and created different-sized windows based on the userId.
>>>
>>> This is how it started. In particular WindowFn.AssignContext would be created that threw an exception on accessing the unavailable fields (which would make finalize bundle unsuitable for such WindowFns).
>>>
>>
>> Yea this was not great. This also broke the equivalence between WithKeys and AssignWindows. It was really a workaround for the lack of the feature Reuven is proposing.
>>
>>
>>
>>> >> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <re...@google.com> wrote:
>>> >> >
>>> >> > It's a good question about startBundle - it's something I thought about. The problem is that a runner doesn't always know at startBundle what windows are in the bundle, and even if it does know it might require the runner to run two passes over the bundle to figure this out. Alternatively the runner could keep calling startBundle the first time it's seen a new window in the bundle, but I think that makes things even weirder. It's also worth noting that startBundle is already more limited today - we do not support calling output from startBundle, but we do allow calling output from finishBundle.
>>> >> >
>>> >> > Reuven
>>> >> >
>>> >> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <je...@seznam.cz> wrote:
>>> >> >>
>>> >> >> Ah, interesting. That makes windowFn non-idempotent by definition, because its first application (e.g. global window -> interval window) _might_ yield a different result than a second application with the interval window already assigned. On the other hand, let's suppose for a moment we can make windowFn idempotent, would that solve the issue of window assignment for elements output from finishBundle? I understand that window assignment is not the only motivation for adding an optional window parameter to @FinishBundle, but users might be confused why OutputReceiver is working only when there is a Window parameter. It would be nice to have this somewhat more "consistent". And last note - adding the parameter to @FinishBundle seems a little imbalanced - could this be made possible for @StartBundle as well? Should we enforce that both @StartBundle and @FinishBundle have the same signature, or should we accept all combinations?
>>> >> >>
>>> >> >> Jan
>>> >> >>
>>> >> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
>>> >> >>
>>> >> >> I assume you are referring to elements output from finishBundle.
>>> >> >>
>>> >> >> The problem is that the current window is an input to WindowFn.assignWindows. The new window can depend on the timestamp, the element itself, and the original window. I'm not sure how many users rely on this, however it has been part of our public windowing API for a long time, so I would guess that some users do use this in their WindowFns.
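The thread's userId example can be turned into a hypothetical window function that picks the window size from the element. Because of that, a timestamp alone is not enough to assign a window to an element emitted from finishBundle. This is a plain-Java sketch, not a Beam `WindowFn`. `TimeWindow` and `UserSizedWindows` are made-up names, not Beam's `IntervalWindow`, and the "premium-" prefix rule is an invented example.

```java
import java.util.Objects;

/** Minimal interval [start, end) in epoch millis; a stand-in, not Beam's IntervalWindow. */
final class TimeWindow {
  final long start;
  final long end;

  TimeWindow(long start, long end) {
    this.start = start;
    this.end = end;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TimeWindow)) return false;
    TimeWindow other = (TimeWindow) o;
    return start == other.start && end == other.end;
  }

  @Override
  public int hashCode() {
    return Objects.hash(start, end);
  }

  @Override
  public String toString() {
    return "[" + start + ", " + end + ")";
  }
}

/** Hypothetical element-dependent window function: users whose id starts with
 *  "premium-" get one-minute windows, all others get ten-minute windows. */
final class UserSizedWindows {
  static TimeWindow assign(String userId, long timestampMillis) {
    long size = userId.startsWith("premium-") ? 60_000L : 600_000L;
    // Align the window start to a multiple of the chosen size.
    long start = timestampMillis - Math.floorMod(timestampMillis, size);
    return new TimeWindow(start, start + size);
  }
}
```

For the same timestamp 125000, `assign("premium-1", 125_000L)` gives `[120000, 180000)` while `assign("basic-7", 125_000L)` gives `[0, 600000)`. Re-running such a function over finishBundle output would therefore need the original element (and, in the real API, the original window), which finishBundle does not have.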
>>> >> >>
>>> >> >> Reuven
>>> >> >>
>>> >> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >> >>>
>>> >> >>> There was a mention in some other thread, that in order to make user experience as predictable as possible, we should try to make windows idempotent, and once window is assigned, it should never be changed (and timestamp move outside of the scope of window, unless a different windowfn is applied). Because all Beam window functions are actually time based, and output timestamp is known, what is the issue of applying windowfn to elements output from @FinishBundle and assign the windows automatically?
>>> >> >>>
>>> >> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>>> >> >>>
>>> >> >>> This should not affect the ability of the user to specify the output timestamp. Today FinishBundleContext.output forces you to specify the window as well as the timestamp, which is a bit awkward. (I believe that it also lets you create brand new windows in finishBundle, which is interesting, but I'm not quite sure of the use case).
>>> >> >>>
>>> >> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <rober...@google.com> wrote:
>>> >> >>>>
>>> >> >>>> This is a really nice idea. Would the user still need to specify the timestamp of the output? I'm a bit ambivalent about calling it multiple times if OutputReceiver alone is in the parameter list; this might not be obvious and could be surprising behavior.
>>> >> >>>>
>>> >> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <re...@google.com> wrote:
>>> >> >>>> >
>>> >> >>>> > I would like to discuss a minor extension to the Beam model.
>>> >> >>>> >
>>> >> >>>> > Beam bundles have very few restrictions on what can be in a bundle, in particular a bundle might contain records for many different windows. This was an explicit decision as bundling primarily exists for performance reasons and we found that limiting bundling based on windows or timestamps often led to severe performance problems. However it sometimes makes finishBundle hard to use.
>>> >> >>>> >
>>> >> >>>> > I've seen multiple cases where users maintain some state in their DoFn that needs finalizing (e.g. writing to an external service) in finishBundle. Often users end up keeping lists of all windows seen in the bundle so they can be processed separately (or sometimes not realizing that there can be multiple windows and writing incorrect code).
>>> >> >>>> >
>>> >> >>>> > The lack of a window also means that we don't currently support injecting an OutputReceiver into finishBundle, as there's no good way of knowing which window output should be put into.
>>> >> >>>> >
>>> >> >>>> > I would like to propose adding a way for finishBundle to inspect the window, either directly (via a BoundedWindow parameter) or indirectly (via an OutputReceiver parameter). In this case, we will execute finishBundle once per window in the bundle. Otherwise, we will execute finishBundle once at the end of the bundle as before. This behavior is backwards compatible, as previously these parameters were disallowed in finishBundle.
>>> >> >>>> >
>>> >> >>>> > Note that this is similar to something Beam already does in processElement. A single element can exist in multiple windows, however if the processElement "observes" the window then Beam will call processElement once per window.
>>> >> >>>> >
>>> >> >>>> > In Java, the user code could look like this:
>>> >> >>>> >
>>> >> >>>> > DoFn<> {
>>> >> >>>> >   ...
>>> >> >>>> >   @FinishBundle
>>> >> >>>> >   public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
>>> >> >>>> >     // This finishBundle will be called once per window in the bundle since it has
>>> >> >>>> >     // a parameter that observes the window.
>>> >> >>>> >   }
>>> >> >>>> > }
>>> >> >>>> >
>>> >> >>>> > This PR shows an implementation of this extension for the Java SDK.
>>> >> >>>> >
>>> >> >>>> > Thoughts?
>>> >> >>>> >
>>> >> >>>> > Reuven
>>> >>
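The per-window finishBundle contract proposed in this thread can be modeled outside Beam. The model helps show what it changes for users who currently track windows by hand. In this sketch, written in plain Java with all names hypothetical and not taken from Reuven's PR, the user function buffers elements per window. A stand-in runner then calls `finishBundle` once for each distinct window seen in the bundle. The output timestamp defaults to the end of the window, following the default Reuven mentions for his PR. The sketch models that as the window's last millisecond and assumes fixed one-minute windows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Plain-Java model of the proposed per-window finishBundle; not Beam code. */
final class PerWindowFinishBundle {
  // Assumption: fixed one-minute windows, each identified by its start (epoch millis).
  static final long WINDOW_MILLIS = 60_000L;

  /** An emitted record: value, output timestamp, and the window it belongs to. */
  static final class Output {
    final String value;
    final long timestamp;
    final long windowStart;

    Output(String value, long timestamp, long windowStart) {
      this.value = value;
      this.timestamp = timestamp;
      this.windowStart = windowStart;
    }
  }

  /** Hypothetical user function that buffers elements until finishBundle. */
  static final class BufferingFn {
    private final Map<Long, List<String>> buffers = new LinkedHashMap<>();

    void processElement(String element, long windowStart) {
      buffers.computeIfAbsent(windowStart, w -> new ArrayList<>()).add(element);
    }

    /** Proposed shape: each call observes exactly one window and flushes its buffer. */
    void finishBundle(long windowStart, List<Output> out) {
      List<String> batch = buffers.remove(windowStart);
      // Default output timestamp: end of window, modeled as its last millisecond.
      long endOfWindow = windowStart + WINDOW_MILLIS - 1;
      out.add(new Output(String.join(",", batch), endOfWindow, windowStart));
    }
  }

  /** Stand-in runner: calls finishBundle once per distinct window in the bundle. */
  static List<Output> runBundle(List<String> elements, List<Long> timestamps) {
    BufferingFn fn = new BufferingFn();
    Set<Long> windowsSeen = new LinkedHashSet<>();
    for (int i = 0; i < elements.size(); i++) {
      long ts = timestamps.get(i);
      long windowStart = ts - Math.floorMod(ts, WINDOW_MILLIS);
      fn.processElement(elements.get(i), windowStart);
      windowsSeen.add(windowStart);
    }
    List<Output> out = new ArrayList<>();
    for (long window : windowsSeen) {
      fn.finishBundle(window, out);
    }
    return out;
  }
}
```

A bundle with elements `a`, `b`, `c` at timestamps 1000, 61000 and 2000 produces two outputs: `"a,c"` at 59999 and `"b"` at 119999. The function still buffers per window. What the proposed signature removes is the need for user code to iterate over the windows itself and to pass a window and timestamp explicitly on every output, as FinishBundleContext.output requires today.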