Hi Kenn,

I do not agree with the last part. We are talking about the definition of semantics. If GBK can be implemented on top of stateful DoFn, then stateful DoFn is the more generic transform. Therefore, semantics should be defined on this transform and _derived_ (or transferred) to the less generic ones. Whether you execute GBK as a stateful DoFn or not (probably not) is just a runtime optimization (these optimizations are possible due to the discrete, and predictable, movements of time defined by triggers). But semantics should adhere to the generic definition and not be affected by runtime optimizations.

Last remark: yes, if we disallow moving an element's timestamp to the past, then we don't need window.minTimestamp, because minTimestamp is defined implicitly by the window's open time. That opens the question of whether a droppable element should be dropped not only when it arrives too late after the window closes, but also when it arrives too late after the window opens.

But disallowing timestamps from moving back in time seems impractical, because I can imagine a source assigning ingestion-time timestamps to elements (e.g. Kafka, by default), which are later remapped to event time in user code. That necessarily means moving time backwards.

Jan

On 1/22/20 11:53 PM, Kenneth Knowles wrote:
Had a lunch chat about this issue.

Moving elements back in time can make them late or droppable. You just can't really do it safely.

Moving elements into the future is fine up to the end of the window. It is not safe to move further. The watermark for a PCollection is based on the element timestamps. If an element's timestamp is in the future, the watermark can advance to that point in the future. This may cause the watermark to expire the window. So this can also make data late or droppable.
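Kenn's two cases can be made concrete with a toy check. This is a minimal sketch, assuming integer timestamps, a half-open window [start, end) whose max timestamp is end - 1, and a known output watermark; `ToyWindow` and `classify_shift` are hypothetical names, not Beam classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyWindow:
    """Half-open window [start, end) over integer timestamps."""
    start: int
    end: int

    @property
    def max_timestamp(self) -> int:
        # Last timestamp that still belongs to the window.
        return self.end - 1


def classify_shift(new_ts: int, window: ToyWindow, watermark: int) -> str:
    """Classify an element re-emitted at new_ts while staying in `window`."""
    if new_ts < watermark:
        # Moved back behind the watermark: the element is already late
        # (and droppable once allowed lateness is exceeded).
        return "late"
    if new_ts > window.max_timestamp:
        # Moved past the end of the window: the watermark may now advance
        # past the window and expire it, making data in it late.
        return "beyond window end"
    return "on time"


w = ToyWindow(0, 10)
print(classify_shift(2, w, watermark=5))   # moved back: late
print(classify_shift(8, w, watermark=5))   # moved forward, inside the window: on time
print(classify_shift(15, w, watermark=5))  # moved past the window end: beyond window end
```

A backward shift is only late if it crosses the watermark; the problem is that nothing in the model prevents it from doing so.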

It is actually not true that GBK is based on stateful DoFn. That is one way to implement it, but not the only way nor always the best way. They are qualitatively different.
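For reference, the "one way" mentioned above, GBK expressed as a stateful DoFn, can be sketched as per-(key, window) buffered state plus an end-of-window event-time timer. This is a toy single-process model, assuming integer timestamps, windows as `(start, end)` tuples, and the default trigger (fire once when the watermark passes the window's max timestamp); the class and method names are hypothetical and do not reflect any runner's implementation.

```python
from collections import defaultdict


class GroupByKeyAsStatefulDoFn:
    """Toy GBK built from keyed state and event-time timers."""

    def __init__(self):
        self.buffers = defaultdict(list)  # (key, window) -> buffered values
        self.timers = {}                  # (key, window) -> timer timestamp

    def process(self, key, value, window):
        state_key = (key, window)
        self.buffers[state_key].append(value)
        # Timer at the window's max timestamp (end - 1).
        self.timers[state_key] = window[1] - 1

    def advance_watermark(self, watermark):
        """Fire, in timestamp order, every timer the watermark has passed."""
        fired = []
        for state_key, fire_at in sorted(self.timers.items(), key=lambda kv: kv[1]):
            if watermark > fire_at:
                key, window = state_key
                fired.append((key, window, self.buffers.pop(state_key)))
                del self.timers[state_key]
        return fired


gbk = GroupByKeyAsStatefulDoFn()
gbk.process("a", 1, (0, 10))
gbk.process("a", 2, (0, 10))
gbk.process("a", 3, (10, 20))
first = gbk.advance_watermark(10)  # passes 9, the max timestamp of (0, 10)
print(first)  # [('a', (0, 10), [1, 2])]
```

A runner is free to implement GBK differently, so this shows one possible lowering, not a definition.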

Kenn

On Wed, Jan 22, 2020 at 1:52 AM Jan Lukavský <je...@seznam.cz> wrote:

    I sense this discussion might be (remotely) related to [1] (and
    especially [2]). The common ground here is that we need a sound
    definition of a window. I think people might currently have
    different definitions, which leads to this sort of
    misunderstanding. The definition should be created in terms of
    stateful DoFn (not GBK, which is probably the case today),
    because that is the lowest-level transform; all the others are
    built upon it. Looking at it from this perspective, it seems
    that a window actually scopes the state of a stateful DoFn. The
    scope can be:

     (a) one-sided (only a maximum timestamp is defined)

     (b) two-sided (both a minimum and a maximum are defined)

    We currently have approach (a), which results in the ability to
    move a timestamp *arbitrarily far into the past*, while moving a
    timestamp into the future is limited by the window's maxTimestamp.
    If we extend this to (b), then the windowFn starts to create
    something like a universe (actually a multiverse, because it can
    return multiple windows). It should be invalid for an element to
    escape its universe; that would be counterintuitive. If we disallow
    emission of data elements that are _late even when created_ (i.e.
    are emitted with a timestamp less than the output watermark) and we
    disallow setting timers with a timestamp higher than
    window.maxTimestamp (which we currently do), then we have prevented
    any element from escaping its window (universe, range of validity).
    It would also require the output watermark of a stateful DoFn to be
    keyed and set to at least window.minTimestamp when the window is
    opened. This would remove a sort of asymmetry (why know
    maxTimestamp but not minTimestamp?). Also note that (a) is equal to
    (b) if and only if we disallow shifting time into the past.
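    A minimal sketch of the two-sided scope proposed in (b), assuming
    integer timestamps and a single (key, window) scope; `WindowScope`
    and `EscapedWindowError` are hypothetical names. It encodes the
    rules above: the keyed output watermark starts at
    window.minTimestamp, elements that are late on creation are
    rejected, and neither elements nor timers may go past
    window.maxTimestamp.

```python
from dataclasses import dataclass
from typing import Optional


class EscapedWindowError(ValueError):
    """Raised when an element or timer would leave its window."""


@dataclass
class WindowScope:
    """Hypothetical two-sided scope for one key and window (inclusive bounds)."""
    min_timestamp: int
    max_timestamp: int
    output_watermark: Optional[int] = None

    def __post_init__(self) -> None:
        # On window open, the keyed output watermark is at least minTimestamp.
        start = self.min_timestamp if self.output_watermark is None else self.output_watermark
        self.output_watermark = max(start, self.min_timestamp)

    def emit(self, timestamp: int) -> int:
        if timestamp < self.output_watermark:
            # "Late even when created": behind the output watermark.
            raise EscapedWindowError(f"element at {timestamp} is late on creation")
        if timestamp > self.max_timestamp:
            raise EscapedWindowError(f"element at {timestamp} is past the window end")
        return timestamp

    def set_timer(self, timestamp: int) -> int:
        # Mirrors the existing rule: timers may not exceed window.maxTimestamp.
        if timestamp > self.max_timestamp:
            raise EscapedWindowError(f"timer at {timestamp} is past the window end")
        return timestamp


scope = WindowScope(min_timestamp=0, max_timestamp=9)
scope.emit(4)       # accepted
scope.set_timer(9)  # accepted
try:
    scope.emit(-1)  # would escape through the lower bound
except EscapedWindowError as err:
    print(err)      # element at -1 is late on creation
```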

    Jan

    [1] https://lists.apache.org/thread.html/c37dfb6c545fba7d794a13c507dccebb654bbd8b317dab748a6775dc%40%3Cdev.beam.apache.org%3E

    [2] https://lists.apache.org/thread.html/r7f38860557d6571869e8e0989275f6ed610cf8c99b2f56fc6418a1d1%40%3Cdev.beam.apache.org%3E

    On 1/21/20 10:08 PM, Ankur Goenka wrote:


    On Thu, Jan 16, 2020 at 9:52 PM Kenneth Knowles <k...@apache.org>
    wrote:



        On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw
        <rober...@google.com> wrote:

            On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles
            <k...@apache.org> wrote:
            >
            > IIRC in Java it is forbidden to output an element with
            a timestamp outside its current window.

            I don't think this is checked anywhere. (Not sure how you
            would check it, as there's no generic window containment
            function. I suppose you could check whether it's past the
            end of the window, and of course skew limits how far you
            can go back.) I suppose you could try re-windowing and then
            fail if it didn't agree with what was already there.
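            Robert's last suggestion, re-windowing the new timestamp and
            failing on disagreement, could look roughly like this. It is
            a sketch assuming a non-merging WindowFn whose assignment
            depends only on the timestamp (fixed-window arithmetic over
            integer timestamps here); `assign_fixed_windows` and
            `check_output` are hypothetical helpers, not Beam APIs.

```python
from functools import partial
from typing import Callable, List, Tuple

Window = Tuple[int, int]  # [start, end)


def assign_fixed_windows(timestamp: int, size: int, offset: int = 0) -> List[Window]:
    """Fixed-window assignment over integer timestamps."""
    start = timestamp - (timestamp - offset) % size
    return [(start, start + size)]


def check_output(timestamp: int, current: List[Window],
                 window_fn: Callable[[int], List[Window]]) -> List[Window]:
    """Fail if re-windowing `timestamp` would not place the element in every
    window it already carries."""
    reassigned = set(window_fn(timestamp))
    missing = [w for w in current if w not in reassigned]
    if missing:
        raise ValueError(f"timestamp {timestamp} is not in windows {missing}")
    return current


fixed5 = partial(assign_fixed_windows, size=5)
print(check_output(3, [(0, 5)], fixed5))  # agrees: [(0, 5)]
try:
    check_output(7, [(0, 5)], fixed5)     # moved into the next window
except ValueError as err:
    print(err)
```

            Checking containment rather than equality lets an element
            that sits in just one of several overlapping (e.g. sliding)
            windows pass; merging WindowFns such as sessions would need
            a different check.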


        I think you are right. This is governed by how a runner
        invokes utilities from runners-core (output ultimately
        reaches this point without validation:
        https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258)

            > An exception is output from @FinishBundle, where the
            output timestamp is required and the window is applied.
            TBH it seems more of an artifact of a mismatch between
            the pre-windowing and post-windowing worlds.

            Elements are always in some window, even if just the
            global window.


        I mean that the existence of a window-unaware @FinishBundle
        method is an artifact of the method existing prior to
        windowing as a concept. The idea that a user can use a DoFn's
        local variables to buffer stuff and then output
        in @FinishBundle predates the existence of windowing.

            > Most of the time, mixing processing across windows is
            simply wrong. But there are fears that calling
            @FinishBundle once per window would be a performance
            problem. On the other hand, don't most correct
            implementations have to separate processing for each
            window anyhow?

            Processing needs to be done per window iff the result
            depends on the
            window or if there are side effects.

            > Anyhow I think the Java behavior is better, so window
            assignment happens exactly and only at window transforms.

            But then one ends up with timestamps that are unrelated
            to the windows, right?


        As far as the model goes, I think windows provide an upper
        bound but not a lower bound. If we take the approach that
        windows are a "secondary key with a max timestamp" then the
        timestamps should be related to the window in the sense that
        they are <= the window's max timestamp.

    A window only makes sense when a trigger or timer fires, and
    the timestamps of the elements in the window should be within the
    window's time range when a trigger is set. For consistency, I
    think an element's timestamp should remain within the corresponding
    time range at every stage of the graph.
    IIUC, based on the discussion, users can easily violate this
    requirement in pipeline code, which might give inconsistent
    behavior across runners.

    I think we should stick to consistent behavior across languages
    and runners. We have multiple options here, like:
    1. Don't have any promised correlation between element timestamp
    and window. The window will just behave like a secondary key for
    the element.
    2. Make it explicit that the last window function can be applied
    out of order on the elements at any time.
    3. Don't let users change the timestamp without applying a
    windowing function after the timestamp change and before a
    trigger. However, in Python this can only be validated at runtime.
    4. Revalidate the window after changing the timestamp. Also
    provide additional methods to explicitly change the timestamp and
    window in one shot.
    5. etc.
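    Option 3 can be sketched as a check over a linear chain of steps:
    flag any GroupByKey reached after a timestamp change with no
    WindowInto in between. The step labels and
    `find_unwindowed_groupings` are hypothetical; the sketch assumes
    each step is already known to change timestamps or not, which, as
    noted in option 3, is generally only discoverable at runtime in
    Python.

```python
from typing import List

# Hypothetical step labels for a linear pipeline (not Beam API).
RETIMESTAMP = "retimestamp"    # e.g. a Map that emits TimestampedValue
WINDOW_INTO = "window_into"
GROUP_BY_KEY = "group_by_key"
OTHER = "other"


def find_unwindowed_groupings(steps: List[str]) -> List[int]:
    """Indices of GroupByKey steps reached after a timestamp change with no
    WindowInto in between."""
    violations = []
    retimestamped = False
    for i, kind in enumerate(steps):
        if kind == RETIMESTAMP:
            retimestamped = True
        elif kind == WINDOW_INTO:
            # Re-windowing after the change makes timestamps and windows agree again.
            retimestamped = False
        elif kind == GROUP_BY_KEY and retimestamped:
            violations.append(i)
    return violations


# Create -> Fixed Window -> Apply Timestamp -> Group By Key -> Print
example = [OTHER, WINDOW_INTO, RETIMESTAMP, GROUP_BY_KEY, OTHER]
print(find_unwindowed_groupings(example))  # [3]
```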


        Kenn

            > Kenn
            >
            > On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka
            <goe...@google.com> wrote:
            >>
            >> The case where a plain vanilla value or a windowed
            value is emitted seems to behave as expected, as the user's
            intent is honored without any surprises.
            >>
            >> If I understand correctly, when the timestamp is
            changed, applying the window function again can have
            unintended behavior in the following cases:
            >> * Custom windows: User code can be executed in an
            unintended order.
            >> * User emits a windowed value in a previous transform:
            Timestamping the value in this case would overwrite the
            user-assigned window from the earlier step, even when the
            actual timestamp is the same. Semantically, emitting an
            element or a timestamped value with the same timestamp
            should have the same behaviour.
            >>
            >> What do you think?
            >>
            >>
            >> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw
            <rober...@google.com> wrote:
            >>>
            >>> If an element is emitted with a timestamp, the window
            assignment is
            >>> re-applied at that time. At least that's how it is in
            Python. You can
            >>> emit the full windowed value (accepted without
            checking...), a
            >>> timestamped value (in which case the window will be
            computed), or a
            >>> plain old element (in which case the window and
            timestamp will be
            >>> computed (really, propagated)).
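            The three cases Robert describes can be modeled in a few
            lines. This is a simplified sketch, not Beam's code:
            `Windowed` and `Timestamped` are stand-in NamedTuples,
            timestamps are integers, and `window_fn` stands for the most
            recently applied WindowFn.

```python
from typing import Any, Callable, NamedTuple, Sequence, Tuple

Window = Tuple[int, int]  # [start, end)


class Timestamped(NamedTuple):
    value: Any
    timestamp: int


class Windowed(NamedTuple):
    value: Any
    timestamp: int
    windows: Tuple[Window, ...]


def handle_output(output: Any, current: Windowed,
                  window_fn: Callable[[int], Sequence[Window]]) -> Windowed:
    """Turn whatever a DoFn emitted into a windowed element."""
    if isinstance(output, Windowed):
        # Full windowed value: accepted as-is, without checking.
        return output
    if isinstance(output, Timestamped):
        # Timestamped value: windows are recomputed from the new timestamp.
        return Windowed(output.value, output.timestamp,
                        tuple(window_fn(output.timestamp)))
    # Plain element: timestamp and windows are propagated from the input.
    return Windowed(output, current.timestamp, current.windows)


def fixed5(ts: int) -> Sequence[Window]:
    start = ts - ts % 5
    return [(start, start + 5)]


inp = Windowed("x", 3, ((0, 5),))
print(handle_output("y", inp, fixed5))
print(handle_output(Timestamped("y", 12), inp, fixed5))
print(handle_output(Windowed("y", 12, ((0, 5),)), inp, fixed5))  # inconsistent, but accepted
```

            The last call shows how an element can end up carrying a
            timestamp outside its own window, which is the situation
            this thread is about.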
            >>>
            >>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka
            <goe...@google.com> wrote:
            >>> >
            >>> > Yup, this might result in unintended behavior: the
            timestamp is changed after the window assignment, so
            elements in windows no longer have timestamps within the
            window's time range.
            >>> >
            >>> > Shall we start validating that at least one window
            assignment happens between timestamp assignment and
            GBK/triggers to avoid the unintended behaviors mentioned above?
            >>> >
            >>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik
            <lc...@google.com> wrote:
            >>> >>
            >>> >> Window assignment happens at the point in the
            pipeline the WindowInto transform was applied. So in this
            case the window would have been assigned using the
            original timestamp.
            >>> >>
            >>> >> Grouping is by key and window.
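            Under the behavior Luke describes (windows fixed at
            WindowInto from the original timestamps, grouping by key and
            window), a later timestamp change leaves elements grouped
            into windows that no longer contain their timestamps. A
            plain-Python sketch loosely mirroring the example pipeline in
            the original question; the key `"k"` and the new timestamp
            `1000` are arbitrary illustrative values, and this models the
            described semantics rather than any SDK's code.

```python
from collections import defaultdict


def fixed_window(ts: int, size: int = 5):
    """Fixed-window assignment over integer timestamps: [start, start + size)."""
    start = ts - ts % size
    return (start, start + size)


# WindowInto: windows are assigned once, from the original timestamps 0..9.
windowed = [("k", v, v, fixed_window(v)) for v in range(10)]  # (key, value, ts, window)

# A later step overwrites every timestamp but leaves the window untouched.
NEW_TS = 1000
retimestamped = [(k, v, NEW_TS, w) for k, v, _, w in windowed]

# GroupByKey groups by (key, window), not by the current timestamp.
groups = defaultdict(list)
for k, v, ts, w in retimestamped:
    groups[(k, w)].append((v, ts))

for (k, w), vals in sorted(groups.items()):
    # Every grouped timestamp (1000) lies outside its window [start, end).
    print(k, w, vals)
```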
            >>> >>
            >>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka
            <goe...@google.com> wrote:
            >>> >>>
            >>> >>> Hi,
            >>> >>>
            >>> >>> I am not sure what effect the order of the element
            timestamp change and window association has on a
            GroupByKey.
            >>> >>> More specifically, what would be the behavior if
            we apply window -> change element timestamp -> GroupByKey?
            >>> >>> I think we should always apply the window function
            after changing the timestamp of elements, though this is
            neither checked nor a recommended practice in Beam.
            >>> >>>
            >>> >>> Example pipeline would look like this:
            >>> >>>
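            >>> >>>       import time
            >>> >>>       import apache_beam as beam
            >>> >>>       from apache_beam.transforms import window
            >>> >>>
            >>> >>>       def applyTimestamp(value):
            >>> >>>           # 'key' is a placeholder: every element gets the same key.
            >>> >>>           # The event-time timestamp is overwritten with wall-clock time.
            >>> >>>           return window.TimestampedValue(('key', value), int(time.time()))
            >>> >>>
            >>> >>>       with beam.Pipeline() as p:
            >>> >>>           (p
            >>> >>>            | 'Create' >> beam.Create(range(0, 10))
            >>> >>>            | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5))
            >>> >>>            # Timestamp is changed after windowing and before GBK.
            >>> >>>            | 'Apply Timestamp' >> beam.Map(applyTimestamp)
            >>> >>>            | 'Group By Key' >> beam.GroupByKey()
            >>> >>>            | 'Print' >> beam.Map(print))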
            >>> >>>
            >>> >>> Thanks,
            >>> >>> Ankur
