Yes, unless you manually set the timestamp combiner to EARLIEST, in which case you get the earliest input timestamp, shifted by the WindowFn.

On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:

> The default now is end of window, right? Doesn't that alleviate the
> problem that the original change was supposed to fix?
>
> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> The default timestamp combiner used to be earliest as well.
>>
>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>
>>> IIRC, this was introduced because at the time users complained that
>>> sliding windows were virtually unusable for reasonably-sized windows.
>>> However this was before we allowed customizing the timestamp combiner,
>>> so maybe this is less of a problem now?
>>>
>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>>> discussion of a very old and strange feature of Beam that I think
>>>>>>> we should revisit.
>>>>>>>
>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>> to unblock downstream watermarks. Why? Specifically in this
>>>>>>> situation:
>>>>>>>
>>>>>>> - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>> - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>>>   inputs
>>>>>>> - there is another downstream aggregation/GBK
>>>>>>>
>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>> minimum of the inputs. When an output is emitted, we desire the
>>>>>>> output to flow through the rest of the pipeline without delay.
>>>>>>> However, the downstream aggregation can (and often will) be
>>>>>>> delayed by the window size because of *watermark holds in other
>>>>>>> later windows that are not released until those windows output.*
>>>>>>>
>>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>>> the watermark for upstream steps? (Is it due to some subtlety? Such
>>>>>> as tracking the watermark for each stage, rather than for each
>>>>>> step?)
>>>>>>
>>>>>
>>>>> It does not have to do with stages/fusion (a runner-specific concept)
>>>>> but is a necessity of watermarks being per-PCollection.
>>>>>
>>>>> Suppose:
>>>>>
>>>>> - Default triggering
>>>>> - Timestamp combiner EARLIEST
>>>>> - 60s windows sliding every 10s
>>>>> - An element with timestamp 42
>>>>> - Aggregation (A) with downstream aggregation (B)
>>>>>
>>>>> Here is what happens:
>>>>>
>>>>> - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>   [20, 80) and [30, 90) and [40, 100)
>>>>> - For each of those windows the output watermark hold is set to 42
>>>>>   (the element's timestamp)
>>>>> - At time 50 the aggregation (A) over the first window is emitted;
>>>>>   the other windows remain buffered and held
>>>>> - The element arrives at aggregation (B) and is buffered because the
>>>>>   input watermark (which is the held output watermark from A) is
>>>>>   still 42, even though no other data will arrive for that window
>>>>>   (WLOG if elements from other keys are shuffled in)
>>>>> - The input watermark for aggregation (B) does not advance past 42
>>>>>   until the [40, 100) window is fired and releases its watermark hold
>>>>>
>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed
>>>>> by the window size, but by the difference in end-of-window timestamps
>>>>> to all assigned windows (window size minus slide?)
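
The walkthrough above can be checked with a little plain Python. This is only a sketch of the arithmetic, not Beam code: `assigned_windows` and `downstream_watermark` are hypothetical helpers, times are whole seconds, and it assumes each window of (A) fires exactly when the clock reaches the window's end and releases its hold at that moment.

```python
def assigned_windows(ts, size=60, period=10):
    """Return every [start, end) sliding window that contains ts."""
    last_start = ts - (ts % period)  # latest window start <= ts
    # Window starts s must satisfy s > ts - size so that ts < s + size.
    starts = range(last_start, ts - size, -period)
    return sorted((s, s + size) for s in starts)


def downstream_watermark(now, windows, hold):
    """Input watermark of (B): capped by holds of (A)'s unfired windows."""
    pending = [hold for (_, end) in windows if end > now]
    return min([now] + pending)


windows = assigned_windows(42)
first_end = windows[0][1]  # end of [-10, 50)

# First time at which (B) sees a watermark that lets it fire [-10, 50).
release = next(
    t for t in range(first_end, 200)
    if downstream_watermark(t, windows, hold=42) >= first_end
)
delay = release - first_end

print(windows)         # [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]
print(release, delay)  # 100 50
```

Under these assumptions the delay comes out as 50 seconds, which matches the "window size minus slide" guess (60 - 10) rather than the full window size.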
>>>>>
>>>>> So to avoid this, what actually happens in Java today is that the
>>>>> watermark hold, and output timestamp, is set not to 42 but altered to
>>>>> 50 to not overlap the prior window. Timestamp of 50 is very
>>>>> nonintuitive since you asked for the EARLIEST of input timestamps.
>>>>> EARLIEST combiner plays an important role in CoGBK based joins in
>>>>> SQL, where the iterables are re-exploded with timestamps that may be
>>>>> the minimum of input elements. This shifting may actually break
>>>>> SQL...
>>>>>
>>>>> This predated our switch away from "delta from watermark" late data
>>>>> dropping to "window expiry" data dropping. So maybe there is some new
>>>>> way to set a hold that does not make data late or droppable but still
>>>>> use the EARLIEST timestamp. That is my question, for which I have not
>>>>> figured out the answer.
>>>>>
>>>>
>>>> This is, indeed, a very tough question...
>>>>
>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>> windows. E.g. for sessions, a long key can hold up the watermark for
>>>> all others. Here we "know" what the hold up is, and can adjust for it.
>>>> But I don't think doing this adjustment is the right thing. It would
>>>> certainly seem to mess up the timestamp of the outputs from a join.
>>>> And it's possible that the values get re-windowed in which case this
>>>> element should get joined with itself from a later window (which I'll
>>>> admit is a bit odd, but maybe a reflection that multiple-windowing,
>>>> like multi-firing triggering, is non-local).
>>>>
>>>> Logically, the reason we want the [-10, 50) window for B to fire
>>>> shortly after the input watermark for A passes 50 is that no non-late
>>>> data coming out of A could influence it. In some sense, the
>>>> "watermark" for the [-10, 50) windows has indeed passed, but not that
>>>> for later windows.
>>>> I don't think the Beam model requires that we have a single
>>>> watermark, just that we fire triggers/timers once we have seen all the
>>>> on-time data that we think we could, and a runner could be smart about
>>>> this.
>>>>
>>>> We may want to keep the ability to shift timestamps for WindowFns, but
>>>> I think we shouldn't be doing so for the default sliding windows.
>>>> Correctness (of output timestamps) over latency unless one asks
>>>> otherwise.
>>>>
>>>>> Kenn
>>>>>
>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a
>>>>>>> weird behavior. It fixes the watermark hold problem but introduces
>>>>>>> a strange output with a mysterious timestamp that is hard to
>>>>>>> justify.
>>>>>>>
>>>>>>> Any other ideas?
>>>>>>>
>>>>>>> Kenn
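
For reference, the shift described in the original message can be sketched in plain Python. This is a rough model, not the SDK's implementation: `shifted_output_time` and `downstream_watermark` are hypothetical helpers, times are whole seconds rather than Beam's millisecond instants, and the rule assumed here is "move the timestamp forward to the end of the immediately preceding window if it would otherwise fall before it".

```python
WINDOWS = [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]


def shifted_output_time(ts, window, period=10):
    """Shift ts forward so it does not overlap the preceding sliding window."""
    _, end = window
    prev_end = end - period  # end of the window that starts one period earlier
    return max(ts, prev_end)


def downstream_watermark(now, holds):
    """Input watermark of the downstream GBK, capped by unfired windows' holds."""
    pending = [h for (_, end), h in holds.items() if end > now]
    return min([now] + pending)


# Holds (and output timestamps) for the element with timestamp 42.
holds = {w: shifted_output_time(42, w) for w in WINDOWS}

print(list(holds.values()))             # [42, 50, 60, 70, 80, 90]
print(downstream_watermark(50, holds))  # 50
```

With the shift, the downstream watermark reaches 50 as soon as the first window fires, so the delay disappears. The cost shows up in the second entry: the [0, 60) output is stamped 50 even though the earliest input timestamp is 42, which is the hard-to-justify timestamp the thread is questioning.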