It sounds like consensus is to do this in finishBundle, but require
specifying output timestamps. I'll alter the PR appropriately.

Reuven

On Mon, May 11, 2020 at 5:06 PM Robert Bradshaw <rober...@google.com> wrote:

> StartBundle pre-dated setUp, which makes it less useful than before. With
> DoFn re-use, however, startBundle can be used to ensure the DoFn is
> instantiated to a clean state.
>
> On Thu, May 7, 2020 at 2:03 PM Reuven Lax <re...@google.com> wrote:
>
>> I think startBundle is useful for convenience and performance, but not
>> necessarily needed semantically (as Kenn said, you could write your
>> pipeline without startBundle). finishBundle has a stronger semantic
>> meaning when interpreted as a way of finalizing elements.
>>
>> On Thu, May 7, 2020 at 2:00 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> Start bundle is useful since the framework provides the necessary
>>> synchronization while using lazy init requires you to write it yourself and
>>> also pay for it on each process element call.
>>>
>>> On Wed, May 6, 2020 at 8:46 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> This is a great idea. I thought that (long ago) we decided not to
>>>> execute finishBundle per window for these reasons: (1) perf fears (2)
>>>> naming bikeshed (3) backwards compatibility (even though we hadn't
>>>> stabilized, it is pervasive). That was before the annotation-driven DoFn I
>>>> believe, so we didn't have the ability to do it this way. Now this seems
>>>> like a clear win.
>>>>
>>>> Regarding @StartBundle: I always use and advise others to use lazy init
>>>> instead of @StartBundle and to think of @FinishBundle as "flush".
>>>> State-sensitive APIs like "start(); process(); finish()" are usually an
>>>> anti-pattern since you can almost always write them in a less dangerous way
>>>> (try-with-resources, Python context managers, etc). Conveniently, this
>>>> eliminates any consideration of symmetry. Can anyone refresh me on
>>>> when/whether it is important to have @StartBundle instead of running the
>>>> same code via lazy init?
>>>>
>>>>
>>>> On Tue, May 5, 2020 at 3:16 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> On Tue, May 5, 2020 at 3:08 PM Reuven Lax <re...@google.com> wrote:
>>>>> >
>>>>> > On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>> >>
>>>>> >> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <re...@google.com>
>>>>> wrote:
>>>>> >> >
>>>>> >> > This should not affect the ability of the user to specify the
>>>>> output timestamp.
>>>>> >>
>>>>> >> My question was whether we would require it.
>>>>> >
>>>>> >
>>>>> > My current PR does not - it defaults to end-of-window as the
>>>>> timestamp. However we could also decide to require it.
>>>>>
>>>>> I'd be more comfortable requiring it for the time being.
>>>>>
>>>>
>>>> +1 for requiring it
>>>>
>>>> Kenn
>>>>
>>>> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <je...@seznam.cz>
>>>>> wrote:
>>>>> >> >
>>>>> >> > There was a mention in some other thread, that in order to make
>>>>> user experience as predictable as possible, we should try to make windows
>>>>> idempotent, and once window is assigned, it should be never changed (and
>>>>> timestamp move outside of the scope of window, unless a different windowfn
>>>>> is applied). Because all Beam window functions are actually time based, 
>>>>> and
>>>>> output timestamp is known, what is the issue of applying windowfn to
>>>>> elements output from @FinishBundle and assign the windows automatically?
>>>>> >>
>>>>> >> We used to do exactly this. (I don't recall why it was removed.) If
>>>>> >> the input element and/or window was queried by the WindowFn
>>>>> >> (admittedly rare), it would fail at runtime.
>>>>> >
>>>>> > When did we used to do this? We've had users writing WindowFns that
>>>>> queried the input element since long before Beam existed.  e.g a window fn
>>>>> that inspected a userId field, and created different sized windows based 
>>>>> on
>>>>> the userId.
>>>>>
>>>>> This is how it started. In particular WindowFn.AssignContext would be
>>>>> created that through an exception on accessing the unavailable fields
>>>>> (which would make finalize bundle unsuitable for such WindowFns).
>>>>>
>>>>
>>>> Yea this was not great. This also broke the equivalence between
>>>> WithKeys and AssignWindows. It was really a workaround for the lack of the
>>>> feature Reuven is proposing.
>>>>
>>>>
>>>>
>>>>> >> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <re...@google.com> wrote:
>>>>> >> >
>>>>> >> > It's a good question about startBundle - it's something I thought
>>>>> about. The problem is that a runner doesn't always know at startBundle 
>>>>> what
>>>>> windows are in the bundle, and even if it does know it might require the
>>>>> runner to run two passes over the bundle to figure this out. Alternatively
>>>>> the runner could keep calling startBundle the first time it's seen a new
>>>>> window in the bundle, but I think that makes things even weirder. It's 
>>>>> also
>>>>> worth noting that startBundle is already more limited today - we do not
>>>>> support calling output from startBundle, but we do allow calling output
>>>>> from finishBundle.
>>>>> >> >
>>>>> >> > Reuven
>>>>> >> >
>>>>> >> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <je...@seznam.cz>
>>>>> wrote:
>>>>> >> >>
>>>>> >> >> Ah, interesting. That makes windowFn non-idempotent by
>>>>> definition, because its first application (e.g. global window -> interval
>>>>> window) _might_ yield different result than second application with
>>>>> interval window already assigned. On the other hand, let's suppose for a
>>>>> moment we can make windowFn idempotent, would that solve the issue of
>>>>> window assignment for elements output from finishBundle? I understand that
>>>>> window assignment is not only motivation for adding optional window
>>>>> parameter to @FinishBundle, but users might be confused why OutputReceiver
>>>>> is working only when there is Window parameter. It would be nice to have
>>>>> this somewhat more "consistent". And last note - adding the parameter to
>>>>> @FinishBundle seems a little imbalanced - could this be made possible for
>>>>> @StartBundle as well? Should we enforce that both @StartBundle and
>>>>> @FinishBundle have the same signature, or should we accept all 
>>>>> combinations?
>>>>> >> >>
>>>>> >> >> Jan
>>>>> >> >>
>>>>> >> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
>>>>> >> >>
>>>>> >> >> I assume you are referring to elements output from finishBundle.
>>>>> >> >>
>>>>> >> >> The problem is that the current window is an input to
>>>>> WindowFn.assignWindows. The new window can depend on the timestamp, the
>>>>> element itself, and the original window. I'm not sure how many users rely
>>>>> on this, however it has been part of our public windowing API for a long
>>>>> time, so I would guess that some users do use this in their WindowFns.
>>>>> >> >>
>>>>> >> >> Reuven
>>>>> >> >>
>>>>> >> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <je...@seznam.cz>
>>>>> wrote:
>>>>> >> >>>
>>>>> >> >>> There was a mention in some other thread, that in order to make
>>>>> user experience as predictable as possible, we should try to make windows
>>>>> idempotent, and once window is assigned, it should be never changed (and
>>>>> timestamp move outside of the scope of window, unless a different windowfn
>>>>> is applied). Because all Beam window functions are actually time based, 
>>>>> and
>>>>> output timestamp is known, what is the issue of applying windowfn to
>>>>> elements output from @FinishBundle and assign the windows automatically?
>>>>> >> >>>
>>>>> >> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>>>>> >> >>>
>>>>> >> >>> This should not affect the ability of the user to specify the
>>>>> output timestamp. Today FinishBundleContext.output forces you to specify
>>>>> the window as well as the timestamp, which is a bit awkward. (I believe
>>>>> that it also lets you create brand new windows in finishBundle, which is
>>>>> interesting, but I'm not quite sure of the use case).
>>>>> >> >>>
>>>>> >> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <
>>>>> rober...@google.com> wrote:
>>>>> >> >>>>
>>>>> >> >>>> This is a really nice idea. Would the user still need to
>>>>> specify the
>>>>> >> >>>> timestamp of the output? I'm a bit ambivalent about calling it
>>>>> >> >>>> multiple times if OuptutReceiver alone is in the parameter
>>>>> list; this
>>>>> >> >>>> might not be obvious and could be surprising behavior.
>>>>> >> >>>>
>>>>> >> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <re...@google.com>
>>>>> wrote:
>>>>> >> >>>> >
>>>>> >> >>>> > I would like to discuss a minor extension to the Beam model.
>>>>> >> >>>> >
>>>>> >> >>>> > Beam bundles have very few restrictions on what can be in a
>>>>> bundle, in particular s bundle might contain records for many different
>>>>> windows. This was an explicit decision as bundling primarily exists for
>>>>> performance reasons and we found that limiting bundling based on windows 
>>>>> or
>>>>> timestamps often led to severe performance problems. However it sometimes
>>>>> makes finishBundle hard to use.
>>>>> >> >>>> >
>>>>> >> >>>> > I've seen multiple cases where users maintain some state in
>>>>> their DoFn that needs finalizing (e.g. writing to an external service) in
>>>>> finishBundle. Often users end up keeping lists of all windows seen in the
>>>>> bundle so they can be processed separately (or sometimes not realizing 
>>>>> that
>>>>> their can be multiple windows and writing incorrect code).
>>>>> >> >>>> >
>>>>> >> >>>> > The lack of a window also means that we don't currently
>>>>> support injecting an OuptutReceiver into finishBundle, as there's no good
>>>>> way of knowing which window output should be put into.
>>>>> >> >>>> >
>>>>> >> >>>> > I would like to propose adding a way for finishBundle to
>>>>> inspect the window, either directly (via a BoundedWindow parameter) or
>>>>> indirectly (via an OutputReceiver parameter). In this case, we will 
>>>>> execute
>>>>> finishBundle once per window in the bundle. Otherwise, we will execute
>>>>> finishBundle once at the end of the bundle as before. This behavior is
>>>>> backwards compatible, as previously these parameters were disallowed in
>>>>> finishBundle.
>>>>> >> >>>> >
>>>>> >> >>>> > Note that this is similar to something Beam already does in
>>>>> processElement. A single element can exist in multiple windows, however if
>>>>> the processElement "observes" the window then Beam will call 
>>>>> processElement
>>>>> once per window.
>>>>> >> >>>> >
>>>>> >> >>>> > In Java, the user code could look like this:
>>>>> >> >>>> >
>>>>> >> >>>> > DoFn<> {
>>>>> >> >>>> >      ...
>>>>> >> >>>> >    @FinishBundle
>>>>> >> >>>> >    public void finishBundle(IntervalWindow window,
>>>>> OutputReceiver<T> o) {
>>>>> >> >>>> >        // This finishBundle will be called once per window
>>>>> in the bundle since it has
>>>>> >> >>>> >       // a parameter that observes the window.
>>>>> >> >>>> >    }
>>>>> >> >>>> > }
>>>>> >> >>>> >
>>>>> >> >>>> > This PR shows an implementation of this extension for the
>>>>> Java SDK.
>>>>> >> >>>> >
>>>>> >> >>>> > Thoughts?
>>>>> >> >>>> >
>>>>> >> >>>> > Reuven
>>>>>
>>>>

Reply via email to