On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org> wrote:
>
> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>
>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>> discussion of a very old and strange feature of Beam that I think we should
>>> revisit.
>>>
>>> The WindowFn has the ability to shift timestamps forward in order to
>>> unblock downstream watermarks. Why? Specifically in this situation:
>>>
>>> - aggregation/GBK with overlapping windows like SlidingWindows
>>> - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
>>> - there is another downstream aggregation/GBK
>>>
>>> The output watermark of the upstream aggregation is held to the minimum
>>> of the inputs. When an output is emitted, we desire the output to flow
>>> through the rest of the pipeline without delay. However, the downstream
>>> aggregation can (and often will) be delayed by the window size because of
>>> *watermark holds in other later windows that are not released until those
>>> windows output.*
>>>
>> Could you describe this a bit more? Why would later windows hold up the
>> watermark for upstream steps. (Is it due to some subtlety? Such as tracking
>> the watermark for each stage, rather than for each step?)
>>
>
> It does not have to do with stages/fusion (a runner-specific concept) but
> is a necessity of watermarks being per-PCollection.
>
> Suppose:
>
> - Default triggering
> - Timestamp combiner EARLIEST
> - 60s windows sliding every 10s
> - An element with timestamp 42
> - Aggregation (A) with downstream aggregation (B)
>
> Here is what happens:
>
> - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20, 80)
> and [30, 90) and [40, 100)
> - For each of those windows the output watermark hold is set to 42 (the
> element's timestamp)
> - At time 50 the aggregation (A) over the first window is emitted; the
> other windows remain buffered and held
> - The element arrives at aggregation (B) and is buffered because the
> input watermark (which is the held output watermark from A) is still 42,
> even though no other data will arrive for that window (WLOG if elements
> from other keys are shuffled in)
> - The input watermark for aggregation (B) does not advance past 42 until
> the [40, 100) window is fired and releases its watermark hold
>
> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed by
> the window size, but by the difference in end-of-window timestamps to all
> assigned windows (window size minus slide?)
>
> So to avoid this, what actually happens in Java today is that the
> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
> not overlap the prior window. Timestamp of 50 is very nonintuitive since
> you asked for the EARLIEST of input timestamps. EARLIEST combiner plays an
> important role in CoGBK based joins in SQL, where the iterables are
> re-exploded with timestamps that may be the minimum of input elements. This
> shifting may actually break SQL...
>
> This predated our switch away from "delta from watermark" late data
> dropping to "window expiry" data dropping. So maybe there is some new way
> to set a hold that does not make data late or droppable but still use the
> EARLIEST timestamp. That is my question, for which I have not figured out
> the answer.
>

This is, indeed, a very tough question...
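
To make the walkthrough above concrete, here is a rough model of it in plain
Python. It is not Beam code: the function names are made up for illustration,
times are integer seconds, and I am assuming a window fires and releases its
hold as soon as A's input watermark reaches the end of the window.

```python
# Plain-Python model of the example above; not Beam code.
# All names are hypothetical; times are integer seconds.

def assign_sliding_windows(ts, size, period):
    """Return every [start, end) window of `size` sliding by `period` that contains ts."""
    last_start = ts - (ts % period)  # latest window start that is <= ts
    starts = range(last_start, ts - size, -period)
    return sorted((s, s + size) for s in starts)


def held_output_watermark(input_wm, windows, hold_ts):
    """Hold on A's output watermark when A's input watermark is `input_wm`.

    Assumption: a window fires, and releases its hold, once input_wm reaches
    its end. Returns None when no hold remains.
    """
    pending = [hold_ts for (_, end) in windows if end > input_wm]
    return min(pending) if pending else None


windows = assign_sliding_windows(42, size=60, period=10)
print(windows)

# The first window fires at 50, but a hold at 42 remains until [40, 100) fires.
release_time = next(
    t for t in range(0, 200) if held_output_watermark(t, windows, 42) is None
)
# Extra delay that B sees for the first window's output.
extra_delay = release_time - windows[0][1]
print(release_time, extra_delay)
```

In this model the extra delay comes out as window size minus slide (60 - 10),
which matches the corrected estimate in the quoted text.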
I'd say this is generally a problem with EARLIEST and non-aligned windows.
For sessions, for example, one key with a long session can hold up the
watermark for all the others. Here we "know" what the holdup is and can
adjust for it.

But I don't think doing this adjustment is the right thing. It would
certainly seem to mess up the timestamp of the outputs from a join. And it's
possible that the values get re-windowed, in which case this element should
get joined with itself from a later window (which I'll admit is a bit odd,
but maybe a reflection that multiple-windowing, like multi-firing
triggering, is non-local).

Logically, the reason we want the [-10, 50) window for B to fire shortly
after the input watermark for A passes 50 is that no non-late data coming
out of A could influence it. In some sense, the "watermark" for the
[-10, 50) window has indeed passed, but not that for later windows. I don't
think the Beam model requires that we have a single watermark, just that we
fire triggers/timers once we have seen all the on-time data that we think we
could, and a runner could be smart about this.

We may want to keep the ability to shift timestamps for WindowFns, but I
think we shouldn't be doing so for the default sliding windows. Correctness
(of output timestamps) over latency unless one asks otherwise.

> Kenn
>
>
>>
>>> To avoid this problem, element x in window w will have its timestamp
>>> shifted to not overlap with any earlier windows. It is a weird behavior. It
>>> fixes the watermark hold problem but introduces a strange output with a
>>> mysterious timestamp that is hard to justify.
>>>
>>> Any other ideas?
>>>
>>> Kenn
>>>
>>
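
For reference, the shift described in the quoted text can be sketched the
same way. This is again a plain-Python model, not Beam's implementation: the
function name is hypothetical, and I am assuming "not overlap with any
earlier windows" means moving the timestamp up to the end of the immediately
preceding window, which ends one slide period earlier.

```python
# Plain-Python model of the timestamp shift; not Beam code.
# Names are hypothetical; times are integer seconds; windows are [start, end).

def shifted_output_time(ts, window, period):
    """Move ts forward, if needed, so it does not fall inside the previous window.

    Assumption: the previous sliding window ends `period` before this one.
    """
    _, end = window
    return max(ts, end - period)


# The six 60s windows (sliding every 10s) that contain timestamp 42.
windows = [(s, s + 60) for s in range(-10, 50, 10)]
holds = {w: shifted_output_time(42, w, period=10) for w in windows}
for w, hold in holds.items():
    print(w, "->", hold)

# Once [-10, 50) has fired, the earliest remaining hold is 50 rather than 42.
remaining_after_first = min(h for (_, end), h in holds.items() if end > 50)
print(remaining_after_first)
```

Under that assumption only the first window keeps the true EARLIEST
timestamp of 42; the other five outputs carry 50, 60, 70, 80 and 90, which
is the hard-to-justify timestamp this thread is about.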