On Wed, Feb 17, 2021 at 12:30 PM Kenneth Knowles <k...@apache.org> wrote:

> Slight correction: you mean default SlidingWindows? This is the only
> nontrivial implementation I know of.
>

Yes.


> Sessions does not shift the timestamp because you can't really guess how
> far to shift it. Sessions with EARLIEST just get hung entirely whenever
> there is a long-lived session.
>
> I've thought about it a bit more, and I think the issue is not best solved
> by a WindowFn. It requires all the data in the WindowingStrategy to know
> whether the hack is useful.
>

I agree. The intent is not to have shifted timestamps, it is to have more
eager data propagation. Shifting timestamps is a (lossy) mechanism to
achieve this with our current watermark implementation.


> So here is a proposal:
>
>  - add a TimestampCombiner (aka OutputTime enum in the proto) for
> EARLIEST_NON_OVERLAPPING
>  - only call WindowFn#getOutputTime in this case
>

Or should this be called EARLIEST_SHIFTING_TIMESTAMP or similar
(as WindowFn#getOutputTime is not restricted to non-overlapping).


> This is necessarily a breaking change. Users who are using EARLIEST with
> SlidingWindows will see a change in behavior. It can be flipped:
>
>  - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that
> does not call WindowFn#getOutputTime and implement it
>  - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime
>
> Or third option:
>
>  - delete WindowFn#getOutputTime and pretend it never existed.
> SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work
> well with EARLIEST.
>

I would go with this. Possibly with a transition period in which we support
an opt-in option for the old behavior on Java on non-portable runners. At
least until we can figure out what we really want and add it to the model.


>
> Kenn
>
> On Wed, Feb 17, 2021 at 11:16 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> OK, so to move forward, shall we update the default Sessions to not do
>> this timestamp shifting, perhaps with a (deprecated) timestamp-shifting
>> opt-in variant to ease the transition for those that want the old (marked
>> experimental) behavior?
>>
>> On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> All of this is right. Things have changed a lot. Nowadays the default
>>> will work well, and we can caveat to users that EARLIEST may hold up
>>> downstream output for overlapping windows.
>>>
>>> I'm slightly concerned about the fact that EARLIEST is necessary for
>>> CoGBK joins, unless there is some special consideration why it doesn't
>>> matter. So I wonder what happens when a pipeline has a few different joins.
>>>
>>> Kenn
>>>
>>> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Yes, unless you manually set the timestamp combiner to earliest, which
>>>> in this case gives earliest + shifted.
>>>>
>>>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> The default now is end of window, right? Doesn't that alleviate the
>>>>> problem that the original change was supposed to fix?
>>>>>
>>>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> The default timestamp combiner used to be earliest as well.
>>>>>>
>>>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> IIRC, this was introduced because at the time users complained that
>>>>>>> sliding windows were virtually unusable for reasonably-sized windows.
>>>>>>> However this was before we allowed customizing the timestamp combiner, 
>>>>>>> so
>>>>>>> maybe this is less of a problem now?
>>>>>>>
>>>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <
>>>>>>> rober...@google.com> wrote:
>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into
>>>>>>>>>>> a discussion of a very old and strange feature of Beam that I think 
>>>>>>>>>>> we
>>>>>>>>>>> should revisit.
>>>>>>>>>>>
>>>>>>>>>>> The WindowFn has the ability to shift timestamps forward in
>>>>>>>>>>> order to unblock downstream watermarks. Why? Specifically in this 
>>>>>>>>>>> situation:
>>>>>>>>>>>
>>>>>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of
>>>>>>>>>>> the inputs
>>>>>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>>>>>
>>>>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>>>>> minimum of the inputs. When an output is emitted, we desire the 
>>>>>>>>>>> output to
>>>>>>>>>>> flow through the rest of the pipeline without delay. However, the
>>>>>>>>>>> downstream aggregation can (and often will) be delayed by the 
>>>>>>>>>>> window size
>>>>>>>>>>> because of *watermark holds in other later windows that are not
>>>>>>>>>>> released until those windows output.*
>>>>>>>>>>>
>>>>>>>>>> Could you describe this a bit more? Why would later windows hold
>>>>>>>>>> up the watermark for upstream steps. (Is it due to some subtlety? 
>>>>>>>>>> Such as
>>>>>>>>>> tracking the watermark for each stage, rather than for each step?)
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> It does not have to do with stages/fusion (a runner-specific
>>>>>>>>> concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>>>
>>>>>>>>> Suppose:
>>>>>>>>>
>>>>>>>>>  - Default triggering
>>>>>>>>>  - Timestamp combiner EARLIEST
>>>>>>>>>  - 60s windows sliding every 10s
>>>>>>>>>  - An element with timestamp 42
>>>>>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>>>>>
>>>>>>>>> Here is what happens:
>>>>>>>>>
>>>>>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>>>> [20, 80) and [30, 90) and [40, 100)
>>>>>>>>>  - For each of those windows the output watermark hold is set to
>>>>>>>>> 42 (the element's timestamp)
>>>>>>>>>  - At time 50 the aggregation (A) over the first window is
>>>>>>>>> emitted; the other windows remain buffered and held
>>>>>>>>>  - The element arrives at aggregation (B) and is buffered because
>>>>>>>>> the input watermark (which is the held output watermark from A) is 
>>>>>>>>> still
>>>>>>>>> 42, even though no other data will arrive for that window (WLOG if 
>>>>>>>>> elements
>>>>>>>>> from other keys are shuffled in)
>>>>>>>>>  - The input watermark for aggregation (B) does not advance past
>>>>>>>>> 42 until the [40, 100) window is fired and releases its watermark hold
>>>>>>>>>
>>>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not
>>>>>>>>> delayed by the window size, but by the difference in end-of-window
>>>>>>>>> timestamps to all assigned windows (window size minus slide?)
>>>>>>>>>
>>>>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to 
>>>>>>>>> 50 to
>>>>>>>>> not overlap the prior window. Timestamp of 50 is very nonintuitive 
>>>>>>>>> since
>>>>>>>>> you asked for the EARLIEST of input timestamps. EARLIEST combiner 
>>>>>>>>> plays an
>>>>>>>>> important role in CoGBK based joins in SQL, where the iterables are
>>>>>>>>> re-exploded with timestamps that may be the minimum of input 
>>>>>>>>> elements. This
>>>>>>>>> shifting may actually break SQL...
>>>>>>>>>
>>>>>>>>> This predated our switch away from "delta from watermark" late
>>>>>>>>> data dropping to "window expiry" data dropping. So maybe there is 
>>>>>>>>> some new
>>>>>>>>> way to set a hold that does not make data late or droppable but still 
>>>>>>>>> use
>>>>>>>>> the EARLIEST timestamp. That is my question, for which I have not 
>>>>>>>>> figured
>>>>>>>>> out the answer.
>>>>>>>>>
>>>>>>>>
>>>>>>>> This is, indeed, a very tough question...
>>>>>>>>
>>>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>>>>> windows. E.g. for sessions, a long key can hold up the watermark for 
>>>>>>>> all
>>>>>>>> others. Here we "know" what the hold up is, and can adjust for it. But 
>>>>>>>> I
>>>>>>>> don't think doing this adjustment is the right thing. It would 
>>>>>>>> certainly
>>>>>>>> seem to mess up the timestamp of the outputs from a join. And it's 
>>>>>>>> possible
>>>>>>>> that the values get re-windowed in which case this element should get
>>>>>>>> joined with itself from a later window (which I'll admit is a bit odd, 
>>>>>>>> but
>>>>>>>> maybe a reflection that multiple-windowing, like multi-firing 
>>>>>>>> triggering,
>>>>>>>> is non-local).
>>>>>>>>
>>>>>>>> Logicaly, the reason we want [-10 50) window for B to fire shortly
>>>>>>>> after the input watermark for A passes 50 because no non-late data 
>>>>>>>> coming
>>>>>>>> out of A could influence it. In some sense, the "watermark" for the 
>>>>>>>> [-10,
>>>>>>>> 50) windows has indeed passed, but not that for later windows. I don't
>>>>>>>> think the beam model requires that we have a single watermark, just 
>>>>>>>> that we
>>>>>>>> fire triggers/timers once we have seen all the on-time data that we 
>>>>>>>> think
>>>>>>>> we could, and a runner could be smart about this.
>>>>>>>>
>>>>>>>> We may want to keep the ability to shift timestamps for WindowFns,
>>>>>>>> but I think we shouldn't be doing so for the default sliding windows.
>>>>>>>> Correctness (of output timestamps) over latency unless one asks 
>>>>>>>> otherwise.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a 
>>>>>>>>>>> weird
>>>>>>>>>>> behavior. It fixes the watermark hold problem but introduces a 
>>>>>>>>>>> strange
>>>>>>>>>>> output with a mysterious timestamp that is hard to justify.
>>>>>>>>>>>
>>>>>>>>>>> Any other ideas?
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to