On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles <k...@apache.org> wrote:
>
> IIRC in Java it is forbidden to output an element with a timestamp outside 
> its current window.

I don't think this is checked anywhere. (I'm not sure how you would
check it, as there's no generic window containment function. I suppose
you could check whether the timestamp is past the end of the window,
and of course skew limits how far back you can go.) Alternatively, you
could try re-windowing and then fail if the result didn't agree with
what was already there.
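
A minimal sketch of that re-windowing check, written in plain Python
rather than against Beam so that it runs standalone. It assumes fixed
windows only and ignores the skew limit; `fixed_window` and
`check_output_timestamp` are hypothetical helpers, not Beam APIs.

```python
def fixed_window(timestamp, size, offset=0):
    """Return the [start, end) fixed window that contains `timestamp`."""
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


def check_output_timestamp(current_window, new_timestamp, size):
    """Re-window the output timestamp and fail if the window changed."""
    rewindowed = fixed_window(new_timestamp, size)
    if rewindowed != current_window:
        raise ValueError(
            'timestamp %r falls in %r, not in the current window %r'
            % (new_timestamp, rewindowed, current_window))
    return new_timestamp


current = fixed_window(7, size=5)          # the window [5, 10)
check_output_timestamp(current, 9, 5)      # same window, accepted
try:
    check_output_timestamp(current, 12, 5)  # would land in [10, 15)
except ValueError as e:
    print(e)
```

For window functions that assign several windows (sliding) or merge
them (sessions), the comparison would have to be something other than
simple equality.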

> An exception is outputs from @FinishBundle, where the output timestamp is 
> required and the window is applied. TBH it seems more of an artifact of a 
> mismatch between the pre-windowing and post-windowing worlds.

Elements are always in some window, even if just the global window.

> Most of the time, mixing processing across windows is simply wrong. But there 
> are fears that calling @FinishBundle once per window would be a performance 
> problem. On the other hand, don't most correct implementations have to 
> separate processing for each window anyhow?

Processing needs to be done per window iff the result depends on the
window or if there are side effects.

> Anyhow I think the Java behavior is better, so window assignment happens 
> exactly and only at window transforms.

But then one ends up with timestamps that are unrelated to the windows, right?
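
To make that concrete, here is a plain-Python sketch (not Beam code) of
window -> change timestamp -> group by key and window. It assumes fixed
windows of size 5, original timestamps equal to the values, and a
made-up new timestamp of 1000; all names are for illustration only.

```python
from collections import defaultdict


def fixed_window(timestamp, size):
    """Return the [start, end) fixed window that contains `timestamp`."""
    start = timestamp - timestamp % size
    return (start, start + size)


# (key, value, timestamp) triples; the timestamp equals the value here.
elements = [('k', v, v) for v in range(10)]

# Step 1: assign windows from the original timestamps.
windowed = [(k, v, ts, fixed_window(ts, 5)) for k, v, ts in elements]

# Step 2: change the timestamp but keep the window (no re-windowing).
retimestamped = [(k, v, 1000, w) for k, v, _, w in windowed]

# Step 3: group by key and window.
groups = defaultdict(list)
for k, v, ts, w in retimestamped:
    groups[(k, w)].append((v, ts))

for (k, (start, end)), values in sorted(groups.items()):
    outside = [ts for _, ts in values if not start <= ts < end]
    print(k, (start, end), [v for v, _ in values],
          'timestamps outside window:', len(outside))
```

The grouping still follows the original windows, while every element
now carries a timestamp that lies outside the window it is grouped in.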

> Kenn
>
> On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka <goe...@google.com> wrote:
>>
>> The case where a plain vanilla value or a windowed value is emitted seems 
>> as expected, since the user intent is honored without any surprises.
>>
>> If I understand correctly, when the timestamp is changed, applying the 
>> window function again can have unintended behavior in the following 
>> cases:
>> * Custom windows: User code can be executed in an unintended order.
>> * The user emitted a windowed value in a previous transform: Timestamping 
>> the value in this case would overwrite the window the user assigned in the 
>> earlier step, even when the actual timestamp is the same. Semantically, 
>> emitting an element or a timestamped value with the same timestamp should 
>> have the same behavior.
>>
>> What do you think?
>>
>>
>> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> If an element is emitted with a timestamp, the window assignment is
>>> re-applied at that time. At least that's how it is in Python. You can
>>> emit the full windowed value (accepted without checking...), a
>>> timestamped value (in which case the window will be computed), or a
>>> plain old element (in which case the window and timestamp will be
>>> computed (really, propagated)).
>>>
>>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka <goe...@google.com> wrote:
>>> >
>>> > Yup, this might result in unintended behavior: when the timestamp is 
>>> > changed after the window assignment, elements in a window may no longer 
>>> > have a timestamp within that window's time range.
>>> >
>>> > Shall we start validating that there is at least one window assignment 
>>> > between timestamp assignment and GBK/triggers, to avoid the unintended 
>>> > behaviors mentioned above?
>>> >
>>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik <lc...@google.com> wrote:
>>> >>
>>> >> Window assignment happens at the point in the pipeline where the 
>>> >> WindowInto transform was applied. So in this case the window would have 
>>> >> been assigned using the original timestamp.
>>> >>
>>> >> Grouping is by key and window.
>>> >>
>>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka <goe...@google.com> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I am not sure what effect the order of element timestamp changes 
>>> >>> and window assignment has on a group by key.
>>> >>> More specifically, what would the behavior be if we apply window -> 
>>> >>> change element timestamp -> Group By Key?
>>> >>> I think we should always apply the window function after changing the 
>>> >>> timestamp of elements, though this is neither checked nor a recommended 
>>> >>> practice in Beam.
>>> >>>
>>> >>> Example pipeline would look like this:
>>> >>>
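>>> >>>     import time
>>> >>>     import apache_beam as beam
>>> >>>     from apache_beam import window
>>> >>>
>>> >>>     key = 'key'  # placeholder key, assumed for illustration
>>> >>>
>>> >>>     def applyTimestamp(value):
>>> >>>         # Re-stamp each element with the current wall-clock time.
>>> >>>         return window.TimestampedValue((key, value), int(time.time()))
>>> >>>
>>> >>>     with beam.Pipeline() as p:
>>> >>>         (p
>>> >>>          | 'Create' >> beam.Create(range(0, 10))
>>> >>>          | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5))
>>> >>>          # Timestamp is changed after windowing and before GBK
>>> >>>          | 'Apply Timestamp' >> beam.Map(applyTimestamp)
>>> >>>          | 'Group By Key' >> beam.GroupByKey()
>>> >>>          | 'Print' >> beam.Map(print))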
>>> >>>
>>> >>> Thanks,
>>> >>> Ankur
