On Wed, Feb 17, 2021 at 12:30 PM Kenneth Knowles <k...@apache.org> wrote:
> Slight correction: you mean default SlidingWindows? This is the only nontrivial implementation I know of.

Yes.

> Sessions does not shift the timestamp because you can't really guess how far to shift it. Sessions with EARLIEST just get hung entirely whenever there is a long-lived session.
>
> I've thought about it a bit more, and I think the issue is not best solved by a WindowFn. It requires all the data in the WindowingStrategy to know whether the hack is useful.

I agree. The intent is not to have shifted timestamps but to have more eager data propagation. Shifting timestamps is a (lossy) mechanism to achieve this with our current watermark implementation.

> So here is a proposal:
>
> - add a TimestampCombiner (aka OutputTime enum in the proto) for EARLIEST_NON_OVERLAPPING
> - only call WindowFn#getOutputTime in this case

Or should this be called EARLIEST_SHIFTING_TIMESTAMP or similar (as WindowFn#getOutputTime is not restricted to non-overlapping windows)?

> This is necessarily a breaking change. Users who are using EARLIEST with SlidingWindows will see a change in behavior. It can be flipped:
>
> - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that does not call WindowFn#getOutputTime and implement it
> - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime
>
> Or third option:
>
> - delete WindowFn#getOutputTime and pretend it never existed.
> SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work well with EARLIEST.

I would go with this, possibly with a transition period in which we support an opt-in option for the old behavior in Java on non-portable runners, at least until we can figure out what we really want and add it to the model.
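To make the first proposal concrete, here is a minimal Java sketch in which only a dedicated combiner value consults the WindowFn's output-time hook, while plain EARLIEST takes the true minimum. Everything in it is hypothetical: `Combiner`, `OutputTimeHook` and `Window` are stand-ins, not Beam's `TimestampCombiner` or `WindowFn#getOutputTime`; timestamps are plain `long` seconds; and the sliding-window hook assumes the shift is max(ts, window end - slide), which turns 42 into 50 for the [0, 60) window.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the first proposal: only a dedicated combiner value
// consults the WindowFn-style output-time hook; plain EARLIEST never does.
// None of these names are Beam APIs. Timestamps are plain longs (seconds).
class CombinerDispatchSketch {

    // Half-open window [start, end).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Stand-in for a WindowFn's output-time hook (cf. WindowFn#getOutputTime).
    interface OutputTimeHook {
        long outputTime(long inputTimestamp, Window window);
    }

    // Stand-in for the TimestampCombiner / OutputTime enum.
    enum Combiner { EARLIEST, EARLIEST_NON_OVERLAPPING, END_OF_WINDOW }

    // Combines the input timestamps of one window into a single output timestamp.
    static long combine(Combiner combiner, List<Long> inputs, Window w, OutputTimeHook hook) {
        switch (combiner) {
            case EARLIEST:
                // True minimum of the inputs; the hook is ignored.
                return inputs.stream().mapToLong(Long::longValue).min().getAsLong();
            case EARLIEST_NON_OVERLAPPING:
                // Each input goes through the hook first, then the minimum is taken.
                return inputs.stream().mapToLong(t -> hook.outputTime(t, w)).min().getAsLong();
            case END_OF_WINDOW:
                // Last instant inside [start, end), assuming 1-second granularity.
                return w.end - 1;
            default:
                throw new IllegalArgumentException("unknown combiner: " + combiner);
        }
    }

    public static void main(String[] args) {
        final long period = 10;
        // Assumed sliding-window shift: never before the end of the window that
        // starts one period earlier, so it cannot overlap that window.
        OutputTimeHook slidingShift = (ts, win) -> Math.max(ts, win.end - period);
        Window window = new Window(0, 60);
        List<Long> inputs = Arrays.asList(42L);
        for (Combiner c : Combiner.values()) {
            System.out.println(c + " -> " + combine(c, inputs, window, slidingShift));
        }
    }
}
```

For an element at 42 in [0, 60), `main` prints EARLIEST -> 42, EARLIEST_NON_OVERLAPPING -> 50 and END_OF_WINDOW -> 59. Keeping the shift behind its own enum value means existing EARLIEST users get the true minimum, which is exactly why that option is a breaking change.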
>
> Kenn
>
> On Wed, Feb 17, 2021 at 11:16 AM Robert Bradshaw <rober...@google.com> wrote:
>
>> OK, so to move forward, shall we update the default Sessions to not do this timestamp shifting, perhaps with a (deprecated) timestamp-shifting opt-in variant to ease the transition for those that want the old (marked experimental) behavior?
>>
>> On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> All of this is right. Things have changed a lot. Nowadays the default will work well, and we can caveat to users that EARLIEST may hold up downstream output for overlapping windows.
>>>
>>> I'm slightly concerned about the fact that EARLIEST is necessary for CoGBK joins, unless there is some special consideration why it doesn't matter. So I wonder what happens when a pipeline has a few different joins.
>>>
>>> Kenn
>>>
>>> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> Yes, unless you manually set the timestamp combiner to earliest, which in this case gives earliest + shifted.
>>>>
>>>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> The default now is end of window, right? Doesn't that alleviate the problem that the original change was supposed to fix?
>>>>>
>>>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>>> The default timestamp combiner used to be earliest as well.
>>>>>>
>>>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> IIRC, this was introduced because at the time users complained that sliding windows were virtually unusable for reasonably-sized windows. However this was before we allowed customizing the timestamp combiner, so maybe this is less of a problem now?
>>>>>>>
>>>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a discussion of a very old and strange feature of Beam that I think we should revisit.
>>>>>>>>>>>
>>>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>>>>
>>>>>>>>>>> - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>>> - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
>>>>>>>>>>> - there is another downstream aggregation/GBK
>>>>>>>>>>>
>>>>>>>>>>> The output watermark of the upstream aggregation is held to the minimum of the inputs. When an output is emitted, we desire the output to flow through the rest of the pipeline without delay. However, the downstream aggregation can (and often will) be delayed by the window size because of *watermark holds in other later windows that are not released until those windows output.*
>>>>>>>>>>>
>>>>>>>>>> Could you describe this a bit more? Why would later windows hold up the watermark for upstream steps? (Is it due to some subtlety, such as tracking the watermark for each stage rather than for each step?)
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> It does not have to do with stages/fusion (a runner-specific concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>>>
>>>>>>>>> Suppose:
>>>>>>>>>
>>>>>>>>> - Default triggering
>>>>>>>>> - Timestamp combiner EARLIEST
>>>>>>>>> - 60s windows sliding every 10s
>>>>>>>>> - An element with timestamp 42
>>>>>>>>> - Aggregation (A) with downstream aggregation (B)
>>>>>>>>>
>>>>>>>>> Here is what happens:
>>>>>>>>>
>>>>>>>>> - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20, 80) and [30, 90) and [40, 100)
>>>>>>>>> - For each of those windows the output watermark hold is set to 42 (the element's timestamp)
>>>>>>>>> - At time 50 the aggregation (A) over the first window is emitted; the other windows remain buffered and held
>>>>>>>>> - The element arrives at aggregation (B) and is buffered because the input watermark (which is the held output watermark from A) is still 42, even though no other data will arrive for that window (WLOG if elements from other keys are shuffled in)
>>>>>>>>> - The input watermark for aggregation (B) does not advance past 42 until the [40, 100) window is fired and releases its watermark hold
>>>>>>>>>
>>>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed by the window size, but by the spread of end-of-window timestamps across all assigned windows (window size minus slide?)
>>>>>>>>>
>>>>>>>>> So to avoid this, what actually happens in Java today is that the watermark hold, and output timestamp, is set not to 42 but altered to 50 to not overlap the prior window. Timestamp of 50 is very nonintuitive since you asked for the EARLIEST of input timestamps.
EARLIEST combiner >>>>>>>>> plays an >>>>>>>>> important role in CoGBK based joins in SQL, where the iterables are >>>>>>>>> re-exploded with timestamps that may be the minimum of input >>>>>>>>> elements. This >>>>>>>>> shifting may actually break SQL... >>>>>>>>> >>>>>>>>> This predated our switch away from "delta from watermark" late >>>>>>>>> data dropping to "window expiry" data dropping. So maybe there is >>>>>>>>> some new >>>>>>>>> way to set a hold that does not make data late or droppable but still >>>>>>>>> use >>>>>>>>> the EARLIEST timestamp. That is my question, for which I have not >>>>>>>>> figured >>>>>>>>> out the answer. >>>>>>>>> >>>>>>>> >>>>>>>> This is, indeed, a very tough question... >>>>>>>> >>>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned >>>>>>>> windows. E.g. for sessions, a long key can hold up the watermark for >>>>>>>> all >>>>>>>> others. Here we "know" what the hold up is, and can adjust for it. But >>>>>>>> I >>>>>>>> don't think doing this adjustment is the right thing. It would >>>>>>>> certainly >>>>>>>> seem to mess up the timestamp of the outputs from a join. And it's >>>>>>>> possible >>>>>>>> that the values get re-windowed in which case this element should get >>>>>>>> joined with itself from a later window (which I'll admit is a bit odd, >>>>>>>> but >>>>>>>> maybe a reflection that multiple-windowing, like multi-firing >>>>>>>> triggering, >>>>>>>> is non-local). >>>>>>>> >>>>>>>> Logicaly, the reason we want [-10 50) window for B to fire shortly >>>>>>>> after the input watermark for A passes 50 because no non-late data >>>>>>>> coming >>>>>>>> out of A could influence it. In some sense, the "watermark" for the >>>>>>>> [-10, >>>>>>>> 50) windows has indeed passed, but not that for later windows. 
>>>>>>>> I don't think the Beam model requires that we have a single watermark, just that we fire triggers/timers once we have seen all the on-time data that we think we could, and a runner could be smart about this.
>>>>>>>>
>>>>>>>> We may want to keep the ability to shift timestamps for WindowFns, but I think we shouldn't be doing so for the default sliding windows. Correctness (of output timestamps) over latency unless one asks otherwise.
>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> To avoid this problem, element x in window w will have its timestamp shifted to not overlap with any earlier windows. It is a weird behavior. It fixes the watermark hold problem but introduces a strange output with a mysterious timestamp that is hard to justify.
>>>>>>>>>>>
>>>>>>>>>>> Any other ideas?
>>>>>>>>>>>
>>>>>>>>>>> Kenn
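The worked example in this thread (60s windows sliding every 10s, one element at timestamp 42, aggregation A feeding aggregation B) can be replayed with a small Java toy model. It assumes whole-second timestamps, windows aligned to multiples of the slide, that a window fires and releases its hold once A's input watermark reaches the window's end, and that the shift moves each hold to max(ts, window end - slide). That last rule is an assumption chosen because it reproduces the 42 to 50 adjustment for [0, 60); it is not copied from the Java SDK. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the worked example: 60s windows sliding every 10s and a single
// element at timestamp 42. Hypothetical names throughout; this is not Beam's
// or any runner's watermark implementation.
class SlidingHoldSketch {
    static final long SIZE = 60;
    static final long PERIOD = 10;

    // Start times of every window [s, s + SIZE) containing ts, earliest first
    // (windows aligned to multiples of PERIOD, i.e. zero offset).
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        long latest = ts - Math.floorMod(ts, PERIOD); // latest aligned start <= ts
        for (long s = latest; s > ts - SIZE; s -= PERIOD) {
            starts.add(0, s);
        }
        return starts;
    }

    // Watermark hold the element places in the window starting at `start`.
    // Unshifted EARLIEST holds at ts; the assumed shift moves it to the end of
    // the window that starts one PERIOD earlier, if that is later than ts.
    static long hold(long ts, long start, boolean shifted) {
        long end = start + SIZE;
        return shifted ? Math.max(ts, end - PERIOD) : ts;
    }

    // Output watermark of aggregation A (= input watermark of B) once A's input
    // watermark has reached t. Windows with end <= t have fired and released
    // their holds; the remaining windows still hold the watermark back.
    static long outputWatermark(long ts, long t, boolean shifted) {
        long wm = t;
        for (long s : windowStarts(ts)) {
            if (s + SIZE > t) {
                wm = Math.min(wm, hold(ts, s, shifted));
            }
        }
        return wm;
    }

    public static void main(String[] args) {
        long ts = 42;
        for (long t = 50; t <= 100; t += PERIOD) {
            System.out.println("t=" + t
                    + " unshifted=" + outputWatermark(ts, t, false)
                    + " shifted=" + outputWatermark(ts, t, true));
        }
    }
}
```

Running `main` shows the unshifted watermark stuck at 42 for t = 50 through 90, jumping to 100 only when [40, 100) fires, while the shifted variant tracks t from 50 onward. In this model the stall is 100 - 50 = 50 seconds, i.e. window size minus slide, consistent with the guess earlier in the thread.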