The default now is end of window, right? Doesn't that alleviate the problem
that the original change was supposed to fix?

On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com>
wrote:

> The default timestamp combiner used to be earliest as well.
>
> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>
>> IIRC, this was introduced because at the time users complained that
>> sliding windows were virtually unusable for reasonably-sized windows.
>> However this was before we allowed customizing the timestamp combiner, so
>> maybe this is less of a problem now?
>>
>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>> discussion of a very old and strange feature of Beam that I think we
>>>>>> should revisit.
>>>>>>
>>>>>> The WindowFn has the ability to shift timestamps forward in order to
>>>>>> unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>
>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>> inputs
>>>>>>  - there is another downstream aggregation/GBK
>>>>>>
>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>> minimum of the inputs. When an output is emitted, we desire the output to
>>>>>> flow through the rest of the pipeline without delay. However, the
>>>>>> downstream aggregation can (and often will) be delayed by the window size
>>>>>> because of *watermark holds in other later windows that are not
>>>>>> released until those windows output.*
>>>>>>
>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>> the watermark for upstream steps. (Is it due to some subtlety? Such as
>>>>> tracking the watermark for each stage, rather than for each step?)
>>>>>
>>>>
>>>> It does not have to do with stages/fusion (a runner-specific concept)
>>>> but is a necessity of watermarks being per-PCollection.
>>>>
>>>> Suppose:
>>>>
>>>>  - Default triggering
>>>>  - Timestamp combiner EARLIEST
>>>>  - 60s windows sliding every 10s
>>>>  - An element with timestamp 42
>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>
>>>> Here is what happens:
>>>>
>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20,
>>>> 80) and [30, 90) and [40, 100)
>>>>  - For each of those windows the output watermark hold is set to 42
>>>> (the element's timestamp)
>>>>  - At time 50 the aggregation (A) over the first window is emitted; the
>>>> other windows remain buffered and held
>>>>  - The element arrives at aggregation (B) and is buffered because the
>>>> input watermark (which is the held output watermark from A) is still 42,
>>>> even though no other data will arrive for that window (WLOG if elements
>>>> from other keys are shuffled in)
>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>> until the [40, 100) window is fired and releases its watermark hold
>>>>
>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed
>>>> by the window size, but by the difference in end-of-window timestamps to
>>>> all assigned windows (window size minus slide?)
>>>>
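The walkthrough above can be reproduced with a small sketch. It is a toy model in integer seconds, not Beam code: `sliding_windows` is a hypothetical helper, and the assumption that a hold set by aggregation (A) is released only when its window fires at end-of-window is taken from the description above.

```python
def sliding_windows(ts, size, period):
    """Return every [start, end) window of length `size`, starting on a
    multiple of `period`, that contains timestamp `ts` (hypothetical helper)."""
    last_start = ts - (ts % period)  # latest window start that is <= ts
    # Walk back one period at a time while the window still covers ts,
    # i.e. while start > ts - size.
    starts = range(last_start, ts - size, -period)
    return sorted((s, s + size) for s in starts)


size, period, ts = 60, 10, 42
windows = sliding_windows(ts, size, period)

# With EARLIEST, every assigned window holds A's output watermark at ts
# until that window fires (assumed here to be at its end).
first_fire = windows[0][1]     # when A emits the first window
hold_release = windows[-1][1]  # when the last hold at ts is released
delay = hold_release - first_fire

print(windows)
print(first_fire, hold_release, delay)
```

Under these assumptions the delay seen by aggregation (B) for the first window equals the window size minus the slide period, matching the guess in the paragraph above.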
>>>> So to avoid this, what actually happens in Java today is that the
>>>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>>>> not overlap the prior window. Timestamp of 50 is very nonintuitive since
>>>> you asked for the EARLIEST of input timestamps. EARLIEST combiner plays an
>>>> important role in CoGBK based joins in SQL, where the iterables are
>>>> re-exploded with timestamps that may be the minimum of input elements. This
>>>> shifting may actually break SQL...
>>>>
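A minimal sketch of the shifting described above, again in integer seconds. It assumes the shift moves the output timestamp forward to the end of the immediately preceding window (window end minus slide period) whenever the element's own timestamp is earlier than that; `shifted_output_time` is a hypothetical name and this is not Beam's actual implementation.

```python
def shifted_output_time(ts, window, period):
    """Shift `ts` forward so it does not overlap the previous sliding window.

    Assumption: the previous window ends at `end - period`, so any timestamp
    before that point is moved up to it; later timestamps are unchanged.
    """
    _start, end = window
    return max(ts, end - period)


period, ts = 10, 42
# The six windows assigned to the element with timestamp 42
# (60s windows sliding every 10s).
windows = [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]

shifted = {w: shifted_output_time(ts, w, period) for w in windows}
for w, t in shifted.items():
    print(w, t)
```

In this model only the earliest window keeps the true EARLIEST timestamp of 42. The [0, 60) window reports 50, as described above, and each later window reports a timestamp one slide period later still, which is why the result looks wrong to anyone who asked for EARLIEST.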
>>>> This predated our switch away from "delta from watermark" late data
>>>> dropping to "window expiry" data dropping. So maybe there is some new way
>>>> to set a hold that does not make data late or droppable but still use the
>>>> EARLIEST timestamp. That is my question, for which I have not figured out
>>>> the answer.
>>>>
>>>
>>> This is, indeed, a very tough question...
>>>
>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>> windows. E.g. for sessions, a long key can hold up the watermark for all
>>> others. Here we "know" what the hold up is, and can adjust for it. But I
>>> don't think doing this adjustment is the right thing. It would certainly
>>> seem to mess up the timestamp of the outputs from a join. And it's possible
>>> that the values get re-windowed in which case this element should get
>>> joined with itself from a later window (which I'll admit is a bit odd, but
>>> maybe a reflection that multiple-windowing, like multi-firing triggering,
>>> is non-local).
>>>
>>> Logically, we want the [-10, 50) window for B to fire shortly after the
>>> input watermark for A passes 50 because no non-late data coming out of
>>> A could influence it. In some sense, the "watermark" for the [-10, 50)
>>> windows has indeed passed, but not that for later windows. I don't think
>>> the Beam model requires that we have a single watermark, just that we fire
>>> triggers/timers once we have seen all the on-time data that we think we
>>> could, and a runner could be smart about this.
>>>
>>> We may want to keep the ability to shift timestamps for WindowFns, but I
>>> think we shouldn't be doing so for the default sliding windows. Correctness
>>> (of output timestamps) over latency unless one asks otherwise.
>>>
>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>>> To avoid this problem, element x in window w will have its timestamp
>>>>>> shifted to not overlap with any earlier windows. It is a weird behavior.
>>>>>> It fixes the watermark hold problem but introduces a strange output with a
>>>>>> mysterious timestamp that is hard to justify.
>>>>>>
>>>>>> Any other ideas?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>
