On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>
> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>> discussion of a very old and strange feature of Beam that I think we should
>> revisit.
>>
>> The WindowFn has the ability to shift timestamps forward in order to
>> unblock downstream watermarks. Why? Specifically in this situation:
>>
>> - aggregation/GBK with overlapping windows like SlidingWindows
>> - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
>> - there is another downstream aggregation/GBK
>>
>> The output watermark of the upstream aggregation is held to the minimum
>> of the inputs. When an output is emitted, we desire the output to flow
>> through the rest of the pipeline without delay. However, the downstream
>> aggregation can (and often will) be delayed by the window size because of
>> *watermark holds in other later windows that are not released until those
>> windows output.*
>>
> Could you describe this a bit more? Why would later windows hold up the
> watermark for upstream steps. (Is it due to some subtlety? Such as tracking
> the watermark for each stage, rather than for each step?)
>

It does not have to do with stages/fusion (a runner-specific concept) but
is a necessity of watermarks being per-PCollection.

Suppose:

- Default triggering
- Timestamp combiner EARLIEST
- 60s windows sliding every 10s
- An element with timestamp 42
- Aggregation (A) with downstream aggregation (B)

Here is what happens:

- The element falls into [-10, 50), [0, 60), [10, 70), [20, 80), [30, 90)
  and [40, 100)
- For each of those windows the output watermark hold is set to 42 (the
  element's timestamp)
- At time 50 the aggregation (A) over the first window is emitted; the
  other windows remain buffered and held
- The element arrives at aggregation (B) and is buffered because B's input
  watermark (which is the held output watermark from A) is still 42, even
  though no other data will arrive for that window (WLOG if elements from
  other keys are shuffled in)
- The input watermark for aggregation (B) does not advance past 42 until
  the [40, 100) window fires and releases its watermark hold

It is, indeed, subtle. To me, anyhow. I was wrong: it is not delayed by the
window size, but by the spread of end-of-window timestamps across all the
assigned windows (window size minus slide? In this example,
100 - 50 = 60 - 10 = 50s).

So to avoid this, what actually happens in Java today is that the watermark
hold and output timestamp (for the [0, 60) window) are set not to 42 but
to 50, so that they do not overlap the prior window. A timestamp of 50 is
very unintuitive when you asked for the EARLIEST of the input timestamps.

The EARLIEST combiner plays an important role in CoGBK-based joins in SQL,
where the iterables are re-exploded with timestamps that may be the minimum
of the input elements. This shifting may actually break SQL...

This shifting predates our switch from "delta from watermark" late data
dropping to "window expiry" data dropping. So maybe there is some new way
to set a hold that does not make data late or droppable but still uses the
EARLIEST timestamp. That is my question, and I have not figured out the
answer.

Kenn

>
>> To avoid this problem, element x in window w will have its timestamp
>> shifted to not overlap with any earlier windows. It is a weird behavior.
>> It fixes the watermark hold problem but introduces a strange output with a
>> mysterious timestamp that is hard to justify.
>>
>> Any other ideas?
>>
>> Kenn
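
Kenn's example (60s windows sliding every 10s, one element at timestamp 42, EARLIEST combiner, aggregation A feeding aggregation B) can be replayed with a toy simulation. This is a sketch under stated assumptions, not Beam's API or any runner's watermark code: timestamps are whole seconds, a window fires (and releases its hold) once A's input watermark reaches the window's end, each window holds only this one element, and the "shifted" hold is modeled as max(timestamp, end of the preceding sliding window), which is one reading of "altered to 50 to not overlap the prior window". All function names are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

# A window is a half-open interval [start, end) in whole seconds.
Window = Tuple[int, int]


def assign_sliding_windows(ts: int, size: int, period: int) -> List[Window]:
    """All windows of `size` seconds, starting every `period`, that contain ts."""
    start = ts - (ts % period)  # latest window start that is <= ts
    windows = []
    while start > ts - size:  # [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def earliest_hold(ts: int, window: Window, period: int) -> int:
    """EARLIEST combiner with a single input: the hold is the element's timestamp.
    (`window` and `period` are unused; kept so both hold models share a signature.)"""
    return ts


def shifted_hold(ts: int, window: Window, period: int) -> int:
    """Assumed model of the shifting: never earlier than the end of the
    preceding sliding window, which is `end - period`."""
    return max(ts, window[1] - period)


def output_watermark(input_wm: int, holds: Dict[Window, int]) -> int:
    """Output watermark of A, i.e. the input watermark of B.

    A window whose end the input watermark has reached has fired and released
    its hold; every other window keeps holding the output watermark back."""
    pending = [hold for (_, end), hold in holds.items() if end > input_wm]
    return min([input_wm] + pending)


def first_release_time(ts: int, size: int, period: int,
                       hold_fn: Callable[[int, Window, int], int]) -> int:
    """Earliest input watermark of A at which B's input watermark passes ts."""
    holds = {w: hold_fn(ts, w, period)
             for w in assign_sliding_windows(ts, size, period)}
    input_wm = ts
    while output_watermark(input_wm, holds) <= ts:
        input_wm += 1
    return input_wm


if __name__ == "__main__":
    ts, size, period = 42, 60, 10
    windows = assign_sliding_windows(ts, size, period)
    print(windows)
    # [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]
    print([shifted_hold(ts, w, period) for w in windows])
    # [42, 50, 60, 70, 80, 90]
    print(first_release_time(ts, size, period, earliest_hold))  # 100
    print(first_release_time(ts, size, period, shifted_hold))   # 50
```

In this toy model, unshifted EARLIEST holds keep B's watermark at 42 until time 100, while the shifted holds release it at time 50. The 50s gap matches window size minus slide, and the price is output timestamps (50, 60, ..., 90) that no input element carried.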