On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles <k...@apache.org> wrote:
>
> IIRC in Java it is forbidden to output an element with a timestamp outside
> its current window.

I don't think this is checked anywhere. (Not sure how you would check it, as
there's no generic window containment function. I suppose you could check
whether it's past the end of the window, and of course skew limits how far you
can go back.) I suppose you could also try re-windowing and then fail if the
result didn't agree with what was already there.

> An exception is outputs from @FinishBundle, where the output timestamp is
> required and the window is applied. TBH it seems more of an artifact of a
> mismatch between the pre-windowing and post-windowing worlds.

Elements are always in some window, even if just the global window.

> Most of the time, mixing processing across windows is simply wrong. But there
> are fears that calling @FinishBundle once per window would be a performance
> problem. On the other hand, don't most correct implementations have to
> separate processing for each window anyhow?

Processing needs to be done per window iff the result depends on the window or
if there are side effects.

> Anyhow I think the Java behavior is better, so window assignment happens
> exactly and only at window transforms.

But then one ends up with timestamps that are unrelated to the windows, right?

> Kenn
>
> On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka <goe...@google.com> wrote:
>>
>> The case where a plain vanilla value or a windowed value is emitted seems as
>> expected, as the user intent is honored without any surprises.
>>
>> If I understand correctly, in the case when the timestamp is changed,
>> applying the window function again can have unintended behavior in the
>> following cases:
>> * Custom windows: User code can be executed in unintended order.
>> * User emits a windowed value in a previous transform: Timestamping the value
>> in this case would overwrite the user-assigned window from the earlier step
>> even when the actual timestamp is the same. Semantically, emitting an
>> element or a timestamped value with the same timestamp should have the same
>> behaviour.
>>
>> What do you think?
>>
>>
>> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> If an element is emitted with a timestamp, the window assignment is
>>> re-applied at that time. At least that's how it is in Python. You can
>>> emit the full windowed value (accepted without checking...), a
>>> timestamped value (in which case the window will be computed), or a
>>> plain old element (in which case the window and timestamp will be
>>> computed (really, propagated)).
>>>
>>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka <goe...@google.com> wrote:
>>> >
>>> > Yup, this might result in unintended behavior: the timestamp is changed
>>> > after the window assignment, so elements in windows no longer have
>>> > timestamps within the window's time range.
>>> >
>>> > Shall we start validating at least one window assignment between timestamp
>>> > assignment and GBK/triggers to avoid the unintended behaviors mentioned above?
>>> >
>>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik <lc...@google.com> wrote:
>>> >>
>>> >> Window assignment happens at the point in the pipeline the WindowInto
>>> >> transform was applied. So in this case the window would have been
>>> >> assigned using the original timestamp.
>>> >>
>>> >> Grouping is by key and window.
>>> >>
>>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka <goe...@google.com> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I am not sure what effect the order of element timestamp change
>>> >>> and window association has on a group by key.
>>> >>> More specifically, what would be the behavior if we apply window ->
>>> >>> change element timestamp -> Group By Key?
>>> >>> I think we should always apply the window function after changing the
>>> >>> timestamp of elements, though this is neither checked nor a recommended
>>> >>> practice in Beam.
>>> >>>
>>> >>> Example pipeline would look like this:
>>> >>>
>>> >>> import time
>>> >>>
>>> >>> import apache_beam as beam
>>> >>> from apache_beam import window
>>> >>>
>>> >>> def applyTimestamp(value):
>>> >>>     # 'key' is a placeholder; the original snippet left the key undefined.
>>> >>>     return window.TimestampedValue(('key', value), int(time.time()))
>>> >>>
>>> >>> with beam.Pipeline() as p:
>>> >>>     (p
>>> >>>      | 'Create' >> beam.Create(range(0, 10))
>>> >>>      | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5))
>>> >>>      # Timestamp is changed after windowing and before GBK
>>> >>>      | 'Apply Timestamp' >> beam.Map(applyTimestamp)
>>> >>>      | 'Group By Key' >> beam.GroupByKey()
>>> >>>      | 'Print' >> beam.Map(print))
>>> >>>
>>> >>> Thanks,
>>> >>> Ankur
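
The two behaviours discussed in this thread (the window is kept from the original WindowInto, as Luke describes, versus the window is recomputed when a timestamped value is emitted, as Robert describes for Python) can be illustrated with a minimal pure-Python sketch. It deliberately does not use the Beam API. The helpers `fixed_window`, `group_by_key_and_window` and `timestamps_inside_windows`, the key "k", and the replacement timestamp 103 are all hypothetical names and values chosen for illustration; actual SDK and runner behaviour may differ.

```python
from collections import defaultdict

SIZE = 5  # fixed window size, matching FixedWindows(5) in the example pipeline


def fixed_window(timestamp, size):
    """Return the [start, end) fixed window of the given size containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def group_by_key_and_window(elements):
    """Group (key, value, timestamp, window) tuples by (key, window)."""
    groups = defaultdict(list)
    for key, value, timestamp, win in elements:
        groups[(key, win)].append((value, timestamp))
    return dict(groups)


def timestamps_inside_windows(groups):
    """True if every element's timestamp lies inside the window it is grouped under."""
    return all(
        start <= ts < end
        for (_key, (start, end)), items in groups.items()
        for _value, ts in items
    )


# Step 1: elements 0..9 with timestamp == value, windowed using that timestamp.
windowed = [("k", v, v, fixed_window(v, SIZE)) for v in range(10)]

# Step 2: overwrite the timestamp but keep the previously assigned window.
NEW_TS = 103  # illustrative stand-in for int(time.time())
retimestamped = [(k, v, NEW_TS, win) for k, v, _ts, win in windowed]

# Behaviour A: group using the stale windows from step 1.
stale = group_by_key_and_window(retimestamped)

# Behaviour B: recompute the window from the new timestamp, then group.
rewindowed = [(k, v, ts, fixed_window(ts, SIZE)) for k, v, ts, _win in retimestamped]
fresh = group_by_key_and_window(rewindowed)

print(sorted(stale))  # two groups, keyed by the original windows
print(sorted(fresh))  # one group, keyed by the window of the new timestamp
print(timestamps_inside_windows(stale), timestamps_inside_windows(fresh))
```

In behaviour A the grouping follows the original windows, so the elements carry timestamps that fall outside the windows they are grouped under, which is the mismatch raised above. In behaviour B the grouping changes instead, which is the case where a user-assigned or custom window could be silently replaced.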