-user@ since this is pretty far afield

On Tue, Apr 30, 2019 at 4:22 PM Robert Bradshaw <[email protected]> wrote:

> In the original version of the dataflow model, windowing was not
> annotated on each PCollection, rather it was inferred based on tracing
> up the graph to the latest WindowInto operation. This tracing logic
> was put in the SDK for simplicity.
>

Had a discussion today where I explained that "the model" in the absence of
the protobufs and formal protocols was an informal agreement about what
programming should more or less feel like. It is still relevant, for sure.
At the level of protobuf, I think we've made some good decisions, though.

 I agree that there is room for a variety of SDK/DSL choices, but would
> strongly argue that for SDKs that implicitly specify triggering, the
> rules should be consistent and defined by the model.


Agree, to the extent that some core SDKs are presented as analogous to each
other. So I have now lost track of what is the exact mismatch between
Python and Java. Apologies for leading us down this track when the real
issue is that a user cannot find resources to understand downstream
continuation triggers (in any language, really). It would be good to
clearly document the status quo. This would be an opportunity to make
explicit the reasoning behind the particular choices, namely that a
completeness-based trigger aims to have a similar level of completeness in
the continuation trigger, while a completeness-unaware trigger tends to
default to "always" downstream.

*For other DSLs/SDKs a major reason to *not* define them implicitly in the
(protocol level) model is that they aren't that well figured out and we
want new abstractions layers to have an easy time explicitly controlling
them.

Likewise, I see sink triggers, once we figure them out, as semantic
> definitions belonging to the model (with likely some flexibility in
> implementation), not a choice each SDK should make on its own (though
> some may be able to declare/support them sooner than others).
>

Agree. Sink triggers are, by definition, not possible to specify elsewhere
than sinks. OTOH today's triggers fundamentally belong to aggregation steps.


> On Tue, Apr 30, 2019 at 6:24 PM Kenneth Knowles <[email protected]> wrote:
>
> > To go in the direction of consistency amongst the core SDKs, we could
> make all triggers downstream of an initial GBK use the "repeat(always)"
> trigger. I think we've discussed and this is simpler and more reliable than
> today's continuation trigger, while keeping its intent.
>
> Well, the default, after watermark trigger probably shouldn't become
> repeat(always).
>

Ah, good point of course. This isn't the first time I've made this mistake,
fixating on non-default triggers like count and processing time where our
continuation triggers are pretty arbitrary and the heuristic explanation of
"let it flow" is the guideline. Some day I'll remember to separate the two
cases.

Kenn



> On Tue, Apr 30, 2019 at 2:41 AM Maximilian Michels <[email protected]> wrote:
> >>
> >> While it might be debatable whether "continuation triggers" are part of
> >> the model, the goal should be to provide a consistent experience across
> >> SDKs. I don't see a reason why the Java SDK would use continuation
> >> triggers while the Python SDK doesn't.
> >>
> >> This makes me think that trigger behavior across transforms should
> >> actually be part of the model. Or at least be standardized for SDK
> >> authors. This would also imply that it is documented for end users.
> >>
> >> In the end, users do not care about whether it's part of the model or
> >> not, but they like having no surprises :)
> >>
> >> On 29.04.19 09:20, Robert Bradshaw wrote:
> >> > I would say that the triggering done in stacked GBKs, with windowings
> >> > in between, is part of the model (at least in the sense that it's not
> >> > something that we'd want different SDKs to do separately.)
> >> >
> >> > OTOH, I'm not sure the continuation trigger should be part of the
> >> > model. Much easier to either let WindowInto with no trigger specified
> >> > either keep the existing one or reset it to the default. A runner can
> >> > mutate this to a continuation trigger under the hood, which should be
> >> > strictly looser (triggers are a promise about the earliest possible
> >> > firing, they don't force firings to happen).
> >> >
> >> > On Mon, Apr 29, 2019 at 4:34 AM Kenneth Knowles <[email protected]>
> wrote:
> >> >>
> >> >> It is accurate to say that the "continuation trigger" is not
> documented in the general programming guide. It shows up in the javadoc
> only, as far as I can tell [1]. Technically, this is accurate. It is not
> part of the core of Beam - each language SDK is required to explicitly
> specify a trigger for every GroupByKey when they submit a pipeline to a
> runner. But, of course, this is pretty much an implementation detail.
> >> >>
> >> >> Kenn
> >> >>
> >> >> [1] https://www.google.com/search?q="continuation+trigger"+site%
> 3Abeam.apache.org
> >> >>
> >> >> On Sun, Apr 28, 2019 at 7:08 PM Reza Rokni <[email protected]> wrote:
> >> >>>
> >> >>> +1 I recall a fun afternoon a few years ago figuring this out ...
> >> >>>
> >> >>> On Mon, 11 Mar 2019 at 18:36, Maximilian Michels <[email protected]>
> wrote:
> >> >>>>
> >> >>>> Hi,
> >> >>>>
> >> >>>> I have seen several users including myself get confused by the
> "default"
> >> >>>> triggering behavior. I think it would be worthwhile to update the
> docs.
> >> >>>>
> >> >>>> In fact, Window.into(windowFn) does not override the existing
> >> >>>> windowing/triggering. It merges the previous input WindowStrategy
> with
> >> >>>> the new one.
> >> >>>>
> >> >>>> So your w1trigger will still be set when you do not set w2trigger.
> The
> >> >>>> default `AfterWatermark.pastEndOfWindow()` trigger will only be
> used
> >> >>>> when windowing for the first time, or when you set it explicitly.
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Max
> >> >>>>
> >> >>>> On 06.03.19 00:28, Daniel Debrunner wrote:
> >> >>>>> Thanks Kenn,.
> >> >>>>>
> >> >>>>> Is it fair to say that this continuation trigger functionality is
> not
> >> >>>>> documented?
> >> >>>>>
> >> >>>>> In the Javadoc it has a similar sentence to the programming guide:
> >> >>>>>
> >> >>>>>> triggering(Trigger) allows specifying a trigger to control when
> (in processing time) results for the given window can be produced. If
> unspecified, the default behavior is to trigger first when the watermark
> passes the end of the window, and then trigger again every time there is
> late arriving data.
> >> >>>>>
> >> >>>>> Thanks,
> >> >>>>> Dan.
> >> >>>>>
> >> >>>>> On Tue, Mar 5, 2019 at 1:46 PM Kenneth Knowles <[email protected]>
> wrote:
> >> >>>>>>
> >> >>>>>> The Window.into transform does not reset the trigger to the
> default. So where you have w2trigger, if you leave it off, then the
> triggering is left as the "continuation trigger" from w1trigger. Basically
> it tries to let any output caused by w1trigger to flow all the way through
> the pipeline without delay.
> >> >>>>>>
> >> >>>>>> Kenn
> >> >>>>>>
> >> >>>>>> On Tue, Mar 5, 2019 at 1:27 PM Daniel Debrunner <
> [email protected]> wrote:
> >> >>>>>>>
> >> >>>>>>> I discover how to fix my issue but not sure I understand why it
> does.
> >> >>>>>>>
> >> >>>>>>> I created a complete sample here:
> >> >>>>>>>
> https://gist.github.com/ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99#file-cascadewindows-java-L104
> >> >>>>>>> Link points to the area of interest.
> >> >>>>>>>
> >> >>>>>>> With the second window I was originally not specifying a
> trigger so
> >> >>>>>>> using the default trigger which lead to multiple triggers of the
> >> >>>>>>> combine on the second window.
> >> >>>>>>>
> >> >>>>>>> However changing the trigger to be
> AfterWatermark.pastEndOfWindow()
> >> >>>>>>> produced the output I expected, a single combine across all the
> >> >>>>>>> elements in the window.
> >> >>>>>>> The gist has comments showing the output and the two code
> variations.
> >> >>>>>>>
> >> >>>>>>> I don't understand why, since according to 8.1.1 [1] I thought
> >> >>>>>>> AfterWatermark.pastEndOfWindow() was the default. Maybe its due
> to
> >> >>>>>>> late data in some way but I'm not sure I understand how the
> data could
> >> >>>>>>> be late in this case.
> >> >>>>>>>
> >> >>>>>>> This is with Beam 2.7 direct runner btw.
> >> >>>>>>>
> >> >>>>>>> Thanks again for your help,
> >> >>>>>>> Dan.
> >> >>>>>>> [1]
> https://beam.apache.org/documentation/programming-guide/#event-time-triggers
> >> >>>>>>>
> >> >>>>>>> On Tue, Mar 5, 2019 at 11:48 AM Daniel Debrunner <
> [email protected]> wrote:
> >> >>>>>>>>
> >> >>>>>>>> Thanks Robert, your description is what I'm expecting, I'm
> working on
> >> >>>>>>>> a simple example to see if what I'm seeing is different and
> then
> >> >>>>>>>> hopefully use that to clarify my misunderstanding.
> >> >>>>>>>>
> >> >>>>>>>> Thanks,
> >> >>>>>>>> Dan.
> >> >>>>>>>>
> >> >>>>>>>> On Tue, Mar 5, 2019 at 11:31 AM Robert Bradshaw <
> [email protected]> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>> Windows are assigned to elements via the Window.into
> transform. They
> >> >>>>>>>>> influence grouping operations such as GroupByKey,
> Combine.perKey, and
> >> >>>>>>>>> Combine.globally. Looking at your example, you start with
> >> >>>>>>>>>
> >> >>>>>>>>>       PCollection<KV<A,B>>
> >> >>>>>>>>>
> >> >>>>>>>>> Presumably via a Read or a Create. These KVs are in a global
> window,
> >> >>>>>>>>> so the elements are really triples (ignoring PaneInfo) of the
> form
> >> >>>>>>>>>
> >> >>>>>>>>>       (KV<A, B>, GlobalWindow, timestamp)
> >> >>>>>>>>>
> >> >>>>>>>>>   From what I gather, the next step you do is a
> >> >>>>>>>>> Window.into(FixedWindows.of(...)), yielding a
> PCollection<KV<A,B>>
> >> >>>>>>>>> whose elements are, implicitly
> >> >>>>>>>>>
> >> >>>>>>>>>       (KV<A, B>, IntervalWindow, timestamp)
> >> >>>>>>>>>
> >> >>>>>>>>> Now you apply a GroupByKey to get elements of the form
> >> >>>>>>>>>
> >> >>>>>>>>>       (KV<A, Iterable<B>>, IntervalWindow, timestamp)
> >> >>>>>>>>>
> >> >>>>>>>>> where there is one Iterable for each distinct key and window.
> You
> >> >>>>>>>>> apply a ParDo to get PCollection<X> which is of the form
> >> >>>>>>>>>
> >> >>>>>>>>>       (X, IntervalWindow, timestamp)
> >> >>>>>>>>>
> >> >>>>>>>>> It looks like your next step is another
> >> >>>>>>>>> Window.into(FixedWindows.of(...)), yielding
> >> >>>>>>>>>
> >> >>>>>>>>>       (X, IntervalWindow, timestamp)
> >> >>>>>>>>>
> >> >>>>>>>>> where the IntervalWindow here may be different if the
> parameters to
> >> >>>>>>>>> FixedWindows were different (e.g. the first was by minute,
> the second
> >> >>>>>>>>> by hours). If it's the same, this is a no-op. Now you apply
> >> >>>>>>>>> Combine.globally(CombineFn<X, R>) to get a PCollection<R>
> whose
> >> >>>>>>>>> elements are of the form
> >> >>>>>>>>>
> >> >>>>>>>>>       (R, IntervalWindow, timestamp)
> >> >>>>>>>>>
> >> >>>>>>>>> where there is now one R per window (the elements in the same
> window
> >> >>>>>>>>> being combined, the elements across windows not).
> >> >>>>>>>>>
> >> >>>>>>>>> FWIW, internally, Combine.globally is implemented as
> PariWithNullKey +
> >> >>>>>>>>> CombinePerKey + StripNullKey.
> >> >>>>>>>>>
> >> >>>>>>>>> Does this help?
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> On Tue, Mar 5, 2019 at 8:09 PM Daniel Debrunner <
> [email protected]> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks for the reply.
> >> >>>>>>>>>>
> >> >>>>>>>>>> As for every element is always associated with a window,
> when a
> >> >>>>>>>>>> element is produced due to a window trigger (e.g. the
> GroupByKey) what
> >> >>>>>>>>>> window is it associated with? The window it was produced
> from? Maybe
> >> >>>>>>>>>> the question is when is a window assigned to an element?
> >> >>>>>>>>>>
> >> >>>>>>>>>> I'll see if I can come up with an example,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks,
> >> >>>>>>>>>> Dan.
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Tue, Mar 5, 2019 at 10:47 AM Kenneth Knowles <
> [email protected]> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Two pieces to this:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 1. Every element in a PCollection is always associated with
> a window, and GroupByKey (hence CombinePerKey) operates per-key-and-window
> (w/ window merging).
> >> >>>>>>>>>>> 2. If an element is not explicitly a KV, then there is no
> key associated with it.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I'm afraid I don't have any guesses at the problem based on
> what you've shared. Can you say more?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Kenn
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Tue, Mar 5, 2019 at 10:29 AM Daniel Debrunner <
> [email protected]> wrote:
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> The windowing section of the Beam programming model guide
> shows a
> >> >>>>>>>>>>>> window defined and used in the GropyByKey transform after
> a ParDo.
> >> >>>>>>>>>>>> (section 7.1.1).
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> However I couldn't find any documentation on how long the
> window
> >> >>>>>>>>>>>> remains in scope for subsequent transforms.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> I have an application with this pipeline:
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> PCollection<KV<A,B>> -> FixedWindow<KV<A,B>> -> GroupByKey
> ->
> >> >>>>>>>>>>>> PCollection<X> -> FixedWindow<X> -> Combine<X,R>.globally
> ->
> >> >>>>>>>>>>>> PCollection<R>
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> The idea is that the first window is aggregating by key
> but in the
> >> >>>>>>>>>>>> second window I need to combine elements across all keys.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> With my initial app I was seeing some runtime errors
> in/after the
> >> >>>>>>>>>>>> combine where a KV<null,R> was being seen, even though at
> that point
> >> >>>>>>>>>>>> there should be no key for the PCollection<R>.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> In a simpler test I can apply  FixedWindow<X> ->
> Combine<X,R>.globally
> >> >>>>>>>>>>>> -> PCollection<R> to a PCollection without an upstream
> window and the
> >> >>>>>>>>>>>> combine correctly happens once.
> >> >>>>>>>>>>>> But then adding the keyed upstream window, the combine
> occurs once per
> >> >>>>>>>>>>>> key without any final combine across the keys.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> So it seems somehow the memory of the key exists even with
> the new
> >> >>>>>>>>>>>> window transform,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> I'm probably misunderstanding some detail of windowing,
> but I couldn't
> >> >>>>>>>>>>>> find any deeper documentation than the simple examples in
> the
> >> >>>>>>>>>>>> programming model guide.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Can anyone point me in the correct direction?
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Thanks,
> >> >>>>>>>>>>>> Dan.
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>>
> >> >>> This email may be confidential and privileged. If you received this
> communication by mistake, please don't forward it to anyone else, please
> erase all copies and attachments, and please let me know that it has gone
> to the wrong person.
> >> >>>
> >> >>> The above terms reflect a potential business arrangement, are
> provided solely as a basis for further discussion, and are not intended to
> be and do not constitute a legally binding obligation. No legally binding
> obligations will be created, implied, or inferred until an agreement in
> final form is executed in writing by all parties involved.
>

Reply via email to