Good point that it is required to have a cross-language spec here. Yes, I think it is a property of the WindowFn but maybe also a property of the pipeline as a whole. I've only really seen sessions, sessions-with-some-limitation, and the wacky Nexmark WindowFns that merge everything for an auction then snap to the begin/end bounds based on seeing begin/end events.
I'm thinking we choose a default and let people re-window if they want different behavior. Is there a reason that this won't work? (e.g. they need to change the behavior deep inside a composite that they cannot access?) I'm leaning toward 1 (make SDKs use the ALREADY_MERGED bit and allow windows to be carried along) because it is the most flexible default. It sounds like you are too?

Kenn

On Mon, Feb 22, 2021 at 2:01 PM Robert Bradshaw <[email protected]> wrote:

> So, what we want to prohibit is stacked merging aggregations. (Open question: is that a property of the WindowFn, in which case some merging WindowFns could allow stacking, and some non-merging ones prohibit it, or is this really tied to merging itself?)
>
> In order to do this in a cross-language way (e.g. two Java aggregations separated by a Go DoFn) we need to preserve this "don't re-aggregate" bit in the proto. I thought that's what ALREADY_MERGED was for.
>
>
> On Thu, Feb 18, 2021 at 8:30 PM Kenneth Knowles <[email protected]> wrote:
>
>> So there's a bit of an open question about the Java SDK behavior and whether we should keep the unused ALREADY_MERGED in the model proto.
>>
>> Here is a proposal that maintains the intent of everything:
>>
>> - Remove MergeStatus.ALREADY_MERGED since there is no SDK that has ever had any semantics like that.
>> - InvalidWindows is merging and translates as NEEDS_MERGE so that it gets invoked and crashes. This contradicts *both* PRs linked.
>> - This means that embracing runners that only support a fixed set of windowing primitives requires them to at least be able to carry along InvalidWindows without invoking it.
>>
>> I think the last bullet is unfortunate. So two proposals that allow runners to support only a fixed set of windowing primitives:
>>
>> (1) Don't convert merging WindowFns to InvalidWindows. Instead set an "already merged bit" that makes it into a non-merging WindowFn and translate as ALREADY_MERGED.
>> This would allow a later GBK, although it would make no sense in the case of sessions because there's not much chance windows will coincide. But merging WindowFns don't have to work like sessions, so maybe there is some case where there's actually a small number of possible output windows.
>>
>> OR
>>
>> (2) Don't convert merging WindowFns to InvalidWindows. Instead leave it just the way it is (like Python) and translate as NEEDS_MERGE. We still remove ALREADY_MERGED. This would allow a later GBK, although it would make no sense because, for the same reason, there's not likely to be any merging. But merging WindowFns don't have to work like sessions, so they might merge based on some other interesting criteria.
>>
>> I think (2) does seem more likely to have uses. I don't think either is likely to have very many, especially if there are very few user-authored merging WindowFns out there (and I agree that this is probably true). Choice (2) also has the benefit that it matches Python and that it is trivial to implement.
>>
>> Kenn
>>
>> On Thu, Feb 18, 2021 at 3:18 PM Robert Bradshaw <[email protected]> wrote:
>>
>>> I think you're right about Python. I think it's fine for the SDK to prohibit (or require explicit user action for) ambiguous things like stacked sessions. This illegal state wouldn't generally need to be represented in the proto (but maybe it'd be nice for quicker errors in cross-language pipelines).
>>>
>>> On Thu, Feb 18, 2021 at 1:38 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Great. Should be easy to sort this out before Go has to make any decisions.
>>>>
>>>> I will take this opportunity to get on my soapbox and suggest that instead of "custom WindowFn" we simply call them "WindowFn". The suffix "Fn" indicates that it is definable code, not just an enum that selects baked-in functionality. If you can't run user code for a particular type of Fn, you don't support it.
>>>> If you don't support "custom WindowFns" you don't support WindowFns (but you may support "windowing" in some predefined ways).
>>>>
>>>
>>> Or maybe we should call the ones off the short list "arbitrary WindowFns." I think the reason "not supporting WindowFns" feels odd is that with the enumerated list one may hit 90+% of use cases, which is much better than not supporting the concept of windowing (timestamps, ...) at all.
>>>
>>>
>>>> Kenn
>>>>
>>>> On Thu, Feb 18, 2021 at 10:13 AM Robert Burke <[email protected]> wrote:
>>>>
>>>>> A bit more information: graphx/translate.go has the handling of WindowingStrategy at pipeline encoding, and we only use non-merging.
>>>>>
>>>>> Presumably this is something that would need to be fixed when supporting Session windows in BEAM-4152.
>>>>>
>>>>>
>>>>> On Thu, Feb 18, 2021, 10:02 AM Robert Burke <[email protected]> wrote:
>>>>>
>>>>>> Go has very basic windowing support that is managed entirely by the runner. Session windowing isn't implemented yet, let alone custom WindowFns, which I assume is what would need to specify these things.
>>>>>>
>>>>>> Session windowing is tracked in BEAM-4152 and custom WindowFns are tracked in BEAM-11100.
>>>>>>
>>>>>> On Wed, Feb 17, 2021, 12:06 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Yet another exciting corner of portability, discovered during debugging. Some discussion at https://github.com/apache/beam/pull/14001 and https://github.com/apache/beam/pull/13998
>>>>>>>
>>>>>>> **In Java since around the beginning of Beam**
>>>>>>> When a merging WindowFn goes through a GBK/Combine and windows are merged, the downstream windowing is changed to "InvalidWindows", which will fail any downstream GBK. The user is required to re-window before another GBK.
>>>>>>>
>>>>>>> It was to protect a user from this:
>>>>>>>
>>>>>>> 1. User sets keys and chooses session windowing
>>>>>>> 2. User groups/combines by session
>>>>>>> 3. User computes the outputs to produce some new keys
>>>>>>> 4. User groups again
>>>>>>>
>>>>>>> The result usually does not make sense. Because it was forbidden, we never decided whether things should merge again or not.
>>>>>>>
>>>>>>> **In protos**
>>>>>>> The MergeStatus enum has NON_MERGING, NEEDS_MERGE, and ALREADY_MERGED. It is documented that ALREADY_MERGED is for sessions/merging windows after a GBK.
>>>>>>>
>>>>>>> This is _maybe_ better. It allows the windows to just be carried along. It is a major model change and would require SDK support. But it might still not make sense, because the chances that two elements have exactly the same merging window are very low for something like sessions. It may be useful for advanced tricks with merging windows, but no one is doing that because no SDK supports it.
>>>>>>>
>>>>>>> **In Python**
>>>>>>> I think nothing is done. The next GBK will merge again. I could be wrong; I just read the code very quickly and don't know it that well.
>>>>>>>
>>>>>>> **In Go**
>>>>>>> I didn't even check. Maybe someone can add the status to the thread.
>>>>>>>
>>>>>>> Kenn
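A toy model may help make the trade-off in this thread concrete. The Python sketch below is not Beam code. `IntervalWindow`, `merge_windows`, `sessions`, `Strategy`, and `second_gbk` are hypothetical names used only for illustration. The sketch assumes a simplified session rule: each element gets `[t, t + gap)` and overlapping windows merge. It applies that rule per key at a first GBK, re-keys every output to one new key, and then shows what a second GBK would produce under the current Java behavior and under proposals (1) and (2).

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True, order=True)
class IntervalWindow:
    """Half-open window [start, end), ordered by (start, end). Toy stand-in, not Beam's class."""
    start: int
    end: int


def merge_windows(windows):
    """Merge overlapping windows, roughly as a sessions-style WindowFn does at a GBK."""
    merged = []
    for w in sorted(windows):
        if merged and w.start < merged[-1].end:
            last = merged[-1]
            merged[-1] = IntervalWindow(last.start, max(last.end, w.end))
        else:
            merged.append(w)
    return merged


def sessions(timestamps, gap):
    """Simplified session windowing: each element gets [t, t + gap), then overlaps merge."""
    return merge_windows(IntervalWindow(t, t + gap) for t in timestamps)


class Strategy(Enum):
    INVALID_WINDOWS = "Java today: downstream windowing becomes InvalidWindows"
    ALREADY_MERGED = "proposal (1): carry windows along, translate as ALREADY_MERGED"
    NEEDS_MERGE = "proposal (2): leave the WindowFn as-is, translate as NEEDS_MERGE"


def second_gbk(windows, strategy):
    """Return the windows (groups) a later GBK would produce for a single re-keyed key."""
    if strategy is Strategy.INVALID_WINDOWS:
        # Models the Java behavior: any GBK after a merging GBK fails until the user re-windows.
        raise RuntimeError("GBK after a merging GBK: re-window first")
    if strategy is Strategy.ALREADY_MERGED:
        # Windows are carried along unchanged; only exactly equal windows share a group.
        return sorted(set(windows))
    # NEEDS_MERGE: the WindowFn runs its merge again across the new key's elements.
    return merge_windows(windows)


# First GBK: per-key sessions with a gap of 5 time units for two original keys.
first = sessions([0, 3, 20], gap=5) + sessions([1, 4], gap=5)

# Re-key every output to one new key, then group again under each proposal.
for strategy in (Strategy.ALREADY_MERGED, Strategy.NEEDS_MERGE):
    print(strategy.name, second_gbk(first, strategy))
```

With these sample timestamps, proposal (1) yields three groups because none of the carried-along windows `[0, 8)`, `[1, 9)`, and `[20, 25)` coincide. Proposal (2) merges `[0, 8)` and `[1, 9)` into `[0, 9)` and yields two groups. The current Java behavior raises an error instead. This matches the observation in the thread that exact window matches are unlikely for sessions.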
