IIRC, this was introduced because at the time users complained that sliding windows were virtually unusable for reasonably sized windows. However, this was before we allowed customizing the timestamp combiner, so maybe this is less of a problem now?
On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com> wrote:

> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>>
>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org>
>>> wrote:
>>>
>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>> discussion of a very old and strange feature of Beam that I think we should
>>>> revisit.
>>>>
>>>> The WindowFn has the ability to shift timestamps forward in order to
>>>> unblock downstream watermarks. Why? Specifically in this situation:
>>>>
>>>> - aggregation/GBK with overlapping windows like SlidingWindows
>>>> - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>   inputs
>>>> - there is another downstream aggregation/GBK
>>>>
>>>> The output watermark of the upstream aggregation is held to the minimum
>>>> of the inputs. When an output is emitted, we desire the output to flow
>>>> through the rest of the pipeline without delay. However, the downstream
>>>> aggregation can (and often will) be delayed by the window size because of
>>>> *watermark holds in other later windows that are not released until those
>>>> windows output.*
>>>>
>>> Could you describe this a bit more? Why would later windows hold up the
>>> watermark for upstream steps. (Is it due to some subtlety? Such as tracking
>>> the watermark for each stage, rather than for each step?)
>>>
>>
>> It does not have to do with stages/fusion (a runner-specific concept) but
>> is a necessity of watermarks being per-PCollection.
>>
>> Suppose:
>>
>> - Default triggering
>> - Timestamp combiner EARLIEST
>> - 60s windows sliding every 10s
>> - An element with timestamp 42
>> - Aggregation (A) with downstream aggregation (B)
>>
>> Here is what happens:
>>
>> - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20, 80)
>>   and [30, 90) and [40, 100)
>> - For each of those windows the output watermark hold is set to 42 (the
>>   element's timestamp)
>> - At time 50 the aggregation (A) over the first window is emitted; the
>>   other windows remain buffered and held
>> - The element arrives at aggregation (B) and is buffered because the
>>   input watermark (which is the held output watermark from A) is still 42,
>>   even though no other data will arrive for that window (WLOG if elements
>>   from other keys are shuffled in)
>> - The input watermark for aggregation (B) does not advance past 42 until
>>   the [40, 100) window is fired and releases its watermark hold
>>
>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed by
>> the window size, but by the difference in end-of-window timestamps to all
>> assigned windows (window size minus slide?)
>>
>> So to avoid this, what actually happens in Java today is that the
>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>> not overlap the prior window. Timestamp of 50 is very nonintuitive since
>> you asked for the EARLIEST of input timestamps. EARLIEST combiner plays an
>> important role in CoGBK based joins in SQL, where the iterables are
>> re-exploded with timestamps that may be the minimum of input elements. This
>> shifting may actually break SQL...
>>
>> This predated our switch away from "delta from watermark" late data
>> dropping to "window expiry" data dropping. So maybe there is some new way
>> to set a hold that does not make data late or droppable but still use the
>> EARLIEST timestamp. That is my question, for which I have not figured out
>> the answer.
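
(Inline note: to make the arithmetic in Kenn's example concrete, here is a minimal Python sketch of the unshifted case. It is not Beam code: the function names are hypothetical, timestamps are plain integer seconds, and it models only a single element with EARLIEST and default triggering, as in the example above.)

```python
def assigned_windows(ts, size, period):
    """Return the [start, end) sliding windows that contain timestamp ts."""
    last_start = ts - (ts % period)  # latest window start that is <= ts
    # Walk backwards one period at a time while the window still covers ts.
    starts = range(last_start, ts - size, -period)
    return sorted((start, start + size) for start in starts)


def hold_release_time(ts, size, period):
    """Without shifting, every assigned window holds A's output watermark at
    ts, so the last hold is released only when the latest window fires."""
    return max(end for _, end in assigned_windows(ts, size, period))


windows = assigned_windows(42, size=60, period=10)
first_fire = min(end for _, end in windows)               # A emits [-10, 50) at 50
release = hold_release_time(42, size=60, period=10)       # holds gone at 100
delay = release - first_fire                              # 50 == size - period

print(windows)  # [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]
print(delay)    # 50
```

Under these assumptions the delay comes out as window size minus slide (60 - 10 = 50), which matches the corrected estimate above.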
>>
>
> This is, indeed, a very tough question...
>
> I'd say this is generally a problem with EARLIEST and non-aligned windows.
> E.g. for sessions, a long key can hold up the watermark for all others.
> Here we "know" what the hold up is, and can adjust for it. But I don't
> think doing this adjustment is the right thing. It would certainly seem to
> mess up the timestamp of the outputs from a join. And it's possible that
> the values get re-windowed in which case this element should get joined
> with itself from a later window (which I'll admit is a bit odd, but maybe a
> reflection that multiple-windowing, like multi-firing triggering, is
> non-local).
>
> Logically, the reason we want the [-10, 50) window for B to fire shortly
> after the input watermark for A passes 50 is that no non-late data coming
> out of A could influence it. In some sense, the "watermark" for the
> [-10, 50) windows has indeed passed, but not that for later windows. I
> don't think the Beam model requires that we have a single watermark, just
> that we fire triggers/timers once we have seen all the on-time data that we
> think we could, and a runner could be smart about this.
>
> We may want to keep the ability to shift timestamps for WindowFns, but I
> think we shouldn't be doing so for the default sliding windows. Correctness
> (of output timestamps) over latency unless one asks otherwise.
>
>> Kenn
>>
>>>> To avoid this problem, element x in window w will have its timestamp
>>>> shifted to not overlap with any earlier windows. It is a weird behavior. It
>>>> fixes the watermark hold problem but introduces a strange output with a
>>>> mysterious timestamp that is hard to justify.
>>>>
>>>> Any other ideas?
>>>>
>>>> Kenn
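
For reference, the shifting behavior described in the thread can be sketched in a few lines of Python. This is an illustration of the rule as stated ("shifted to not overlap with any earlier windows"), not the actual Beam implementation: `shifted_output_time` is a hypothetical name, timestamps are integer seconds, and the setup reuses the 60s windows sliding every 10s with an element at 42.

```python
def shifted_output_time(ts, window, period):
    """Move ts forward, if needed, so it does not fall inside the sliding
    window that starts one period earlier than `window`."""
    _start, end = window
    prior_window_end = end - period  # end of the immediately earlier window
    return max(ts, prior_window_end)


period = 10
# The six windows containing timestamp 42: [-10, 50) through [40, 100).
windows = [(start, start + 60) for start in range(-10, 50, period)]
holds = [shifted_output_time(42, w, period) for w in windows]

# After [-10, 50) fires, the earliest remaining hold bounds A's output watermark.
hold_after_first_fire = min(holds[1:])

print(holds)                  # [42, 50, 60, 70, 80, 90]
print(hold_after_first_fire)  # 50
```

In this sketch only the first window keeps the true EARLIEST timestamp of 42. Every later window reports a timestamp the element never had, which is the trade-off being debated: the downstream watermark can reach 50 as soon as the first window fires, at the cost of output timestamps that are hard to justify.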