Slight correction: you mean default SlidingWindows? This is the only
nontrivial implementation I know of. Sessions does not shift the timestamp
because you can't really guess how far to shift it. Sessions with EARLIEST
just get hung entirely whenever there is a long-lived session.

I've thought about it a bit more, and I think the issue is not best solved
by a WindowFn. It requires all the data in the WindowingStrategy to know
whether the hack is useful. So here is a proposal:

 - add a TimestampCombiner (aka OutputTime enum in the proto) for
EARLIEST_NON_OVERLAPPING
 - only call WindowFn#getOutputTime in this case
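Below is a minimal toy model of this proposal. It assumes integer timestamps and uses the shifting rule described further down the thread, where an output time is pushed past the end of the previous sliding window. The enum and function names are hypothetical and are not Beam's API. Only the new EARLIEST_NON_OVERLAPPING variant applies the shift.

```python
from enum import Enum


class TimestampCombiner(Enum):
    # Hypothetical stand-in for the TimestampCombiner / OutputTime enum.
    EARLIEST = "earliest"
    EARLIEST_NON_OVERLAPPING = "earliest_non_overlapping"


def assign_sliding_windows(ts, size, period):
    """Return every [start, end) sliding window that contains ts, oldest first."""
    start = ts - (ts % period)  # latest window start <= ts
    windows = []
    while start > ts - size:  # window [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def output_time(ts, window, period, combiner):
    """Output timestamp (and watermark hold) for one element in one window."""
    if combiner is TimestampCombiner.EARLIEST_NON_OVERLAPPING:
        # Push the timestamp past the end of the previous window
        # (this window's end minus the period), as described for SlidingWindows.
        _, end = window
        return max(ts, end - period)
    # Plain EARLIEST keeps the element's own timestamp.
    return ts


if __name__ == "__main__":
    size, period, ts = 60, 10, 42
    for w in assign_sliding_windows(ts, size, period):
        earliest = output_time(ts, w, period, TimestampCombiner.EARLIEST)
        shifted = output_time(ts, w, period, TimestampCombiner.EARLIEST_NON_OVERLAPPING)
        print(w, earliest, shifted)
```

With the thread's numbers (60s windows sliding every 10s, element at 42), plain EARLIEST holds at 42 in all six windows. EARLIEST_NON_OVERLAPPING holds at 42, 50, 60, 70, 80, 90, so no hold overlaps an earlier window.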

This is necessarily a breaking change. Users who are using EARLIEST with
SlidingWindows will see a change in behavior. It can be flipped:

 - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that
does not call WindowFn#getOutputTime and implement it
 - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime
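The behavior change for EARLIEST users can be sized. An unshifted EARLIEST hold lingers downstream until the last window containing the element fires. Below is a minimal sketch of that delay. It assumes default triggering, integer timestamps, and windows that each fire once when the watermark reaches their end, as in the example further down the thread. The helper name is hypothetical.

```python
def hold_release_delay(ts, size, period):
    """Time between the first and last firing of the windows containing ts.

    Toy model of the thread's example: default trigger, each sliding window
    fires once when the watermark reaches its end, and an unshifted EARLIEST
    hold at ts is released only when the last window containing ts fires.
    """
    last_start = ts - (ts % period)  # latest window start <= ts
    first_start = last_start
    while first_start - period > ts - size:  # earlier window still contains ts
        first_start -= period
    return (last_start + size) - (first_start + size)


if __name__ == "__main__":
    print(hold_release_delay(42, size=60, period=10))
```

With size 60, slide 10, and timestamp 42, this prints 50, which is the window size minus the slide. When the size is not a multiple of the slide, the result differs from that simple formula.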

Or third option:

 - delete WindowFn#getOutputTime and pretend it never existed.
SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work
well with EARLIEST.

Kenn

On Wed, Feb 17, 2021 at 11:16 AM Robert Bradshaw <rober...@google.com>
wrote:

> OK, so to move forward, shall we update the default Sessions to not do
> this timestamp shifting, perhaps with a (deprecated) timestamp-shifting
> opt-in variant to ease the transition for those that want the old (marked
> experimental) behavior?
>
> On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> All of this is right. Things have changed a lot. Nowadays the default
>> will work well, and we can caveat to users that EARLIEST may hold up
>> downstream output for overlapping windows.
>>
>> I'm slightly concerned about the fact that EARLIEST is necessary for
>> CoGBK joins, unless there is some special consideration why it doesn't
>> matter. So I wonder what happens when a pipeline has a few different joins.
>>
>> Kenn
>>
>> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> Yes, unless you manually set the timestamp combiner to earliest, which
>>> in this case gives earliest + shifted.
>>>
>>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The default now is end of window, right? Doesn't that alleviate the
>>>> problem that the original change was supposed to fix?
>>>>
>>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> The default timestamp combiner used to be earliest as well.
>>>>>
>>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> IIRC, this was introduced because at the time users complained that
>>>>>> sliding windows were virtually unusable for reasonably-sized windows.
>>>>>> However this was before we allowed customizing the timestamp combiner, so
>>>>>> maybe this is less of a problem now?
>>>>>>
>>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into
>>>>>>>>>> a discussion of a very old and strange feature of Beam that I think
>>>>>>>>>> we should revisit.
>>>>>>>>>>
>>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>>>>> to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>>>
>>>>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of
>>>>>>>>>> the inputs
>>>>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>>>>
>>>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>>>> minimum of the inputs. When an output is emitted, we desire the output
>>>>>>>>>> to flow through the rest of the pipeline without delay. However, the
>>>>>>>>>> downstream aggregation can (and often will) be delayed by the window
>>>>>>>>>> size because of *watermark holds in other later windows that are not
>>>>>>>>>> released until those windows output.*
>>>>>>>>>>
>>>>>>>>> Could you describe this a bit more? Why would later windows hold
>>>>>>>>> up the watermark for upstream steps? (Is it due to some subtlety, such
>>>>>>>>> as tracking the watermark for each stage rather than for each step?)
>>>>>>>>>
>>>>>>>>
>>>>>>>> It does not have to do with stages/fusion (a runner-specific
>>>>>>>> concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>>
>>>>>>>> Suppose:
>>>>>>>>
>>>>>>>>  - Default triggering
>>>>>>>>  - Timestamp combiner EARLIEST
>>>>>>>>  - 60s windows sliding every 10s
>>>>>>>>  - An element with timestamp 42
>>>>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>>>>
>>>>>>>> Here is what happens:
>>>>>>>>
>>>>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>>> [20, 80) and [30, 90) and [40, 100)
>>>>>>>>  - For each of those windows the output watermark hold is set to 42
>>>>>>>> (the element's timestamp)
>>>>>>>>  - At time 50 the aggregation (A) over the first window is emitted;
>>>>>>>> the other windows remain buffered and held
>>>>>>>>  - The element arrives at aggregation (B) and is buffered because
>>>>>>>> the input watermark (which is the held output watermark from A) is
>>>>>>>> still 42, even though no other data will arrive for that window (WLOG
>>>>>>>> if elements from other keys are shuffled in)
>>>>>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>>>>>> until the [40, 100) window is fired and releases its watermark hold
>>>>>>>>
>>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not
>>>>>>>> delayed by the window size, but by the difference between the
>>>>>>>> end-of-window timestamps of the first and last assigned windows
>>>>>>>> (window size minus slide?)
>>>>>>>>
>>>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to
>>>>>>>> 50 to not overlap the prior window. A timestamp of 50 is very
>>>>>>>> unintuitive since you asked for the EARLIEST of input timestamps. The
>>>>>>>> EARLIEST combiner plays an important role in CoGBK-based joins in SQL,
>>>>>>>> where the iterables are re-exploded with timestamps that may be the
>>>>>>>> minimum of input elements. This shifting may actually break SQL...
>>>>>>>>
>>>>>>>> This predated our switch away from "delta from watermark" late data
>>>>>>>> dropping to "window expiry" data dropping. So maybe there is some new
>>>>>>>> way to set a hold that does not make data late or droppable but still
>>>>>>>> uses the EARLIEST timestamp. That is my question, for which I have not
>>>>>>>> figured out the answer.
>>>>>>>>
>>>>>>>
>>>>>>> This is, indeed, a very tough question...
>>>>>>>
>>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>>>> windows. E.g. for sessions, a long-lived session on one key can hold up
>>>>>>> the watermark for all others. Here we "know" what the hold-up is, and
>>>>>>> can adjust for it. But I don't think doing this adjustment is the right
>>>>>>> thing. It would certainly seem to mess up the timestamp of the outputs
>>>>>>> from a join. And it's possible that the values get re-windowed, in which
>>>>>>> case this element should get joined with itself from a later window
>>>>>>> (which I'll admit is a bit odd, but maybe a reflection that
>>>>>>> multiple-windowing, like multi-firing triggering, is non-local).
>>>>>>>
>>>>>>> Logically, the reason we want the [-10, 50) window for B to fire
>>>>>>> shortly after the input watermark for A passes 50 is that no non-late
>>>>>>> data coming out of A could influence it. In some sense, the "watermark"
>>>>>>> for the [-10, 50) windows has indeed passed, but not that for later
>>>>>>> windows. I don't think the Beam model requires that we have a single
>>>>>>> watermark, just that we fire triggers/timers once we have seen all the
>>>>>>> on-time data that we think we could, and a runner could be smart about
>>>>>>> this.
>>>>>>>
>>>>>>> We may want to keep the ability to shift timestamps for WindowFns,
>>>>>>> but I think we shouldn't be doing so for the default sliding windows.
>>>>>>> Correctness (of output timestamps) over latency unless one asks
>>>>>>> otherwise.
>>>>>>>
>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a
>>>>>>>>>> weird behavior. It fixes the watermark hold problem but introduces a
>>>>>>>>>> strange output with a mysterious timestamp that is hard to justify.
>>>>>>>>>>
>>>>>>>>>> Any other ideas?
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>
