OK. I also prefer the delete option. The main issue that remains is SQL, or
joins in general.

Kenn

On Wed, Feb 17, 2021 at 1:17 PM Robert Bradshaw <rober...@google.com> wrote:

> On Wed, Feb 17, 2021 at 12:30 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Slight correction: you mean default SlidingWindows? This is the only
>> nontrivial implementation I know of.
>>
>
> Yes.
>
>
>> Sessions does not shift the timestamp because you can't really guess how
>> far to shift it. Sessions with EARLIEST just get hung entirely whenever
>> there is a long-lived session.
>>
>> I've thought about it a bit more, and I think the issue is not best
>> solved by a WindowFn. It requires all the data in the WindowingStrategy to
>> know whether the hack is useful.
>>
>
> I agree. The intent is not to have shifted timestamps, it is to have more
> eager data propagation. Shifting timestamps is a (lossy) mechanism to
> achieve this with our current watermark implementation.
>
>
>> So here is a proposal:
>>
>>  - add a TimestampCombiner (aka OutputTime enum in the proto) for
>> EARLIEST_NON_OVERLAPPING
>>  - only call WindowFn#getOutputTime in this case
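A minimal sketch of this proposal, under stated assumptions. Timestamps are plain longs. `SketchTimestampCombiner` is a hypothetical stand-in, not Beam's `TimestampCombiner`. The `shift` function stands in for `WindowFn#getOutputTime`. The sketch shows that only the proposed EARLIEST_NON_OVERLAPPING value ever consults the WindowFn, while plain EARLIEST returns the true minimum of the inputs.

```java
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical sketch of the proposal, not Beam's real enum: only the new
// EARLIEST_NON_OVERLAPPING value asks the WindowFn to shift timestamps.
enum SketchTimestampCombiner {
  EARLIEST, LATEST, END_OF_WINDOW, EARLIEST_NON_OVERLAPPING;

  /**
   * @param inputs             timestamps of the elements aggregated into one window
   * @param windowMaxTimestamp last timestamp that still belongs to the window
   * @param shift              stand-in for WindowFn#getOutputTime(timestamp, window)
   */
  long outputTime(List<Long> inputs, long windowMaxTimestamp, LongUnaryOperator shift) {
    switch (this) {
      case EARLIEST:
        // True minimum of the input timestamps; the WindowFn is never consulted.
        return inputs.stream().mapToLong(Long::longValue).min().getAsLong();
      case LATEST:
        return inputs.stream().mapToLong(Long::longValue).max().getAsLong();
      case END_OF_WINDOW:
        return windowMaxTimestamp;
      case EARLIEST_NON_OVERLAPPING:
        // The only case in which timestamps are shifted before taking the minimum.
        return inputs.stream().mapToLong(t -> shift.applyAsLong(t)).min().getAsLong();
      default:
        throw new IllegalStateException("unknown combiner: " + this);
    }
  }
}
```

Under this split, a pipeline only sees shifted timestamps if it explicitly asks for them. This is also why the change breaks existing users of EARLIEST with SlidingWindows.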
>>
>
> Or should this be called EARLIEST_SHIFTING_TIMESTAMP or similar
> (as WindowFn#getOutputTime is not restricted to non-overlapping).
>
>
>> This is necessarily a breaking change. Users who are using EARLIEST with
>> SlidingWindows will see a change in behavior. It can be flipped:
>>
>>  - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that
>> does not call WindowFn#getOutputTime and implement it
>>  - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime
>>
>> Or third option:
>>
>>  - delete WindowFn#getOutputTime and pretend it never existed.
>> SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work
>> well with EARLIEST.
>>
>
> I would go with this. Possibly with a transition period in which we
> support an opt-in option for the old behavior on Java on non-portable
> runners. At least until we can figure out what we really want and add it to
> the model.
>
>
>>
>> Kenn
>>
>> On Wed, Feb 17, 2021 at 11:16 AM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> OK, so to move forward, shall we update the default Sessions to not do
>>> this timestamp shifting, perhaps with a (deprecated) timestamp-shifting
>>> opt-in variant to ease the transition for those that want the old (marked
>>> experimental) behavior?
>>>
>>> On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> All of this is right. Things have changed a lot. Nowadays the default
>>>> will work well, and we can caveat to users that EARLIEST may hold up
>>>> downstream output for overlapping windows.
>>>>
>>>> I'm slightly concerned about the fact that EARLIEST is necessary for
>>>> CoGBK joins, unless there is some special consideration why it doesn't
>>>> matter. So I wonder what happens when a pipeline has a few different joins.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> Yes, unless you manually set the timestamp combiner to earliest, which
>>>>> in this case gives earliest + shifted.
>>>>>
>>>>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> The default now is end of window, right? Doesn't that alleviate the
>>>>>> problem that the original change was supposed to fix?
>>>>>>
>>>>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The default timestamp combiner used to be earliest as well.
>>>>>>>
>>>>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> IIRC, this was introduced because at the time users complained that
>>>>>>>> sliding windows were virtually unusable for reasonably-sized windows.
>>>>>>>> However, this was before we allowed customizing the timestamp combiner,
>>>>>>>> so maybe this is less of a problem now?
>>>>>>>>
>>>>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <
>>>>>>>> rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <k...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <ajam...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <
>>>>>>>>>>> k...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>>>>>>>> discussion of a very old and strange feature of Beam that I think
>>>>>>>>>>>> we should revisit.
>>>>>>>>>>>>
>>>>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>>>>>>> to unblock downstream watermarks. Why? Specifically, in this
>>>>>>>>>>>> situation:
>>>>>>>>>>>>
>>>>>>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of
>>>>>>>>>>>> the inputs
>>>>>>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>>>>>>
>>>>>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>>>>>> minimum of the inputs. When an output is emitted, we desire the
>>>>>>>>>>>> output to flow through the rest of the pipeline without delay.
>>>>>>>>>>>> However, the downstream aggregation can (and often will) be
>>>>>>>>>>>> delayed by the window size because of *watermark holds in other
>>>>>>>>>>>> later windows that are not released until those windows output.*
>>>>>>>>>>>>
>>>>>>>>>>> Could you describe this a bit more? Why would later windows hold
>>>>>>>>>>> up the watermark for upstream steps? (Is it due to some subtlety,
>>>>>>>>>>> such as tracking the watermark for each stage rather than for each
>>>>>>>>>>> step?)
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> It does not have to do with stages/fusion (a runner-specific
>>>>>>>>>> concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>>>>
>>>>>>>>>> Suppose:
>>>>>>>>>>
>>>>>>>>>>  - Default triggering
>>>>>>>>>>  - Timestamp combiner EARLIEST
>>>>>>>>>>  - 60s windows sliding every 10s
>>>>>>>>>>  - An element with timestamp 42
>>>>>>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>>>>>>
>>>>>>>>>> Here is what happens:
>>>>>>>>>>
>>>>>>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>>>>> [20, 80) and [30, 90) and [40, 100)
>>>>>>>>>>  - For each of those windows the output watermark hold is set to
>>>>>>>>>> 42 (the element's timestamp)
>>>>>>>>>>  - At time 50 the aggregation (A) over the first window is
>>>>>>>>>> emitted; the other windows remain buffered and held
>>>>>>>>>>  - The element arrives at aggregation (B) and is buffered because
>>>>>>>>>> the input watermark (which is the held output watermark from A) is
>>>>>>>>>> still 42, even though no other data will arrive for that window
>>>>>>>>>> (WLOG if elements from other keys are shuffled in)
>>>>>>>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>>>>>>>> until the [40, 100) window is fired and releases its watermark hold
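The walkthrough above can be checked with a small, self-contained simulation. It assumes integer timestamps in seconds, windows aligned to 0, and default triggering, where each window of A fires once A's input watermark reaches the window's end. It also assumes unshifted EARLIEST holds. `SlidingHoldSimulation` and its methods are hypothetical names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the example: 60s windows sliding every 10s,
// EARLIEST holds set to the element's timestamp, no timestamp shifting.
class SlidingHoldSimulation {
  static final long SIZE = 60, PERIOD = 10;

  /** Starts of all windows [start, start + SIZE) that contain timestamp t, ascending. */
  static List<Long> windowStarts(long t) {
    List<Long> starts = new ArrayList<>();
    long lastStart = Math.floorDiv(t, PERIOD) * PERIOD;
    for (long s = lastStart; s > t - SIZE; s -= PERIOD) {
      starts.add(0, s); // prepend so the list ends up in ascending order
    }
    return starts;
  }

  /**
   * Output watermark of A (which is the input watermark of B) when A's input
   * watermark is wm: the minimum of wm and the holds of all windows not yet fired.
   */
  static long outputWatermarkOfA(long elementTs, long wm) {
    long result = wm;
    for (long start : windowStarts(elementTs)) {
      boolean fired = wm >= start + SIZE; // the window's end has been reached
      if (!fired) {
        result = Math.min(result, elementTs); // EARLIEST hold = element timestamp
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(windowStarts(42));
    for (long wm = 50; wm <= 100; wm += 10) {
      System.out.println("A input wm " + wm + " -> B input wm " + outputWatermarkOfA(42, wm));
    }
  }
}
```

For A input watermarks from 50 through 90, B's input watermark stays pinned at 42. It jumps forward only once the [40, 100) window has fired.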
>>>>>>>>>>
>>>>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong: it is not
>>>>>>>>>> delayed by the window size, but by the difference between the
>>>>>>>>>> end-of-window timestamps of the first and last assigned windows
>>>>>>>>>> (window size minus slide?)
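The guess can be checked with a short derivation. It assumes windows aligned to 0, a slide P that evenly divides the size S = kP, and an element timestamp t. Under these assumptions, the assigned windows start at jP for every integer j with t - S < jP <= t.

```latex
\begin{aligned}
j_{\max} &= \lfloor t/P \rfloor, \qquad j_{\min} = \lfloor t/P \rfloor - k + 1,\\
\text{first end} &= j_{\min} P + S = \left(\lfloor t/P \rfloor + 1\right) P,\\
\text{last end}  &= j_{\max} P + S = \lfloor t/P \rfloor P + S,\\
\text{delay}     &= \text{last end} - \text{first end} = S - P.
\end{aligned}
```

For t = 42, S = 60, P = 10 this gives a first end of 50, a last end of 100, and a delay of 50 = 60 - 10, matching the example.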
>>>>>>>>>>
>>>>>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>>>>>> watermark hold and output timestamp are set not to 42 but altered
>>>>>>>>>> to 50, so as not to overlap the prior window. A timestamp of 50 is
>>>>>>>>>> very unintuitive, since you asked for the EARLIEST of the input
>>>>>>>>>> timestamps. The EARLIEST combiner plays an important role in
>>>>>>>>>> CoGBK-based joins in SQL, where the iterables are re-exploded with
>>>>>>>>>> timestamps that may be the minimum of the input elements. This
>>>>>>>>>> shifting may actually break SQL...
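The sketch below illustrates the shift being described. It assumes the shift clamps the output timestamp to the end of the preceding window in slide order, that is, max(t, end(w) - P). This formula is an assumption modeled on the behavior described in this thread, not a copy of Beam's `SlidingWindows.getOutputTime`. `SlidingShiftSketch` is a hypothetical name.

```java
// Hypothetical sketch of the timestamp shift: the output time in window w is
// clamped so it never falls inside the window that precedes w in slide order.
class SlidingShiftSketch {
  static long shiftedOutputTime(long t, long windowStart, long size, long period) {
    long windowEnd = windowStart + size;
    long endOfPreviousWindow = windowEnd - period; // assumed clamp point
    return Math.max(t, endOfPreviousWindow);
  }

  public static void main(String[] args) {
    // The six windows of the example (size 60, slide 10) containing t = 42.
    for (long start = -10; start <= 40; start += 10) {
      System.out.println("[" + start + ", " + (start + 60) + ") -> "
          + shiftedOutputTime(42, start, 60, 10));
    }
  }
}
```

Only the first window, [-10, 50), keeps 42. The [0, 60) window gets 50, and each later window is pushed to the end of its predecessor (60, 70, 80, 90). Because the hold of every still-open window now sits at or after the end of the window that just fired, B's input watermark can advance as soon as A emits, at the cost of the unintuitive timestamps.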
>>>>>>>>>>
>>>>>>>>>> This predated our switch away from "delta from watermark" late
>>>>>>>>>> data dropping to "window expiry" data dropping. So maybe there is
>>>>>>>>>> some new way to set a hold that does not make data late or
>>>>>>>>>> droppable but still uses the EARLIEST timestamp. That is my
>>>>>>>>>> question, and I have not figured out the answer.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is, indeed, a very tough question...
>>>>>>>>>
>>>>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>>>>>> windows. E.g. for sessions, a long-lived session on one key can hold
>>>>>>>>> up the watermark for all other keys. Here we "know" what the holdup
>>>>>>>>> is, and can adjust for it. But I don't think doing this adjustment is
>>>>>>>>> the right thing. It would certainly seem to mess up the timestamps of
>>>>>>>>> the outputs from a join. And it's possible that the values get
>>>>>>>>> re-windowed, in which case this element should get joined with
>>>>>>>>> itself from a later window (which I'll admit is a bit odd, but maybe
>>>>>>>>> a reflection that multiple-windowing, like multi-firing triggering,
>>>>>>>>> is non-local).
>>>>>>>>>
>>>>>>>>> Logically, the reason we want the [-10, 50) window for B to fire
>>>>>>>>> shortly after the input watermark for A passes 50 is that no non-late
>>>>>>>>> data coming out of A could influence it. In some sense, the
>>>>>>>>> "watermark" for the [-10, 50) windows has indeed passed, but not the
>>>>>>>>> one for later windows. I don't think the Beam model requires that we
>>>>>>>>> have a single watermark, just that we fire triggers/timers once we
>>>>>>>>> have seen all the on-time data that we think we could, and a runner
>>>>>>>>> could be smart about this.
>>>>>>>>>
>>>>>>>>> We may want to keep the ability to shift timestamps for WindowFns,
>>>>>>>>> but I think we shouldn't be doing so for the default sliding windows.
>>>>>>>>> Correctness (of output timestamps) over latency unless one asks
>>>>>>>>> otherwise.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a
>>>>>>>>>>>> weird behavior. It fixes the watermark hold problem but introduces
>>>>>>>>>>>> a strange output with a mysterious timestamp that is hard to
>>>>>>>>>>>> justify.
>>>>>>>>>>>>
>>>>>>>>>>>> Any other ideas?
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>
