All of this is right. Things have changed a lot. Nowadays the default will work well, and we can caveat to users that EARLIEST may hold up downstream output for overlapping windows.
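To make that caveat concrete, here is a rough sketch of the hold-up for the single-element example quoted further down (60s windows sliding every 10s, one element at timestamp 42). It is plain Python rather than Beam code. The function names are made up for illustration, and it assumes integer-second timestamps, a zero window offset, and default triggering, so each window fires at its own end.

```python
def assigned_windows(ts, size, period):
    """Return the [start, end) sliding windows that contain ts, earliest first."""
    last_start = ts - (ts % period)  # start of the latest window containing ts
    # A window starting at s contains ts iff s <= ts < s + size, i.e. s > ts - size.
    starts = range(last_start, ts - size, -period)
    return sorted((s, s + size) for s in starts)


def downstream_release_times(ts, size, period):
    """For each window: (window, time A fires it, time B can fire it)."""
    windows = assigned_windows(ts, size, period)
    # With EARLIEST and no shifting, every assigned window holds A's output
    # watermark at ts until that window fires, so the hold is only fully
    # released once the last assigned window has fired.
    hold_released_at = max(end for _, end in windows)
    return [(w, w[1], max(w[1], hold_released_at)) for w in windows]


if __name__ == "__main__":
    for window, fired_by_a, fired_by_b in downstream_release_times(42, 60, 10):
        print(window, "A fires at", fired_by_a, "B fires at", fired_by_b,
              "delay", fired_by_b - fired_by_a)
```

In this model the first window, [-10, 50), waits at B until time 100. That is a delay of window size minus slide, and the delay shrinks to zero for the last window.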
I'm slightly concerned about the fact that EARLIEST is necessary for CoGBK joins, unless there is some special consideration why it doesn't matter. So I wonder what happens when a pipeline has a few different joins.

Kenn

On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com> wrote:

> Yes, unless you manually set the timestamp combiner to earliest, which in
> this case gives earliest + shifted.
>
> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>
>> The default now is end of window, right? Doesn't that alleviate the
>> problem that the original change was supposed to fix?
>>
>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> The default timestamp combiner used to be earliest as well.
>>>
>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> IIRC, this was introduced because at the time users complained that
>>>> sliding windows were virtually unusable for reasonably-sized windows.
>>>> However this was before we allowed customizing the timestamp combiner, so
>>>> maybe this is less of a problem now?
>>>>
>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>
>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>>>> discussion of a very old and strange feature of Beam that I think we
>>>>>>>> should revisit.
>>>>>>>>
>>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>>> to unblock downstream watermarks. Why?
>>>>>>>> Specifically in this situation:
>>>>>>>>
>>>>>>>> - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>> - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>>>>   inputs
>>>>>>>> - there is another downstream aggregation/GBK
>>>>>>>>
>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>> minimum of the inputs. When an output is emitted, we desire the output
>>>>>>>> to flow through the rest of the pipeline without delay. However, the
>>>>>>>> downstream aggregation can (and often will) be delayed by the window
>>>>>>>> size because of *watermark holds in other later windows that are not
>>>>>>>> released until those windows output.*
>>>>>>>>
>>>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>>>> the watermark for upstream steps. (Is it due to some subtlety? Such as
>>>>>>> tracking the watermark for each stage, rather than for each step?)
>>>>>>>
>>>>>>
>>>>>> It does not have to do with stages/fusion (a runner-specific concept)
>>>>>> but is a necessity of watermarks being per-PCollection.
>>>>>>
>>>>>> Suppose:
>>>>>>
>>>>>> - Default triggering
>>>>>> - Timestamp combiner EARLIEST
>>>>>> - 60s windows sliding every 10s
>>>>>> - An element with timestamp 42
>>>>>> - Aggregation (A) with downstream aggregation (B)
>>>>>>
>>>>>> Here is what happens:
>>>>>>
>>>>>> - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>   [20, 80) and [30, 90) and [40, 100)
>>>>>> - For each of those windows the output watermark hold is set to 42
>>>>>>   (the element's timestamp)
>>>>>> - At time 50 the aggregation (A) over the first window is emitted;
>>>>>>   the other windows remain buffered and held
>>>>>> - The element arrives at aggregation (B) and is buffered because the
>>>>>>   input watermark (which is the held output watermark from A) is still 42,
>>>>>>   even though no other data will arrive for that window (WLOG if elements
>>>>>>   from other keys are shuffled in)
>>>>>> - The input watermark for aggregation (B) does not advance past 42
>>>>>>   until the [40, 100) window is fired and releases its watermark hold
>>>>>>
>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed
>>>>>> by the window size, but by the difference in end-of-window timestamps to
>>>>>> all assigned windows (window size minus slide?)
>>>>>>
>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to 50
>>>>>> to not overlap the prior window. Timestamp of 50 is very nonintuitive since
>>>>>> you asked for the EARLIEST of input timestamps. EARLIEST combiner plays
>>>>>> an important role in CoGBK based joins in SQL, where the iterables are
>>>>>> re-exploded with timestamps that may be the minimum of input elements.
>>>>>> This shifting may actually break SQL...
>>>>>>
>>>>>> This predated our switch away from "delta from watermark" late data
>>>>>> dropping to "window expiry" data dropping.
>>>>>> So maybe there is some new way to set a hold that does not make data
>>>>>> late or droppable but still use the EARLIEST timestamp. That is my
>>>>>> question, for which I have not figured out the answer.
>>>>>>
>>>>>
>>>>> This is, indeed, a very tough question...
>>>>>
>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>> windows. E.g. for sessions, a long key can hold up the watermark for all
>>>>> others. Here we "know" what the hold up is, and can adjust for it. But I
>>>>> don't think doing this adjustment is the right thing. It would certainly
>>>>> seem to mess up the timestamp of the outputs from a join. And it's
>>>>> possible that the values get re-windowed in which case this element should
>>>>> get joined with itself from a later window (which I'll admit is a bit odd,
>>>>> but maybe a reflection that multiple-windowing, like multi-firing
>>>>> triggering, is non-local).
>>>>>
>>>>> Logically, the reason we want the [-10, 50) window for B to fire shortly
>>>>> after the input watermark for A passes 50 is that no non-late data coming
>>>>> out of A could influence it. In some sense, the "watermark" for the [-10,
>>>>> 50) windows has indeed passed, but not that for later windows. I don't
>>>>> think the Beam model requires that we have a single watermark, just that
>>>>> we fire triggers/timers once we have seen all the on-time data that we
>>>>> think we could, and a runner could be smart about this.
>>>>>
>>>>> We may want to keep the ability to shift timestamps for WindowFns, but
>>>>> I think we shouldn't be doing so for the default sliding windows.
>>>>> Correctness (of output timestamps) over latency unless one asks otherwise.
>>>>>
>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a
>>>>>>>> weird behavior.
>>>>>>>> It fixes the watermark hold problem but introduces a strange
>>>>>>>> output with a mysterious timestamp that is hard to justify.
>>>>>>>>
>>>>>>>> Any other ideas?
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>
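For reference, the shift described in the thread ("set not to 42 but altered to 50 to not overlap the prior window") can be sketched in a few lines of plain Python. This is only an illustration of that rule, not the actual Java implementation. The function name is hypothetical, and it assumes integer-second timestamps and windows given as (start, end) tuples.

```python
def shifted_output_time(ts, window, period):
    """Earliest output timestamp that does not overlap any earlier sliding window."""
    _, end = window
    # The immediately preceding window ends at end - period, so any timestamp
    # at or after that point falls in no earlier window.
    return max(ts, end - period)


# The six windows that contain timestamp 42 for 60s windows sliding every 10s.
windows = [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]
shifted = [shifted_output_time(42, w, 10) for w in windows]
```

Under these assumptions only the first window keeps the true EARLIEST timestamp of 42. Every later window gets a timestamp at the end of its predecessor, so once the first window fires at 50 no remaining hold is earlier than 50. That is why the shift unblocks the downstream watermark, and also why the resulting timestamps look so hard to justify.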