Slight correction: you mean the default SlidingWindows? This is the only nontrivial implementation I know of. Sessions does not shift the timestamp because you can't really guess how far to shift it. Sessions with EARLIEST just hang entirely whenever there is a long-lived session.
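A toy model makes "hang" concrete. The Python below is a minimal sketch, not Beam runner code. It assumes integer-second timestamps and in-order arrival. It also assumes that every session window that has not fired yet holds the aggregation's output watermark at its earliest element, which is what the EARLIEST combiner implies. `sessions` and `output_watermark` are hypothetical helpers.

```python
def sessions(timestamps, gap):
    """Merge one key's timestamps into session windows [start, end), end = last element + gap."""
    merged = []
    for ts in sorted(timestamps):
        if merged and ts < merged[-1][1]:
            # Within the gap of the current session: extend it.
            merged[-1][1] = max(merged[-1][1], ts + gap)
        else:
            merged.append([ts, ts + gap])
    return merged


def output_watermark(input_wm, elements_by_key, gap):
    """Output watermark of a session aggregation using the EARLIEST combiner.

    Every session that has not fired holds the watermark at its earliest element.
    Assumes in-order arrival: every element before input_wm has been seen.
    """
    holds = []
    for timestamps in elements_by_key.values():
        seen = [t for t in timestamps if t < input_wm]
        for start, end in sessions(seen, gap):
            if end > input_wm:  # not fired: the watermark has not reached the window end
                holds.append(start)
    return min([input_wm] + holds)


elements = {
    "short": [0, 3],                    # one brief session: [0, 13)
    "long": list(range(100, 1000, 5)),  # an element every 5s keeps one session open
}
for wm in (50, 500, 1000, 1005):
    print(wm, output_watermark(wm, elements, gap=10))
```

In this model the output watermark stays at 100 from the moment the "long" session opens until it finally closes at 1005. Every downstream consumer is held back for that whole span, even though the "short" key finished long before.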
I've thought about it a bit more, and I think the issue is not best solved by a WindowFn. Deciding whether the hack is useful requires all the data in the WindowingStrategy. So here is a proposal:

- add a TimestampCombiner (aka OutputTime enum in the proto) for EARLIEST_NON_OVERLAPPING
- only call WindowFn#getOutputTime in this case

This is necessarily a breaking change: users who are using EARLIEST with SlidingWindows will see a change in behavior. It can be flipped:

- add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that does not call WindowFn#getOutputTime, and implement it
- deprecate EARLIEST but have it keep calling WindowFn#getOutputTime

Or a third option:

- delete WindowFn#getOutputTime and pretend it never existed. SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work well with EARLIEST.

Kenn

On Wed, Feb 17, 2021 at 11:16 AM Robert Bradshaw <rober...@google.com> wrote:

> OK, so to move forward, shall we update the default Sessions to not do this timestamp shifting, perhaps with a (deprecated) timestamp-shifting opt-in variant to ease the transition for those that want the old (marked experimental) behavior?
>
> On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> All of this is right. Things have changed a lot. Nowadays the default will work well, and we can warn users that EARLIEST may hold up downstream output for overlapping windows.
>>
>> I'm slightly concerned about the fact that EARLIEST is necessary for CoGBK joins, unless there is some special consideration why it doesn't matter. So I wonder what happens when a pipeline has a few different joins.
>>
>> Kenn
>>
>> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> Yes, unless you manually set the timestamp combiner to earliest, which in this case gives earliest + shifted.
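A minimal Python sketch of the first option in the proposal. Every name here is hypothetical: `Combiner`, `output_time`, and `sliding_shift` are not Beam APIs. In Java, the hook is WindowFn#getOutputTime and the enum is TimestampCombiner. The sketch assumes integer timestamps and windows written as (start, end) pairs. The shift function encodes the "do not overlap the previous window" rule only as the thread describes it.

```python
from enum import Enum
from typing import Callable, Tuple

Window = Tuple[int, int]  # [start, end) in integer time units


class Combiner(Enum):
    EARLIEST = "EARLIEST"
    EARLIEST_NON_OVERLAPPING = "EARLIEST_NON_OVERLAPPING"  # the proposed new value
    END_OF_WINDOW = "END_OF_WINDOW"


def output_time(combiner: Combiner, ts: int, window: Window,
                get_output_time: Callable[[int, Window], int]) -> int:
    """Output timestamp for one input under the proposal: the WindowFn hook
    (get_output_time) is consulted only for EARLIEST_NON_OVERLAPPING."""
    if combiner is Combiner.EARLIEST:
        return ts  # the true earliest timestamp, never shifted
    if combiner is Combiner.EARLIEST_NON_OVERLAPPING:
        return get_output_time(ts, window)
    if combiner is Combiner.END_OF_WINDOW:
        return window[1] - 1  # last instant inside [start, end) at unit granularity
    raise ValueError(combiner)


def sliding_shift(ts: int, window: Window, period: int = 10) -> int:
    # Hypothetical stand-in for the sliding-window hook: never earlier than the
    # end of the previous overlapping window, which ends `period` before this one.
    return max(ts, window[1] - period)


for c in Combiner:
    print(c.name, output_time(c, 42, (0, 60), sliding_shift))
```

For the element at 42 in the [0, 60) window, this yields 42, 50, and 59 respectively. Under this option, only users who opt into the new value get the "earliest + shifted" result.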
>>>
>>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The default now is end of window, right? Doesn't that alleviate the problem that the original change was supposed to fix?
>>>>
>>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> The default timestamp combiner used to be earliest as well.
>>>>>
>>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> IIRC, this was introduced because at the time users complained that sliding windows were virtually unusable for reasonably-sized windows. However this was before we allowed customizing the timestamp combiner, so maybe this is less of a problem now?
>>>>>>
>>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a discussion of a very old and strange feature of Beam that I think we should revisit.
>>>>>>>>>>
>>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>>>
>>>>>>>>>> - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>> - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
>>>>>>>>>> - there is another downstream aggregation/GBK
>>>>>>>>>>
>>>>>>>>>> The output watermark of the upstream aggregation is held to the minimum of the inputs.
>>>>>>>>>> When an output is emitted, we desire the output to flow through the rest of the pipeline without delay. However, the downstream aggregation can (and often will) be delayed by the window size because of *watermark holds in other later windows that are not released until those windows output.*
>>>>>>>>>>
>>>>>>>>> Could you describe this a bit more? Why would later windows hold up the watermark for upstream steps? (Is it due to some subtlety, such as tracking the watermark for each stage rather than for each step?)
>>>>>>>>>
>>>>>>>> It does not have to do with stages/fusion (a runner-specific concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>>
>>>>>>>> Suppose:
>>>>>>>>
>>>>>>>> - Default triggering
>>>>>>>> - Timestamp combiner EARLIEST
>>>>>>>> - 60s windows sliding every 10s
>>>>>>>> - An element with timestamp 42
>>>>>>>> - Aggregation (A) with downstream aggregation (B)
>>>>>>>>
>>>>>>>> Here is what happens:
>>>>>>>>
>>>>>>>> - The element falls into [-10, 50), [0, 60), [10, 70), [20, 80), [30, 90), and [40, 100)
>>>>>>>> - For each of those windows the output watermark hold is set to 42 (the element's timestamp)
>>>>>>>> - At time 50 the aggregation (A) over the first window is emitted; the other windows remain buffered and held
>>>>>>>> - The element arrives at aggregation (B) and is buffered because the input watermark (which is the held output watermark from A) is still 42, even though no other data will arrive for that window (WLOG if elements from other keys are shuffled in)
>>>>>>>> - The input watermark for aggregation (B) does not advance past 42 until the [40, 100) window is fired and releases its watermark hold
>>>>>>>>
>>>>>>>> It is, indeed, subtle. To me, anyhow.
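The walkthrough above can be checked with a small Python sketch. The sketch assumes integer-second timestamps and windows that start at multiples of the slide, and its helper names are hypothetical. It lists the sliding windows that contain an element. It then measures how long the last window outlives the first. That span is how long B's input watermark stays held at the element's timestamp when nothing is shifted.

```python
def sliding_windows(ts, size, period):
    """All windows [s, s + size), with s a multiple of period, that contain ts."""
    latest_start = ts - ts % period  # last aligned start at or before ts
    starts = range(latest_start, ts - size, -period)  # keep s while s > ts - size
    return sorted((s, s + size) for s in starts)


def hold_delay(ts, size, period):
    """Without shifting, every window holds the watermark at ts, so B's input
    watermark is stuck until the last window closes, this long after the first."""
    windows = sliding_windows(ts, size, period)
    return windows[-1][1] - windows[0][1]


print(sliding_windows(42, size=60, period=10))
print(hold_delay(42, size=60, period=10))
```

For the example, this prints the six windows from [-10, 50) to [40, 100) and a delay of 50. When the size is a multiple of the slide, the delay is always size minus slide, which matches the correction that follows in the thread.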
>>>>>>>> I was wrong: it is not delayed by the window size, but by the difference between the earliest and latest end-of-window timestamps of all the assigned windows (window size minus slide?).
>>>>>>>>
>>>>>>>> So to avoid this, what actually happens in Java today is that the watermark hold and output timestamp (in the [0, 60) window) are set not to 42 but altered to 50, so as not to overlap the prior window. A timestamp of 50 is very unintuitive, since you asked for the EARLIEST of the input timestamps. The EARLIEST combiner plays an important role in CoGBK-based joins in SQL, where the iterables are re-exploded with timestamps that may be the minimum of the input elements. This shifting may actually break SQL...
>>>>>>>>
>>>>>>>> This predated our switch away from "delta from watermark" late-data dropping to "window expiry" data dropping. So maybe there is some new way to set a hold that does not make data late or droppable but still uses the EARLIEST timestamp. That is my question, and I have not figured out the answer.
>>>>>>>>
>>>>>>> This is, indeed, a very tough question...
>>>>>>>
>>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned windows. E.g., for sessions, a key with a long-lived session can hold up the watermark for all the others. Here we "know" what the holdup is and can adjust for it. But I don't think doing this adjustment is the right thing. It would certainly seem to mess up the timestamps of the outputs from a join. And it's possible that the values get re-windowed, in which case this element should get joined with itself from a later window (which I'll admit is a bit odd, but maybe a reflection that multiple windowing, like multi-firing triggering, is non-local).
>>>>>>>
>>>>>>> Logically, the reason we want the [-10, 50) window for B to fire shortly after the input watermark for A passes 50 is that no non-late data coming out of A could influence it. In some sense, the "watermark" for the [-10, 50) window has indeed passed, but not the one for later windows. I don't think the Beam model requires that we have a single watermark, just that we fire triggers/timers once we have seen all the on-time data that we think we could, and a runner could be smart about this.
>>>>>>>
>>>>>>> We may want to keep the ability to shift timestamps for WindowFns, but I think we shouldn't be doing so for the default sliding windows. Correctness (of output timestamps) over latency, unless one asks otherwise.
>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>>
>>>>>>>>>> To avoid this problem, element x in window w will have its timestamp shifted so as not to overlap with any earlier windows. It is a weird behavior. It fixes the watermark hold problem but introduces a strange output with a mysterious timestamp that is hard to justify.
>>>>>>>>>>
>>>>>>>>>> Any other ideas?
>>>>>>>>>>
>>>>>>>>>> Kenn
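The shifting described above can be compared with the unshifted behavior in a toy Python model. This is a sketch under stated assumptions, not Beam code. It uses a single element with integer timestamps and assumes that a window of A fires once A's input watermark reaches the window's end. It also encodes the shift rule only as the thread describes it: the hold moves up to the end of the previous overlapping window. The names `hold` and `b_input_watermark` are hypothetical.

```python
def sliding_windows(ts, size, period):
    """All windows [s, s + size), with s a multiple of period, that contain ts."""
    starts = range(ts - ts % period, ts - size, -period)
    return sorted((s, s + size) for s in starts)


def hold(ts, window, period, shift):
    """EARLIEST watermark hold for ts in window; if shift, move it up to the end
    of the previous overlapping window, which ends `period` earlier."""
    _, end = window
    return max(ts, end - period) if shift else ts


def b_input_watermark(a_input_wm, ts, size, period, shift):
    """B's input watermark = A's output watermark: A's input watermark, held
    down by every window of A that has not fired yet."""
    holds = [hold(ts, w, period, shift)
             for w in sliding_windows(ts, size, period)
             if w[1] > a_input_wm]  # a window fires once A's input watermark reaches its end
    return min([a_input_wm] + holds)


for wm in (50, 60, 99, 100):
    print(wm,
          b_input_watermark(wm, 42, 60, 10, shift=False),
          b_input_watermark(wm, 42, 60, 10, shift=True))
```

With `shift=False`, B's input watermark stays at 42 until A's input watermark reaches 100. With `shift=True`, it reaches 50 as soon as [-10, 50) fires, so B can fire that window promptly. The cost is that the later windows carry the timestamps 50, 60, 70, 80, and 90 instead of 42, which are the "mysterious" timestamps the thread objects to.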