OK. I also prefer the delete option. The main issue that remains is SQL, or joins in general.
Kenn

On Wed, Feb 17, 2021 at 1:17 PM Robert Bradshaw <rober...@google.com> wrote:

> On Wed, Feb 17, 2021 at 12:30 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Slight correction: you mean default SlidingWindows? This is the only nontrivial implementation I know of.
>
> Yes.
>
>> Sessions does not shift the timestamp because you can't really guess how far to shift it. Sessions with EARLIEST just get hung entirely whenever there is a long-lived session.
>>
>> I've thought about it a bit more, and I think the issue is not best solved by a WindowFn. It requires all the data in the WindowingStrategy to know whether the hack is useful.
>
> I agree. The intent is not to have shifted timestamps; it is to have more eager data propagation. Shifting timestamps is a (lossy) mechanism to achieve this with our current watermark implementation.
>
>> So here is a proposal:
>>
>> - add a TimestampCombiner (aka OutputTime enum in the proto) for EARLIEST_NON_OVERLAPPING
>> - only call WindowFn#getOutputTime in this case
>
> Or should this be called EARLIEST_SHIFTING_TIMESTAMP or similar (as WindowFn#getOutputTime is not restricted to non-overlapping windows)?
>
>> This is necessarily a breaking change. Users who are using EARLIEST with SlidingWindows will see a change in behavior. It can be flipped:
>>
>> - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that does not call WindowFn#getOutputTime, and implement it
>> - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime
>>
>> Or third option:
>>
>> - delete WindowFn#getOutputTime and pretend it never existed. SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work well with EARLIEST.
>
> I would go with this, possibly with a transition period in which we support an opt-in option for the old behavior in Java on non-portable runners, at least until we can figure out what we really want and add it to the model.
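Kenn's first variant above can be sketched in plain Python (not Beam's API) to show where the timestamp shift would and would not be applied. `EARLIEST_NON_OVERLAPPING` is the name proposed in the thread, not an existing enum member; `sliding_output_time` is a hypothetical stand-in for `WindowFn#getOutputTime`, and its rule `max(ts, end - period)` is an assumption chosen to reproduce the 42 to 50 shift in the timestamp-42 walkthrough quoted below. END_OF_WINDOW returns `end - 1` to mirror a window's last included instant; time is in integer ticks.

```python
from enum import Enum
from functools import partial
from typing import Callable, Iterable, Tuple

Window = Tuple[int, int]  # half-open [start, end), integer ticks


class TimestampCombiner(Enum):
    EARLIEST = "earliest"
    LATEST = "latest"
    END_OF_WINDOW = "end_of_window"
    # Hypothetical opt-in member from the proposal; not an existing enum value.
    EARLIEST_NON_OVERLAPPING = "earliest_non_overlapping"


def sliding_output_time(ts: int, window: Window, period: int) -> int:
    """Assumed shift for sliding windows: no earlier than the preceding window's end."""
    return max(ts, window[1] - period)


def output_timestamp(
    combiner: TimestampCombiner,
    timestamps: Iterable[int],
    window: Window,
    shift: Callable[[int, Window], int],
) -> int:
    ts = list(timestamps)
    if combiner is TimestampCombiner.EARLIEST:
        return min(ts)  # plain EARLIEST: the shift hook is never consulted
    if combiner is TimestampCombiner.LATEST:
        return max(ts)
    if combiner is TimestampCombiner.END_OF_WINDOW:
        return window[1] - 1  # last instant in the window
    # Only the opt-in combiner calls the WindowFn-provided shift.
    return min(shift(t, window) for t in ts)


shift = partial(sliding_output_time, period=10)
print(output_timestamp(TimestampCombiner.EARLIEST, [42, 45], (0, 60), shift))                  # 42
print(output_timestamp(TimestampCombiner.EARLIEST_NON_OVERLAPPING, [42, 45], (0, 60), shift))  # 50
```

Under the third ("delete") option, which both Kenn and Robert say they prefer, the last branch and the shift hook would go away entirely, leaving plain EARLIEST as the only earliest-style combiner.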
>
>> Kenn
>>
>> On Wed, Feb 17, 2021 at 11:16 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> OK, so to move forward, shall we update the default Sessions to not do this timestamp shifting, perhaps with a (deprecated) timestamp-shifting opt-in variant to ease the transition for those who want the old (marked experimental) behavior?
>>>
>>> On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> All of this is right. Things have changed a lot. Nowadays the default will work well, and we can caveat to users that EARLIEST may hold up downstream output for overlapping windows.
>>>>
>>>> I'm slightly concerned that EARLIEST is necessary for CoGBK joins, unless there is some special reason why it doesn't matter. So I wonder what happens when a pipeline has several different joins.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> Yes, unless you manually set the timestamp combiner to EARLIEST, which in this case gives earliest + shifted.
>>>>>
>>>>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> The default now is end of window, right? Doesn't that alleviate the problem that the original change was supposed to fix?
>>>>>>
>>>>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>
>>>>>>> The default timestamp combiner used to be EARLIEST as well.
>>>>>>>
>>>>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> IIRC, this was introduced because at the time users complained that sliding windows were virtually unusable for reasonably sized windows. However, this was before we allowed customizing the timestamp combiner, so maybe this is less of a problem now?
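Reuven's question (does the END_OF_WINDOW default alleviate the problem?) can be checked with a toy model. The plain-Python sketch below is not the Beam SDK; it assumes a simplified watermark rule in which A's output watermark, and therefore B's input watermark, is A's input watermark held back by the holds on windows A has not fired yet. It uses the six 60-tick windows sliding every 10 that contain timestamp 42, with END_OF_WINDOW holding at each window's last instant (`end - 1`). All function names are illustrative.

```python
from typing import Dict, Tuple

Window = Tuple[int, int]  # half-open [start, end), integer ticks

# The six windows (size 60, sliding every 10) that contain timestamp 42.
WINDOWS = [(s, s + 60) for s in range(-10, 50, 10)]


def holds_for(combiner: str, ts: int) -> Dict[Window, int]:
    """Watermark hold aggregation A places on each window for one element."""
    if combiner == "EARLIEST":
        return {w: ts for w in WINDOWS}
    if combiner == "END_OF_WINDOW":
        return {w: w[1] - 1 for w in WINDOWS}  # the window's last instant
    raise ValueError(f"unsupported combiner: {combiner}")


def b_input_watermark(combiner: str, ts: int = 42, a_input_wm: int = 50) -> int:
    """B's input watermark (= A's output watermark) in a simplified model:
    A's input watermark, held back by holds on windows A has not fired yet."""
    pending = holds_for(combiner, ts)
    for w in [w for w in pending if w[1] <= a_input_wm]:
        del pending[w]  # default trigger: A fired this window and released its hold
    return min([a_input_wm, *pending.values()])


print(b_input_watermark("EARLIEST"))       # 42: B cannot fire [-10, 50) yet
print(b_input_watermark("END_OF_WINDOW"))  # 50: B can fire [-10, 50)
```

With END_OF_WINDOW, every remaining hold sits at or after the end of its own window, so none of them keeps B below the end of an earlier window; with EARLIEST, every pending window holds at 42, which matches Robert's "yes, unless you manually set the timestamp combiner to EARLIEST".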
>>>>>>>>
>>>>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a discussion of a very old and strange feature of Beam that I think we should revisit.
>>>>>>>>>>>>
>>>>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order to unblock downstream watermarks. Why? Specifically, in this situation:
>>>>>>>>>>>>
>>>>>>>>>>>> - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>>>> - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
>>>>>>>>>>>> - there is another downstream aggregation/GBK
>>>>>>>>>>>>
>>>>>>>>>>>> The output watermark of the upstream aggregation is held to the minimum of the inputs. When an output is emitted, we want it to flow through the rest of the pipeline without delay. However, the downstream aggregation can (and often will) be delayed by the window size because of *watermark holds in other later windows that are not released until those windows output.*
>>>>>>>>>>>>
>>>>>>>>>>> Could you describe this a bit more? Why would later windows hold up the watermark for upstream steps? (Is it due to some subtlety, such as tracking the watermark for each stage rather than for each step?)
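The situation described above can be reproduced with a small plain-Python sketch (not the Beam SDK). It assigns one element to sliding windows and compares the per-window watermark holds under unshifted EARLIEST with shifted holds. The shift rule `max(ts, end - period)`, i.e. "no earlier than the end of the preceding window", is an assumption, chosen because it reproduces the 42 to 50 figure in the walkthrough quoted below; each window gets its own shifted value. Function names are illustrative and time is in integer ticks.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open [start, end), integer ticks


def assign_sliding_windows(ts: int, size: int, period: int, offset: int = 0) -> List[Window]:
    """Every window [start, start + size) with start = offset (mod period) that contains ts."""
    last_start = ts - (ts - offset) % period  # % with a positive period is never negative
    windows = []
    start = last_start
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def shifted_hold(ts: int, window: Window, period: int) -> int:
    """Assumed shift rule: no earlier than the end of the preceding window."""
    return max(ts, window[1] - period)


windows = assign_sliding_windows(42, size=60, period=10)
earliest_holds = [42 for _ in windows]
shifted_holds = [shifted_hold(42, w, period=10) for w in windows]
# Longest stall for B under plain EARLIEST: from the first to the last window end.
stall = windows[-1][1] - windows[0][1]

print(windows)        # [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]
print(shifted_holds)  # [42, 50, 60, 70, 80, 90]
print(stall)          # 50, i.e. window size minus slide

# B's input watermark once A (input watermark 50) has fired only [-10, 50):
print(min([50, *earliest_holds[1:]]))  # 42: B stays blocked
print(min([50, *shifted_holds[1:]]))   # 50: B can fire [-10, 50)
```

Because the shifted holds increase window by window, the hold on [0, 60) lands exactly on the end of [-10, 50), which is what lets B's watermark reach 50 as soon as A fires its first window, at the cost of the nonintuitive output timestamp discussed in the thread.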
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> It does not have to do with stages/fusion (a runner-specific concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>>>>
>>>>>>>>>> Suppose:
>>>>>>>>>>
>>>>>>>>>> - Default triggering
>>>>>>>>>> - Timestamp combiner EARLIEST
>>>>>>>>>> - 60s windows sliding every 10s
>>>>>>>>>> - An element with timestamp 42
>>>>>>>>>> - Aggregation (A) with downstream aggregation (B)
>>>>>>>>>>
>>>>>>>>>> Here is what happens:
>>>>>>>>>>
>>>>>>>>>> - The element falls into [-10, 50), [0, 60), [10, 70), [20, 80), [30, 90), and [40, 100)
>>>>>>>>>> - For each of those windows, the output watermark hold is set to 42 (the element's timestamp)
>>>>>>>>>> - At time 50, the aggregation (A) over the first window is emitted; the other windows remain buffered and held
>>>>>>>>>> - A's output for that window arrives at aggregation (B) and is buffered because B's input watermark (which is the held output watermark from A) is still 42, even though no other data will arrive for that window (WLOG if elements from other keys are shuffled in)
>>>>>>>>>> - The input watermark for aggregation (B) does not advance past 42 until the [40, 100) window is fired and releases its watermark hold
>>>>>>>>>>
>>>>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong: it is not delayed by the window size, but by the spread of end-of-window timestamps across all assigned windows (window size minus slide?).
>>>>>>>>>>
>>>>>>>>>> So to avoid this, what actually happens in Java today is that the watermark hold and output timestamp are set not to 42 but altered to 50 so as not to overlap the prior window. A timestamp of 50 is very nonintuitive, since you asked for the EARLIEST of input timestamps.
>>>>>>>>>> The EARLIEST combiner plays an important role in CoGBK-based joins in SQL, where the iterables are re-exploded with timestamps that may be the minimum of the input elements. This shifting may actually break SQL...
>>>>>>>>>>
>>>>>>>>>> This predated our switch away from "delta from watermark" late data dropping to "window expiry" data dropping. So maybe there is some new way to set a hold that does not make data late or droppable but still uses the EARLIEST timestamp. That is my question, and I have not figured out the answer.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is, indeed, a very tough question...
>>>>>>>>>
>>>>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned windows. E.g., for sessions, a long-lived session on one key can hold up the watermark for all others. Here we "know" what the holdup is and can adjust for it, but I don't think doing this adjustment is the right thing. It would certainly seem to mess up the timestamps of the outputs from a join. And it's possible that the values get re-windowed, in which case this element should get joined with itself from a later window (which I'll admit is a bit odd, but maybe a reflection that multiple-windowing, like multi-firing triggering, is non-local).
>>>>>>>>>
>>>>>>>>> Logically, we want the [-10, 50) window for B to fire shortly after the input watermark for A passes 50, because no non-late data coming out of A could influence it. In some sense, the "watermark" for the [-10, 50) window has indeed passed, but not that for later windows.
>>>>>>>>> I don't think the Beam model requires that we have a single watermark, just that we fire triggers/timers once we have seen all the on-time data that we think we could, and a runner could be smart about this.
>>>>>>>>>
>>>>>>>>> We may want to keep the ability to shift timestamps for WindowFns, but I think we shouldn't be doing so for the default sliding windows: correctness (of output timestamps) over latency, unless one asks otherwise.
>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> To avoid this problem, element x in window w will have its timestamp shifted to not overlap with any earlier windows. It is a weird behavior. It fixes the watermark hold problem but introduces a strange output with a mysterious timestamp that is hard to justify.
>>>>>>>>>>>>
>>>>>>>>>>>> Any other ideas?
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
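Robert's point in the reply above, that the Beam model does not require a single watermark per PCollection, can be illustrated with a toy readiness check. The plain-Python sketch below uses hypothetical names and is not a Beam or runner API. It contrasts the usual per-PCollection rule with a per-window rule in which B's window waits only on A's hold for that same window. It assumes B keeps A's window assignment (no re-windowing between A and B) and uses the EARLIEST holds from the timestamp-42 example.

```python
from typing import Dict, Tuple

Window = Tuple[int, int]  # half-open [start, end), integer ticks


def global_ready(window: Window, a_input_wm: int, a_holds: Dict[Window, int]) -> bool:
    """Single per-PCollection watermark: every pending hold in A counts."""
    b_input_wm = min([a_input_wm, *a_holds.values()])
    return b_input_wm >= window[1]


def per_window_ready(window: Window, a_input_wm: int, a_holds: Dict[Window, int]) -> bool:
    """Hypothetical per-window watermark: only A's hold on the *same* window
    can still produce on-time data for this window of B. Assumes B does not
    re-window A's output."""
    relevant = [h for w, h in a_holds.items() if w == window]
    return min([a_input_wm, *relevant]) >= window[1]


# A has fired [-10, 50); its other five windows still hold at 42 (EARLIEST).
pending = {(s, s + 60): 42 for s in range(0, 50, 10)}
print(global_ready((-10, 50), 50, pending))      # False
print(per_window_ready((-10, 50), 50, pending))  # True
print(per_window_ready((0, 60), 50, pending))    # False
```

The per-window rule gets the eager propagation that timestamp shifting was meant to provide while leaving the EARLIEST timestamps untouched; whether a runner could track this efficiently, and how it interacts with re-windowing, is left open in the thread.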