On Thu, Jan 16, 2020 at 9:52 PM Kenneth Knowles <k...@apache.org
<mailto:k...@apache.org>> wrote:
On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw
<rober...@google.com <mailto:rober...@google.com>> wrote:
On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles
<k...@apache.org <mailto:k...@apache.org>> wrote:
>
> IIRC in Java it is forbidden to output an element with
a timestamp outside its current window.
I don't think this is checked anywhere. (Not sure how you would
check it, as there's no generic window containment function. I
suppose you could check if it's past the end of the window, and of
course skew limits how far you can go back. I suppose you could
also try re-windowing and then fail if it didn't agree with what
was already there.)
I think you are right. This is governed by how a runner
invokes utilities from runners-core (output ultimately
reaches this point without validation:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258).
> An exception is outputs from @FinishBundle, where the
output timestamp is required and the window is applied.
TBH it seems more of an artifact of a mismatch between
the pre-windowing and post-windowing worlds.
Elements are always in some window, even if just the
global window.
I mean that the existence of a window-unaware @FinishBundle
method is an artifact of the method existing prior to
windowing as a concept. The idea that a user can use a DoFn's
local variables to buffer stuff and then output
in @FinishBundle predates the existence of windowing.
> Most of the time, mixing processing across windows is
simply wrong. But there are fears that calling
@FinishBundle once per window would be a performance
problem. On the other hand, don't most correct
implementations have to separate processing for each
window anyhow?
Processing needs to be done per window iff the result depends on
the window or if there are side effects.
> Anyhow I think the Java behavior is better, so window
assignment happens exactly and only at window transforms.
But then one ends up with timestamps that are unrelated
to the windows, right?
As far as the model goes, I think windows provide an upper
bound but not a lower bound. If we take the approach that
windows are a "secondary key with a max timestamp" then the
timestamps should be related to the window in the sense that
they are <= the window's max timestamp.
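
The upper-bound reading above can be sketched in a few lines. This is a plain-Python model, not Beam code: `ToyIntervalWindow`, its `max_timestamp` property, and `within_upper_bound` are hypothetical names introduced for illustration, and timestamps are assumed to be integer seconds.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyIntervalWindow:
    """Hypothetical half-open window [start, end), in integer seconds."""
    start: int
    end: int

    @property
    def max_timestamp(self) -> int:
        # Assumption for this sketch: the last timestamp that still
        # belongs to the window is one unit before its end.
        return self.end - 1


def within_upper_bound(timestamp: int, window: ToyIntervalWindow) -> bool:
    """Check only the upper bound: timestamp <= window.max_timestamp."""
    return timestamp <= window.max_timestamp


w = ToyIntervalWindow(0, 5)
print(within_upper_bound(3, w))    # timestamp inside the window
print(within_upper_bound(-10, w))  # earlier than the window start
print(within_upper_bound(7, w))    # later than the window end
```

Under this reading only the last case is a violation; a timestamp earlier than the window start is still acceptable because the window imposes no lower bound.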
A window only makes sense when a trigger or timer is fired, and
the timestamps of the elements in the window should be within the
window's time range when a trigger is set. For consistency, I
think the element timestamp should remain within the corresponding
time range at every stage of the graph.
IIUC based on the discussion, users can easily violate this
requirement in pipeline code, which might give inconsistent
behavior across runners.
I think we should stick to a consistent behavior across languages
and runners. We have multiple options here, such as:
1. Don't promise any correlation between element timestamp
and window. The window will just behave like a secondary key for
the element.
2. Make it explicit that the last window function can be
applied out of order at any time on the elements.
3. Don't let users change the timestamp without applying a
windowing function after the changed timestamp and before a
trigger. Though, this can only be validated at runtime in Python.
4. Revalidate the window after changing the timestamp. Also
provide additional methods to explicitly change the timestamp and
window in one shot.
5. etc.
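
As an illustration of option 4, here is a minimal plain-Python sketch of revalidating a window after a timestamp change, and of changing timestamp and window together. It is not Beam code: `ToyElement`, `fixed_window`, `window_is_valid`, and `set_timestamp_and_rewindow` are hypothetical names, and fixed windows over integer-second timestamps are assumed.

```python
from dataclasses import dataclass, replace
from typing import Any, Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


@dataclass(frozen=True)
class ToyElement:
    """Hypothetical stand-in for an element carrying timestamp and window."""
    value: Any
    timestamp: int
    window: Window


def fixed_window(timestamp: int, size: int) -> Window:
    """Assign the fixed window of the given size that contains timestamp."""
    start = timestamp - timestamp % size
    return (start, start + size)


def window_is_valid(elem: ToyElement, size: int) -> bool:
    """Revalidate: does re-windowing agree with the window already attached?"""
    return elem.window == fixed_window(elem.timestamp, size)


def set_timestamp_and_rewindow(elem: ToyElement, new_ts: int, size: int) -> ToyElement:
    """Change timestamp and window together, so they cannot drift apart."""
    return replace(elem, timestamp=new_ts, window=fixed_window(new_ts, size))


original = ToyElement("a", 3, fixed_window(3, 5))
stale = replace(original, timestamp=12)  # timestamp changed, window kept
fixed = set_timestamp_and_rewindow(original, 12, 5)

print(window_is_valid(stale, 5))  # the old window no longer matches
print(fixed.window, window_is_valid(fixed, 5))
```

The check only works here because the sketch knows the windowing function; for custom or merging windows, re-running the window function may itself be the unintended behavior discussed earlier in the thread.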
Kenn
> Kenn
>
> On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka
<goe...@google.com <mailto:goe...@google.com>> wrote:
>>
>> The case where a plain vanilla value or a windowed
value is emitted seems as expected, as the user intent is
honored without any surprises.
>>
>> If I understand correctly, when the timestamp is changed,
applying the window function again can have unintended
behavior in the following cases:
>> * Custom windows: user code can be executed in an
unintended order.
>> * The user emits a windowed value in a previous transform:
timestamping the value in this case would overwrite the
window the user assigned in the earlier step, even when the
actual timestamp is the same. Semantically, emitting an
element or a timestamped value with the same timestamp should
have the same behaviour.
>>
>> What do you think?
>>
>>
>> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw
<rober...@google.com <mailto:rober...@google.com>> wrote:
>>>
>>> If an element is emitted with a timestamp, the window assignment is
>>> re-applied at that time. At least that's how it is in Python. You can
>>> emit the full windowed value (accepted without checking...), a
>>> timestamped value (in which case the window will be computed), or a
>>> plain old element (in which case the window and timestamp will be
>>> computed (really, propagated)).
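
The three output cases described above can be modeled with a toy dispatcher. This is a plain-Python sketch of the described behavior, not the Python SDK's implementation: `ToyTimestamped`, `ToyWindowed`, `fixed_window`, and `emit` are hypothetical names, and a fixed-window function over integer seconds is assumed.

```python
from dataclasses import dataclass
from typing import Any, Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


@dataclass(frozen=True)
class ToyTimestamped:
    """Hypothetical stand-in for a value emitted with an explicit timestamp."""
    value: Any
    timestamp: int


@dataclass(frozen=True)
class ToyWindowed:
    """Hypothetical stand-in for a fully windowed value."""
    value: Any
    timestamp: int
    window: Window


def fixed_window(timestamp: int, size: int) -> Window:
    start = timestamp - timestamp % size
    return (start, start + size)


def emit(output: Any, current: ToyWindowed, size: int) -> ToyWindowed:
    """Resolve an output against the input element being processed."""
    if isinstance(output, ToyWindowed):
        # Full windowed value: accepted without checking.
        return output
    if isinstance(output, ToyTimestamped):
        # Timestamped value: the window is recomputed from the new timestamp.
        return ToyWindowed(output.value, output.timestamp,
                           fixed_window(output.timestamp, size))
    # Plain element: timestamp and window are propagated from the input.
    return ToyWindowed(output, current.timestamp, current.window)


current = ToyWindowed("x", 3, (0, 5))
print(emit("plain", current, 5))
print(emit(ToyTimestamped("stamped", 12), current, 5))
print(emit(ToyWindowed("windowed", 12, (0, 5)), current, 5))
```

The last call shows the unchecked case: the emitted value keeps window (0, 5) even though its timestamp of 12 lies outside it.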
>>>
>>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka
<goe...@google.com <mailto:goe...@google.com>> wrote:
>>> >
>>> > Yup. This might result in unintended behavior: because the
timestamp is changed after the window assignment, elements
in a window may not have timestamps within the window's
time range.
>>> >
>>> > Shall we start validating that there is at least one window
assignment between timestamp assignment and GBK/triggers
to avoid the unintended behaviors mentioned above?
>>> >
>>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik
<lc...@google.com <mailto:lc...@google.com>> wrote:
>>> >>
>>> >> Window assignment happens at the point in the
pipeline the WindowInto transform was applied. So in this
case the window would have been assigned using the
original timestamp.
>>> >>
>>> >> Grouping is by key and window.
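
A minimal plain-Python sketch of the point above: once a window has been assigned from the original timestamp, grouping uses the (key, window) pair and never consults the timestamp again. `fixed_window` and `group_by_key_and_window` are hypothetical helpers, not Beam APIs, and integer-second timestamps are assumed.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


def fixed_window(timestamp: int, size: int) -> Window:
    start = timestamp - timestamp % size
    return (start, start + size)


def group_by_key_and_window(
    elements: Iterable[Tuple[str, Any, int, Window]]
) -> Dict[Tuple[str, Window], List[Any]]:
    """Group (key, value, timestamp, window) tuples by (key, window)."""
    groups: Dict[Tuple[str, Window], List[Any]] = defaultdict(list)
    for key, value, _timestamp, window in elements:
        # The timestamp is deliberately ignored: only key and window matter.
        groups[(key, window)].append(value)
    return dict(groups)


# Windows assigned from the original timestamps 0..9 with 5-second windows.
windowed = [("k", v, v, fixed_window(v, 5)) for v in range(10)]

# Timestamps overwritten afterwards; the attached windows are left untouched.
restamped = [(k, v, 1_000_000, w) for (k, v, _ts, w) in windowed]

print(group_by_key_and_window(windowed))
print(group_by_key_and_window(restamped) == group_by_key_and_window(windowed))
```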
>>> >>
>>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka
<goe...@google.com <mailto:goe...@google.com>> wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I am not sure what effect the order of element
timestamp change and window association has on a
group by key.
>>> >>> More specifically, what would be the behavior if
we apply window -> change element timestamp -> Group By key?
>>> >>> I think we should always apply the window function
after changing the timestamp of elements, though this is
neither checked nor a recommended practice in Beam.
>>> >>>
>>> >>> Example pipeline would look like this:
>>> >>>
>>> >>> import time
>>> >>> import apache_beam as beam
>>> >>> from apache_beam import window
>>> >>>
>>> >>> def applyTimestamp(value):
>>> >>>     # 'key' is a placeholder; the original snippet left the key undefined
>>> >>>     return window.TimestampedValue(('key', value), int(time.time()))
>>> >>>
>>> >>> with beam.Pipeline() as p:
>>> >>>     (p
>>> >>>      | 'Create' >> beam.Create(range(0, 10))
>>> >>>      | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5))
>>> >>>      # Timestamp is changed after windowing and before GBK
>>> >>>      | 'Apply Timestamp' >> beam.Map(applyTimestamp)
>>> >>>      | 'Group By Key' >> beam.GroupByKey()
>>> >>>      | 'Print' >> beam.Map(print))
>>> >>>
>>> >>> Thanks,
>>> >>> Ankur