Re: Ordering of element timestamp change and window function

2020-01-23 Thread Jan Lukavský

Hi Reuven,

> Your first statement is partially true, but the second statement 
doesn't follow from that. Stateful DoFn is in some sense a more general 
transform, yes. However that doesn't mean that semantics should be 
defined in terms of stateful DoFn. There are other ways of implementing 
GroupByKey, and it's far from clear that stateful DoFn is always the 
best way.


These are two independent things - how semantics (correctness) is 
defined and how a transform is implemented. I'm not suggesting we implement 
GBK on top of stateful DoFn (that might be the default, but runners would 
override it to provide a more efficient implementation). The same 
relation exists between Combine and GBK -> the semantics of Combine.perKey() are 
equal to GBK + Combine.groupedValues() (a ParDo), but that is not how you want to 
implement and run it, because more efficient implementations exist. 
The same holds for the relation stateful DoFn -> GBK.
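To illustrate that relation (a sketch of my own, not from the thread; the element values and the summing combiner are arbitrary), the two branches below are semantically equivalent in the Python SDK, although a runner would normally execute the combined form more efficiently:

  import apache_beam as beam

  with beam.Pipeline() as p:
      kvs = p | beam.Create([('a', 1), ('a', 2), ('b', 3)])

      # Combine per key ...
      combined = kvs | 'CombinePerKey' >> beam.CombinePerKey(sum)

      # ... has the same semantics as GBK followed by combining the grouped
      # values (Combine.groupedValues in Java, CombineValues here).
      grouped_then_combined = (
          kvs
          | 'GBK' >> beam.GroupByKey()
          | 'CombineValues' >> beam.CombineValues(sum))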


> For example, batch runners never implement GroupByKey on top of state.

This is just another kind of optimization, following from the fact 
that batch sources can be re-read, so state can be held locally and 
recomputed on failure. It is an optimization enabled by the specific 
conditions of the batch case and doesn't affect semantics.


> Even in streaming, the current released Beam does not have sufficient 
functionality in Stateful DoFn to properly implement GroupByKey. You 
would need watermark holds for instance (now added to Beam, but not yet 
released). To implement things somewhat efficiently you would also need 
dynamic states, and Beam currently supports only static state tags 
(hopefully dynamic is coming soon)


This is true, but it only shows that these features are currently missing 
from stateful DoFn. Another example is the missing support for merging windows.


> This is a valid point, but also problematic. The watermark cannot 
work when element times move backwards (partially because the watermark 
is defined to be monotonic). Usually such pipelines end up being 
restricted to using non-watermark techniques for aggregation - i.e. 
processing-time triggers or state+timers.


Two questions here:
 a) why not recompute the watermark when we reassign the event time?
 b) why should state+timers be a non-watermark approach? Event-time 
timers AFAIK work on the watermark.
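To make (b) concrete, here is a minimal sketch of my own (not from the thread) of a stateful DoFn in the Python SDK whose event-time timer is driven by the watermark; the buffer-and-sum logic is arbitrary:

  import apache_beam as beam
  from apache_beam.coders import VarIntCoder
  from apache_beam.transforms.timeutil import TimeDomain
  from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

  class SumAtWatermark(beam.DoFn):
      # Buffers values per key and emits their sum once the watermark
      # has passed the recorded element timestamp.
      BUFFER = BagStateSpec('buffer', VarIntCoder())
      FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)  # event-time timer

      def process(self,
                  element,
                  ts=beam.DoFn.TimestampParam,
                  buffer=beam.DoFn.StateParam(BUFFER),
                  flush=beam.DoFn.TimerParam(FLUSH)):
          _, value = element
          buffer.add(value)
          # The timer fires when the input watermark reaches ts.
          flush.set(ts)

      @on_timer(FLUSH)
      def on_flush(self, buffer=beam.DoFn.StateParam(BUFFER)):
          yield sum(buffer.read())
          buffer.clear()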


Jan

On 1/23/20 10:00 AM, Reuven Lax wrote:



On Wed, Jan 22, 2020 at 11:37 PM Jan Lukavský wrote:


Hi Kenn,

I do not agree with the last part. We are talking about definition
of semantics. If GBK can be implemented on top of stateful dofn,
then stateful dofn is the more generic transform. Therefore,
semantics should be defined on this transform, and _derived_ (or
transferred) to the less generic ones.


Your first statement is partially true, but the second statement 
doesn't follow from that. Stateful DoFn is in some sense a more 
general transform, yes. However that doesn't mean that semantics 
should be defined in terms of stateful DoFn. There are other ways of 
implementing GroupByKey, and it's far from clear that stateful DoFn is 
always the best way. For example, batch runners never implement 
GroupByKey on top of state. Even in streaming, the current released 
Beam does not have sufficient functionality in Stateful DoFn to 
properly implement GroupByKey. You would need watermark holds for 
instance (now added to Beam, but not yet released). To implement 
things somewhat efficiently you would also need dynamic states, and 
Beam currently supports only static state tags (hopefully dynamic is 
coming soon)


If you execute GBK as a stateful dofn or not (probably not) is
just a runtime optimization (these optimizations are possible due
to discrete - and predictable - movements of time defined by
triggers). But semantics should adhere to the generic definition
and not be affected by runtime optimizations.

Last remark, yes, if we disallow moving element's timestamp to the
past, then we don't need window.minTimestamp, because the
minTimestamp is the defined implicitly by window open time. It
opens a question if a droppable element should or should not be
dropped not only when arriving too late after window close, but if
arriving too late after window open.

But disallowing timestamp to move back in time seems impractical,
because I can imagine source assigning elements ingestion time
timestamps (e.g. kafka by default), which are later remapped to
event time in user code. That will necessarily mean moving time
backwards.

This is a valid point, but also problematic. The watermark cannot work 
when element times move backwards (partially because the watermark is 
defined to be monotonic). Usually such pipelines end up being 
restricted to using non-watermark techniques for aggregation - i.e. 
processing-time triggers or state+timers.


Jan

On 1/22/20 11:53 PM, Kenneth Knowles wrote:

Had a lunch chat about this issue.

Re: Ordering of element timestamp change and window function

2020-01-23 Thread Reuven Lax
On Wed, Jan 22, 2020 at 11:37 PM Jan Lukavský  wrote:

> Hi Kenn,
>
> I do not agree with the last part. We are talking about definition of
> semantics. If GBK can be implemented on top of stateful dofn, then stateful
> dofn is the more generic transform. Therefore, semantics should be defined
> on this transform, and _derived_ (or transferred) to the less generic ones.
>

Your first statement is partially true, but the second statement doesn't
follow from that. Stateful DoFn is in some sense a more general transform,
yes. However that doesn't mean that semantics should be defined in terms of
stateful DoFn. There are other ways of implementing GroupByKey, and it's
far from clear that stateful DoFn is always the best way. For example,
batch runners never implement GroupByKey on top of state. Even in
streaming, the current released Beam does not have sufficient functionality
in Stateful DoFn to properly implement GroupByKey. You would need watermark
holds for instance (now added to Beam, but not yet released). To implement
things somewhat efficiently you would also need dynamic states, and Beam
currently supports only static state tags (hopefully dynamic is coming soon)

> If you execute GBK as a stateful dofn or not (probably not) is just a
> runtime optimization (these optimizations are possible due to discrete -
> and predictable - movements of time defined by triggers). But semantics
> should adhere to the generic definition and not be affected by runtime
> optimizations.
>
> Last remark, yes, if we disallow moving element's timestamp to the past,
> then we don't need window.minTimestamp, because the minTimestamp is the
> defined implicitly by window open time. It opens a question if a droppable
> element should or should not be dropped not only when arriving too late
> after window close, but if arriving too late after window open.
>
> But disallowing timestamp to move back in time seems impractical, because
> I can imagine source assigning elements ingestion time timestamps (e.g.
> kafka by default), which are later remapped to event time in user code.
> That will necessarily mean moving time backwards.
>
This is a valid point, but also problematic. The watermark cannot work when
element times move backwards (partially because the watermark is defined to
be monotonic). Usually such pipelines end up being restricted to using
non-watermark techniques for aggregation - i.e. processing-time triggers or
state+timers.
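As a hypothetical sketch of the processing-time alternative in the Python SDK (my own illustration; `events` stands for an assumed PCollection of (key, value) pairs whose event timestamps may move backwards):

  import apache_beam as beam
  from apache_beam.transforms import trigger, window

  aggregated = (
      events
      | 'TriggerOnProcTime' >> beam.WindowInto(
          window.GlobalWindows(),
          # Emit a new aggregate roughly every 60 seconds of processing
          # time, independent of the (non-monotonic) event timestamps.
          trigger=trigger.Repeatedly(trigger.AfterProcessingTime(60)),
          accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | 'SumPerKey' >> beam.CombinePerKey(sum))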

Jan
> On 1/22/20 11:53 PM, Kenneth Knowles wrote:
>
> Had a lunch chat about this issue.
>
> Moving elements back in time can make them late or droppable. You just
> can't really do it safely.
>
> Moving elements into the future is fine up to the end of the window. It is
> not safe to move further. The watermark for a PCollection is based on the
> element timestamps. If an element's timestamp is in the future, the
> watermark can advance to that point in the future. This may cause the
> watermark to expire the window. So this can also make data late or
> droppable.
>
> It is actually not true that GBK is based on stateful DoFn. That is one
> way to implement it, but not the only way nor always the best way. They are
> qualitatively different.
>
> Kenn
>
> On Wed, Jan 22, 2020 at 1:52 AM Jan Lukavský  wrote:
>
>> I sense this discussion might be (remotely) related to [1] (and
>> especially [2]). The common ground here is that we need a sound definition
>> of window. I think people might be currently having different definitions,
>> which leads to this sort of misunderstandings. The definition should be
>> created in terms of stateful dofn (not GBK, which might probably be the
>> case today), because that is the most low level transform, all the others
>> are being built upon it. Looking at this with this optics, it seems that
>> window actually scopes state of stateful dofn. The scope can be:
>>
>>  (a) one sided (having only defined max timestamp)
>>
>>  (b) both sided (having minimum and maximum)
>>
>> We have currently approach (a), which results in ability to move
>> timestamp *arbitrarily far to the past*, which moving timestamp to future
>> is limited by window's maxTimestamp. If we extend this to (b), then
>> windowFn starts to create something like universe (actually multiverse,
>> because it can return multiple windows). It should be invalid for element
>> to escape its universe, that would be counter intuitive. If we disallow
>> emission of data elements that are _late even when created_ (i.e. are
>> emitted with timestamp less than output watermark) and we disallow setting
>> timers with timestamp higher than window.maxTimestamp (which we currently
>> do), then we have disallowed any element to escape its window (universe,
>> range of validity). It would also require the output watermark of stateful
>> dofn to be keyed and set to at least window.minTimestamp when window is
>> opened. This would remove a sort of asymmetry (why to know maxTimestamp and
>> not minTimestamp?). Also note 

Re: Ordering of element timestamp change and window function

2020-01-22 Thread Jan Lukavský

Hi Kenn,

I do not agree with the last part. We are talking about the definition of 
semantics. If GBK can be implemented on top of stateful DoFn, then 
stateful DoFn is the more generic transform. Therefore, semantics should 
be defined on this transform and _derived_ (or transferred) to the less 
generic ones. Whether you execute GBK as a stateful DoFn or not (probably 
not) is just a runtime optimization (these optimizations are possible 
due to the discrete - and predictable - movements of time defined by 
triggers). But semantics should adhere to the generic definition and not 
be affected by runtime optimizations.


Last remark: yes, if we disallow moving an element's timestamp to the past, 
then we don't need window.minTimestamp, because the minTimestamp is then 
defined implicitly by the window open time. It opens the question whether a 
droppable element should be dropped not only when arriving too late after 
the window closes, but also when arriving too late after the window opens.


But disallowing the timestamp to move back in time seems impractical, 
because I can imagine a source assigning elements ingestion-time 
timestamps (e.g. Kafka by default), which are later remapped to event 
time in user code. That necessarily means moving time backwards.


Jan

On 1/22/20 11:53 PM, Kenneth Knowles wrote:

Had a lunch chat about this issue.

Moving elements back in time can make them late or droppable. You just 
can't really do it safely.


Moving elements into the future is fine up to the end of the window. 
It is not safe to move further. The watermark for a PCollection is 
based on the element timestamps. If an element's timestamp is in the 
future, the watermark can advance to that point in the future. This 
may cause the watermark to expire the window. So this can also make 
data late or droppable.


It is actually not true that GBK is based on stateful DoFn. That is 
one way to implement it, but not the only way nor always the best way. 
They are qualitatively different.


Kenn

On Wed, Jan 22, 2020 at 1:52 AM Jan Lukavský wrote:


I sense this discussion might be (remotely) related to [1] (and
especially [2]). The common ground here is that we need a sound
definition of window. I think people might be currently having
different definitions, which leads to this sort of
misunderstandings. The definition should be created in terms of
stateful dofn (not GBK, which might probably be the case today),
because that is the most low level transform, all the others are
being built upon it. Looking at this with this optics, it seems
that window actually scopes state of stateful dofn. The scope can be:

 (a) one sided (having only defined max timestamp)

 (b) both sided (having minimum and maximum)

We have currently approach (a), which results in ability to move
timestamp *arbitrarily far to the past*, which moving timestamp to
future is limited by window's maxTimestamp. If we extend this to
(b), then windowFn starts to create something like universe
(actually multiverse, because it can return multiple windows). It
should be invalid for element to escape its universe, that would
be counter intuitive. If we disallow emission of data elements
that are _late even when created_ (i.e. are emitted with timestamp
less than output watermark) and we disallow setting timers with
timestamp higher than window.maxTimestamp (which we currently do),
then we have disallowed any element to escape its window
(universe, range of validity). It would also require the output
watermark of stateful dofn to be keyed and set to at least
window.minTimestamp when window is opened. This would remove a
sort of asymmetry (why to know maxTimestamp and not
minTimestamp?). Also note that (a) is equal to (b) if and only if
we disallow shifting time to past.

Jan

[1]

https://lists.apache.org/thread.html/c37dfb6c545fba7d794a13c507dccebb654bbd8b317dab748a6775dc%40%3Cdev.beam.apache.org%3E

[2]

https://lists.apache.org/thread.html/r7f38860557d6571869e8e0989275f6ed610cf8c99b2f56fc6418a1d1%40%3Cdev.beam.apache.org%3E

On 1/21/20 10:08 PM, Ankur Goenka wrote:



On Thu, Jan 16, 2020 at 9:52 PM Kenneth Knowles wrote:



On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw wrote:

On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles wrote:
>
> IIRC in Java it is forbidden to output an element with
a timestamp outside its current window.

I don't think this is checked anywhere. (Not sure how you
would check
it, as there's not generic window containment function--I
suppose you
could check if it's past the end of the window (and of
course skew
limits how far you can go back). I suppose you could try re-windowing and then fail if it didn't agree with what was already there.

Re: Ordering of element timestamp change and window function

2020-01-22 Thread Kenneth Knowles
Had a lunch chat about this issue.

Moving elements back in time can make them late or droppable. You just
can't really do it safely.

Moving elements into the future is fine up to the end of the window. It is
not safe to move further. The watermark for a PCollection is based on the
element timestamps. If an element's timestamp is in the future, the
watermark can advance to that point in the future. This may cause the
watermark to expire the window. So this can also make data late or
droppable.
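A sketch of my own (not from the thread) of the safe direction in the Python SDK: shifting an element's timestamp forward, but never past its current window's maximum timestamp:

  import apache_beam as beam
  from apache_beam.transforms import window

  class ShiftForward(beam.DoFn):
      # Shifts each element's timestamp forward by `delay` seconds,
      # capped at the end of the element's current window.
      def __init__(self, delay):
          self.delay = delay

      def process(self,
                  element,
                  ts=beam.DoFn.TimestampParam,
                  win=beam.DoFn.WindowParam):
          new_ts = min(ts + self.delay, win.max_timestamp())
          yield window.TimestampedValue(element, new_ts)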

It is actually not true that GBK is based on stateful DoFn. That is one way
to implement it, but not the only way nor always the best way. They are
qualitatively different.

Kenn

On Wed, Jan 22, 2020 at 1:52 AM Jan Lukavský  wrote:

> I sense this discussion might be (remotely) related to [1] (and especially
> [2]). The common ground here is that we need a sound definition of window.
> I think people might be currently having different definitions, which leads
> to this sort of misunderstandings. The definition should be created in
> terms of stateful dofn (not GBK, which might probably be the case today),
> because that is the most low level transform, all the others are being
> built upon it. Looking at this with this optics, it seems that window
> actually scopes state of stateful dofn. The scope can be:
>
>  (a) one sided (having only defined max timestamp)
>
>  (b) both sided (having minimum and maximum)
>
> We have currently approach (a), which results in ability to move timestamp
> *arbitrarily far to the past*, which moving timestamp to future is limited
> by window's maxTimestamp. If we extend this to (b), then windowFn starts to
> create something like universe (actually multiverse, because it can return
> multiple windows). It should be invalid for element to escape its universe,
> that would be counter intuitive. If we disallow emission of data elements
> that are _late even when created_ (i.e. are emitted with timestamp less
> than output watermark) and we disallow setting timers with timestamp higher
> than window.maxTimestamp (which we currently do), then we have disallowed
> any element to escape its window (universe, range of validity). It would
> also require the output watermark of stateful dofn to be keyed and set to
> at least window.minTimestamp when window is opened. This would remove a
> sort of asymmetry (why to know maxTimestamp and not minTimestamp?). Also
> note that (a) is equal to (b) if and only if we disallow shifting time to
> past.
> Jan
>
> [1]
> https://lists.apache.org/thread.html/c37dfb6c545fba7d794a13c507dccebb654bbd8b317dab748a6775dc%40%3Cdev.beam.apache.org%3E
>
> [2]
> https://lists.apache.org/thread.html/r7f38860557d6571869e8e0989275f6ed610cf8c99b2f56fc6418a1d1%40%3Cdev.beam.apache.org%3E
> On 1/21/20 10:08 PM, Ankur Goenka wrote:
>
>
>
> On Thu, Jan 16, 2020 at 9:52 PM Kenneth Knowles  wrote:
>
>>
>>
>> On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw 
>> wrote:
>>
>>> On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles 
>>> wrote:
>>> >
>>> > IIRC in Java it is forbidden to output an element with a timestamp
>>> outside its current window.
>>>
>>> I don't think this is checked anywhere. (Not sure how you would check
>>> it, as there's not generic window containment function--I suppose you
>>> could check if it's past the end of the window (and of course skew
>>> limits how far you can go back). I suppose you could try re-windowing
>>> and then fail if it didn't agree with what was already there.
>>>
>>
>> I think you are right. This is governed by how a runner invoked utilities
>> from runners-core (output ultimately reaches this point without validation:
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258
>> )
>>
>>
>>> > An exception is outputs from @FinishBundle, where the output timestamp
>>> is required and the window is applied. TBH it seems more of an artifact of
>>> a mismatch between the pre-windowing and post-windowing worlds.
>>>
>>> Elements are always in some window, even if just the global window.
>>>
>>
>> I mean that the existence of a window-unaware @FinishBundle method is an
>> artifact of the method existing prior to windowing as a concept. The idea
>> that a user can use a DoFn's local variables to buffer stuff and then
>> output in @FinishBundle predates the existence of windowing.
>>
>> > Most of the time, mixing processing across windows is simply wrong. But
>>> there are fears that calling @FinishBundle once per window would be a
>>> performance problem. On the other hand, don't most correct implementations
>>> have to separate processing for each window anyhow?
>>>
>>> Processing needs to be done per window iff the result depends on the
>>> window or if there are side effects.
>>>
>>> > Anyhow I think the Java behavior is better, so window assignment
>>> happens exactly and only at window transforms.
>>>
>>> But then one ends up with timestamps that are unrelated to the windows, right?

Re: Ordering of element timestamp change and window function

2020-01-22 Thread Jan Lukavský
I sense this discussion might be (remotely) related to [1] (and 
especially [2]). The common ground here is that we need a sound 
definition of a window. I think people might currently have different 
definitions, which leads to this sort of misunderstanding. The 
definition should be created in terms of stateful DoFn (not GBK, which 
is probably the case today), because that is the most low-level 
transform; all the others are built upon it. Looking at it through 
this lens, it seems that a window actually scopes the state of a stateful 
DoFn. The scope can be:


 (a) one sided (having only defined max timestamp)

 (b) both sided (having minimum and maximum)

We currently have approach (a), which allows moving a timestamp 
*arbitrarily far into the past*, while moving a timestamp into the 
future is limited by the window's maxTimestamp. If we extend this to (b), 
then the windowFn starts to create something like a universe (actually a 
multiverse, because it can return multiple windows). It should be 
invalid for an element to escape its universe; that would be 
counterintuitive. If we disallow emission of data elements that are _late even 
when created_ (i.e. are emitted with a timestamp less than the output 
watermark) and we disallow setting timers with a timestamp higher than 
window.maxTimestamp (which we currently do), then we have disallowed any 
element from escaping its window (universe, range of validity). It would 
also require the output watermark of a stateful DoFn to be keyed and set 
to at least window.minTimestamp when the window is opened. This would remove 
a sort of asymmetry (why know maxTimestamp and not minTimestamp?). 
Also note that (a) is equal to (b) if and only if we disallow shifting 
time into the past.


Jan

[1] 
https://lists.apache.org/thread.html/c37dfb6c545fba7d794a13c507dccebb654bbd8b317dab748a6775dc%40%3Cdev.beam.apache.org%3E


[2] 
https://lists.apache.org/thread.html/r7f38860557d6571869e8e0989275f6ed610cf8c99b2f56fc6418a1d1%40%3Cdev.beam.apache.org%3E


On 1/21/20 10:08 PM, Ankur Goenka wrote:



On Thu, Jan 16, 2020 at 9:52 PM Kenneth Knowles wrote:




On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw wrote:

On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles wrote:
>
> IIRC in Java it is forbidden to output an element with a
timestamp outside its current window.

I don't think this is checked anywhere. (Not sure how you
would check
it, as there's not generic window containment function--I
suppose you
could check if it's past the end of the window (and of course skew
limits how far you can go back). I suppose you could try
re-windowing
and then fail if it didn't agree with what was already there.


I think you are right. This is governed by how a runner invoked
utilities from runners-core (output ultimately reaches this point
without validation:

https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258)

> An exception is outputs from @FinishBundle, where the output
timestamp is required and the window is applied. TBH it seems
more of an artifact of a mismatch between the pre-windowing
and post-windowing worlds.

Elements are always in some window, even if just the global
window.


I mean that the existence of a window-unaware @FinishBundle method
is an artifact of the method existing prior to windowing as a
concept. The idea that a user can use a DoFn's local variables to
buffer stuff and then output in @FinishBundle predates the
existence of windowing.

> Most of the time, mixing processing across windows is simply
wrong. But there are fears that calling @FinishBundle once per
window would be a performance problem. On the other hand,
don't most correct implementations have to separate processing
for each window anyhow?

Processing needs to be done per window iff the result depends
on the
window or if there are side effects.

> Anyhow I think the Java behavior is better, so window
assignment happens exactly and only at window transforms.

But then one ends up with timestamps that are unrelated to the
windows, right?


As far as the model goes, I think windows provide an upper bound
but not a lower bound. If we take the approach that windows are a
"secondary key with a max timestamp" then the timestamps should be
related to the window in the sense that they are <= the window's
max timestamp.

A window only makes sense when a trigger or timer is fired. And the 
timestamp of the elements in the window should be within the window's 
time range when a trigger is set. For consistency, I think element 
timestamp should remain within the corresponding time range at every stage of the graph.

Re: Ordering of element timestamp change and window function

2020-01-21 Thread Ankur Goenka
On Thu, Jan 16, 2020 at 9:52 PM Kenneth Knowles  wrote:

>
>
> On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw 
> wrote:
>
>> On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles  wrote:
>> >
>> > IIRC in Java it is forbidden to output an element with a timestamp
>> outside its current window.
>>
>> I don't think this is checked anywhere. (Not sure how you would check
>> it, as there's not generic window containment function--I suppose you
>> could check if it's past the end of the window (and of course skew
>> limits how far you can go back). I suppose you could try re-windowing
>> and then fail if it didn't agree with what was already there.
>>
>
> I think you are right. This is governed by how a runner invoked utilities
> from runners-core (output ultimately reaches this point without validation:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258
> )
>
>
>> > An exception is outputs from @FinishBundle, where the output timestamp
>> is required and the window is applied. TBH it seems more of an artifact of
>> a mismatch between the pre-windowing and post-windowing worlds.
>>
>> Elements are always in some window, even if just the global window.
>>
>
> I mean that the existence of a window-unaware @FinishBundle method is an
> artifact of the method existing prior to windowing as a concept. The idea
> that a user can use a DoFn's local variables to buffer stuff and then
> output in @FinishBundle predates the existence of windowing.
>
> > Most of the time, mixing processing across windows is simply wrong. But
>> there are fears that calling @FinishBundle once per window would be a
>> performance problem. On the other hand, don't most correct implementations
>> have to separate processing for each window anyhow?
>>
>> Processing needs to be done per window iff the result depends on the
>> window or if there are side effects.
>>
>> > Anyhow I think the Java behavior is better, so window assignment
>> happens exactly and only at window transforms.
>>
>> But then one ends up with timestamps that are unrelated to the windows,
>> right?
>>
>
> As far as the model goes, I think windows provide an upper bound but not a
> lower bound. If we take the approach that windows are a "secondary key with
> a max timestamp" then the timestamps should be related to the window in the
> sense that they are <= the window's max timestamp.
>
A window only makes sense when a trigger or timer fires. And the
timestamps of the elements in the window should be within the window's time
range when a trigger is set. For consistency, I think an element's timestamp
should remain within the corresponding time range at every stage of the
graph.
IIUC based on the discussion, users can easily violate this requirement in
pipeline code, which might give inconsistent behavior across runners.

I think we should stick to consistent behavior across languages and
runners. We have multiple options here, for example:
1. Don't promise any correlation between element timestamp and
window. The window just behaves like a secondary key for the element.
2. Make it explicit that the last window function can be re-applied to the
elements out of order, at any time.
3. Don't let users change the timestamp without applying a windowing
function after the changed timestamp and before a trigger (see the sketch
after this list). Though, this can only be validated at runtime in Python.
4. Revalidate the window after changing the timestamp. Also provide
additional methods to explicitly change the timestamp and window in one shot.
5. etc.
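A sketch of option 3 (my own illustration; the grouping key `value % 2` is an arbitrary choice, since the example earlier in the thread left `key` undefined): apply the window function after the timestamp change and before the GBK, so grouping uses windows derived from the new timestamps:

  import time

  import apache_beam as beam
  from apache_beam.transforms import window

  def apply_timestamp(value):
      # Assign the current wall time as the element's event timestamp.
      return window.TimestampedValue((value % 2, value), time.time())

  with beam.Pipeline() as p:
      _ = (p
           | 'Create' >> beam.Create(range(10))
           | 'Apply Timestamp' >> beam.Map(apply_timestamp)
           # Window *after* the timestamp change, so the windows match
           # the new timestamps.
           | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5))
           | 'Group By Key' >> beam.GroupByKey()
           | 'Print' >> beam.Map(print))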


> Kenn
>
>
>
>> > Kenn
>> >
>> > On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka  wrote:
>> >>
>> >> The case where a plan vanilla value or a windowed value is emitted
>> seems as expected as the user intent is honored without any surprises.
>> >>
>> >> If I understand correctly in the case when timestamp is changed then
>> applying window function again can have unintended behavior in following
>> cases
>> >> * Custom windows: User code can be executed in unintended order.
>> >> * User emit a windowed value in a previous transform: Timestamping the
>> value in this case would overwrite the user assigned window in earlier step
>> even when the actual timestamp is the same. Semantically, emitting an
>> element or a timestamped value with the same timestamp should have the same
>> behaviour.
>> >>
>> >> What do you think?
>> >>
>> >>
>> >> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw 
>> wrote:
>> >>>
>> >>> If an element is emitted with a timestamp, the window assignment is
>> >>> re-applied at that time. At least that's how it is in Python. You can
>> >>> emit the full windowed value (accepted without checking...), a
>> >>> timestamped value (in which case the window will be computed), or a
>> >>> plain old element (in which case the window and timestamp will be
>> >>> computed (really, propagated)).
>> >>>
>> >>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka 
>> wrote:
>> >>> >
>> >>> >

Re: Ordering of element timestamp change and window function

2020-01-16 Thread Kenneth Knowles
On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw 
wrote:

> On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles  wrote:
> >
> > IIRC in Java it is forbidden to output an element with a timestamp
> outside its current window.
>
> I don't think this is checked anywhere. (Not sure how you would check
> it, as there's not generic window containment function--I suppose you
> could check if it's past the end of the window (and of course skew
> limits how far you can go back). I suppose you could try re-windowing
> and then fail if it didn't agree with what was already there.
>

I think you are right. This is governed by how a runner invoked utilities
from runners-core (output ultimately reaches this point without validation:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258
)


> > An exception is outputs from @FinishBundle, where the output timestamp
> is required and the window is applied. TBH it seems more of an artifact of
> a mismatch between the pre-windowing and post-windowing worlds.
>
> Elements are always in some window, even if just the global window.
>

I mean that the existence of a window-unaware @FinishBundle method is an
artifact of the method existing prior to windowing as a concept. The idea
that a user can use a DoFn's local variables to buffer stuff and then
output in @FinishBundle predates the existence of windowing.

> Most of the time, mixing processing across windows is simply wrong. But
> there are fears that calling @FinishBundle once per window would be a
> performance problem. On the other hand, don't most correct implementations
> have to separate processing for each window anyhow?
>
> Processing needs to be done per window iff the result depends on the
> window or if there are side effects.
>
> > Anyhow I think the Java behavior is better, so window assignment happens
> exactly and only at window transforms.
>
> But then one ends up with timestamps that are unrelated to the windows,
> right?
>

As far as the model goes, I think windows provide an upper bound but not a
lower bound. If we take the approach that windows are a "secondary key with
a max timestamp" then the timestamps should be related to the window in the
sense that they are <= the window's max timestamp.

Kenn



> > Kenn
> >
> > On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka  wrote:
> >>
> >> The case where a plan vanilla value or a windowed value is emitted
> seems as expected as the user intent is honored without any surprises.
> >>
> >> If I understand correctly in the case when timestamp is changed then
> applying window function again can have unintended behavior in following
> cases
> >> * Custom windows: User code can be executed in unintended order.
> >> * User emit a windowed value in a previous transform: Timestamping the
> value in this case would overwrite the user assigned window in earlier step
> even when the actual timestamp is the same. Semantically, emitting an
> element or a timestamped value with the same timestamp should have the same
> behaviour.
> >>
> >> What do you think?
> >>
> >>
> >> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw 
> wrote:
> >>>
> >>> If an element is emitted with a timestamp, the window assignment is
> >>> re-applied at that time. At least that's how it is in Python. You can
> >>> emit the full windowed value (accepted without checking...), a
> >>> timestamped value (in which case the window will be computed), or a
> >>> plain old element (in which case the window and timestamp will be
> >>> computed (really, propagated)).
> >>>
> >>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka 
> wrote:
> >>> >
> >>> > Yup, This might result in unintended behavior as timestamp is
> changed after the window assignment as elements in windows do not have
> timestamp in the window time range.
> >>> >
> >>> > Shall we start validating atleast one window assignment between
> timestamp assignment and GBK/triggers to avoid unintended behaviors
> mentioned above?
> >>> >
> >>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik  wrote:
> >>> >>
> >>> >> Window assignment happens at the point in the pipeline the
> WindowInto transform was applied. So in this case the window would have
> been assigned using the original timestamp.
> >>> >>
> >>> >> Grouping is by key and window.
> >>> >>
> >>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka 
> wrote:
> >>> >>>
> >>> >>> Hi,
> >>> >>>
> >>> >>> I am not sure about the effect of the order of element timestamp
> change and window association has on a group by key.
> >>> >>> More specifically, what would be the behavior if we apply window
> -> change element timestamp -> Group By key.
> >>> >>> I think we should always apply window function after changing the
> timestamp of elements. Though this is neither checked nor a recommended
> practice in Beam.
> >>> >>>
> >>> >>> Example pipeline would look like this:
> >>> >>>
> >>> >>>   def applyTimestamp(value):
> >>> >>> return window.TimestampedValue((key, value), int(time.time())

Re: Ordering of element timestamp change and window function

2020-01-16 Thread Robert Bradshaw
On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles  wrote:
>
> IIRC in Java it is forbidden to output an element with a timestamp outside 
> its current window.

I don't think this is checked anywhere. (Not sure how you would check
it, as there's not generic window containment function--I suppose you
could check if it's past the end of the window (and of course skew
limits how far you can go back). I suppose you could try re-windowing
and then fail if it didn't agree with what was already there.

> An exception is outputs from @FinishBundle, where the output timestamp is 
> required and the window is applied. TBH it seems more of an artifact of a 
> mismatch between the pre-windowing and post-windowing worlds.

Elements are always in some window, even if just the global window.

> Most of the time, mixing processing across windows is simply wrong. But there 
> are fears that calling @FinishBundle once per window would be a performance 
> problem. On the other hand, don't most correct implementations have to 
> separate processing for each window anyhow?

Processing needs to be done per window iff the result depends on the
window or if there are side effects.

> Anyhow I think the Java behavior is better, so window assignment happens 
> exactly and only at window transforms.

But then one ends up with timestamps that are unrelated to the windows, right?

> Kenn
>
> On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka  wrote:
>>
>> The case where a plan vanilla value or a windowed value is emitted seems as 
>> expected as the user intent is honored without any surprises.
>>
>> If I understand correctly in the case when timestamp is changed then 
>> applying window function again can have unintended behavior in following 
>> cases
>> * Custom windows: User code can be executed in unintended order.
>> * User emit a windowed value in a previous transform: Timestamping the value 
>> in this case would overwrite the user assigned window in earlier step even 
>> when the actual timestamp is the same. Semantically, emitting an element or 
>> a timestamped value with the same timestamp should have the same behaviour.
>>
>> What do you think?
>>
>>
>> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw  wrote:
>>>
>>> If an element is emitted with a timestamp, the window assignment is
>>> re-applied at that time. At least that's how it is in Python. You can
>>> emit the full windowed value (accepted without checking...), a
>>> timestamped value (in which case the window will be computed), or a
>>> plain old element (in which case the window and timestamp will be
>>> computed (really, propagated)).
>>>
>>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka  wrote:
>>> >
>>> > Yup, This might result in unintended behavior as timestamp is changed 
>>> > after the window assignment as elements in windows do not have timestamp 
>>> > in the window time range.
>>> >
>>> > Shall we start validating atleast one window assignment between timestamp 
>>> > assignment and GBK/triggers to avoid unintended behaviors mentioned above?
>>> >
>>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik  wrote:
>>> >>
>>> >> Window assignment happens at the point in the pipeline the WindowInto 
>>> >> transform was applied. So in this case the window would have been 
>>> >> assigned using the original timestamp.
>>> >>
>>> >> Grouping is by key and window.
>>> >>
>>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka  wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> I am not sure about the effect of the order of element timestamp change 
>>> >>> and window association has on a group by key.
>>> >>> More specifically, what would be the behavior if we apply window -> 
>>> >>> change element timestamp -> Group By key.
>>> >>> I think we should always apply window function after changing the 
>>> >>> timestamp of elements. Though this is neither checked nor a recommended 
>>> >>> practice in Beam.
>>> >>>
>>> >>> Example pipeline would look like this:
>>> >>>
>>> >>>   def applyTimestamp(value):
>>> >>> return window.TimestampedValue((key, value), 
>>> >>> int(time.time())
>>> >>>
>>> >>> p \
>>> >>> | 'Create' >> beam.Create(range(0, 10)) \
>>> >>> | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5)) 
>>> >>> \
>>> >>> | 'Apply Timestamp' >> beam.Map(applyTimestamp) \ # 
>>> >>> Timestamp is changed after windowing and before GBK
>>> >>> | 'Group By Key' >> beam.GroupByKey() \
>>> >>> | 'Print' >> beam.Map(print)
>>> >>>
>>> >>> Thanks,
>>> >>> Ankur


Re: Ordering of element timestamp change and window function

2020-01-16 Thread Kenneth Knowles
IIRC in Java it is forbidden to output an element with a timestamp outside
its current window. An exception is outputs from @FinishBundle, where the
output timestamp is required and the window is applied. TBH it seems more
of an artifact of a mismatch between the pre-windowing and post-windowing
worlds. Most of the time, mixing processing across windows is simply wrong.
But there are fears that calling @FinishBundle once per window would be a
performance problem. On the other hand, don't most correct implementations
have to separate processing for each window anyhow?
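For reference, the Python analogue of that exception looks roughly like this (a sketch of my own, not from the thread): outputs from finish_bundle must carry an explicit timestamp and window, because there is no current element to inherit them from:

  import apache_beam as beam
  from apache_beam.transforms.window import GlobalWindow
  from apache_beam.utils.timestamp import Timestamp
  from apache_beam.utils.windowed_value import WindowedValue

  class BufferingDoFn(beam.DoFn):
      # Buffers elements in a local variable and flushes them at the
      # end of the bundle.
      def start_bundle(self):
          self.buffer = []

      def process(self, element):
          self.buffer.append(element)

      def finish_bundle(self):
          if self.buffer:
              # No current element here, so timestamp and window are
              # supplied explicitly.
              yield WindowedValue(self.buffer, Timestamp.now(), [GlobalWindow()])
              self.buffer = []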

Anyhow I think the Java behavior is better, so window assignment happens
exactly and only at window transforms.

Kenn

On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka  wrote:

> The case where a plan vanilla value or a windowed value is emitted seems
> as expected as the user intent is honored without any surprises.
>
> If I understand correctly in the case when timestamp is changed then
> applying window function again can have unintended behavior in following
> cases
> * Custom windows: User code can be executed in unintended order.
> * User emit a windowed value in a previous transform: Timestamping the
> value in this case would overwrite the user assigned window in earlier step
> even when the actual timestamp is the same. Semantically, emitting an
> element or a timestamped value with the same timestamp should have the same
> behaviour.
>
> What do you think?
>
>
> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw 
> wrote:
>
>> If an element is emitted with a timestamp, the window assignment is
>> re-applied at that time. At least that's how it is in Python. You can
>> emit the full windowed value (accepted without checking...), a
>> timestamped value (in which case the window will be computed), or a
>> plain old element (in which case the window and timestamp will be
>> computed (really, propagated)).
>>
>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka  wrote:
>> >
>> > Yup, This might result in unintended behavior as timestamp is changed
>> after the window assignment as elements in windows do not have timestamp in
>> the window time range.
>> >
>> > Shall we start validating atleast one window assignment between
>> timestamp assignment and GBK/triggers to avoid unintended behaviors
>> mentioned above?
>> >
>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik  wrote:
>> >>
>> >> Window assignment happens at the point in the pipeline the WindowInto
>> transform was applied. So in this case the window would have been assigned
>> using the original timestamp.
>> >>
>> >> Grouping is by key and window.
>> >>
>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka 
>> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I am not sure about the effect of the order of element timestamp
>> change and window association has on a group by key.
>> >>> More specifically, what would be the behavior if we apply window ->
>> change element timestamp -> Group By key.
>> >>> I think we should always apply window function after changing the
>> timestamp of elements. Though this is neither checked nor a recommended
>> practice in Beam.
>> >>>
>> >>> Example pipeline would look like this:
>> >>>
>> >>>   def applyTimestamp(value):
>> >>> return window.TimestampedValue((key, value),
>> int(time.time())
>> >>>
>> >>> p \
>> >>> | 'Create' >> beam.Create(range(0, 10)) \
>> >>> | 'Fixed Window' >>
>> beam.WindowInto(window.FixedWindows(5)) \
>> >>> | 'Apply Timestamp' >> beam.Map(applyTimestamp) \ #
>> Timestamp is changed after windowing and before GBK
>> >>> | 'Group By Key' >> beam.GroupByKey() \
>> >>> | 'Print' >> beam.Map(print)
>> >>>
>> >>> Thanks,
>> >>> Ankur
>>
>


Re: Ordering of element timestamp change and window function

2020-01-15 Thread Ankur Goenka
The case where a plain vanilla value or a windowed value is emitted seems as
expected, as the user intent is honored without any surprises.

If I understand correctly, in the case when the timestamp is changed, then
applying the window function again can have unintended behavior in the
following cases:
* Custom windows: user code can be executed in an unintended order.
* The user emits a windowed value in a previous transform: timestamping the
value in this case would overwrite the user-assigned window from the earlier
step even when the actual timestamp is the same. Semantically, emitting an
element or a timestamped value with the same timestamp should have the same
behaviour.

What do you think?


On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw  wrote:

> If an element is emitted with a timestamp, the window assignment is
> re-applied at that time. At least that's how it is in Python. You can
> emit the full windowed value (accepted without checking...), a
> timestamped value (in which case the window will be computed), or a
> plain old element (in which case the window and timestamp will be
> computed (really, propagated)).
>
> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka  wrote:
> >
> > Yup, This might result in unintended behavior as timestamp is changed
> after the window assignment as elements in windows do not have timestamp in
> the window time range.
> >
> > Shall we start validating atleast one window assignment between
> timestamp assignment and GBK/triggers to avoid unintended behaviors
> mentioned above?
> >
> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik  wrote:
> >>
> >> Window assignment happens at the point in the pipeline the WindowInto
> transform was applied. So in this case the window would have been assigned
> using the original timestamp.
> >>
> >> Grouping is by key and window.
> >>
> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka  wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am not sure about the effect of the order of element timestamp
> change and window association has on a group by key.
> >>> More specifically, what would be the behavior if we apply window ->
> change element timestamp -> Group By key.
> >>> I think we should always apply window function after changing the
> timestamp of elements. Though this is neither checked nor a recommended
> practice in Beam.
> >>>
> >>> Example pipeline would look like this:
> >>>
> >>>   def applyTimestamp(value):
> >>> return window.TimestampedValue((key, value),
> int(time.time())
> >>>
> >>> p \
> >>> | 'Create' >> beam.Create(range(0, 10)) \
> >>> | 'Fixed Window' >>
> beam.WindowInto(window.FixedWindows(5)) \
> >>> | 'Apply Timestamp' >> beam.Map(applyTimestamp) \ #
> Timestamp is changed after windowing and before GBK
> >>> | 'Group By Key' >> beam.GroupByKey() \
> >>> | 'Print' >> beam.Map(print)
> >>>
> >>> Thanks,
> >>> Ankur
>


Re: Ordering of element timestamp change and window function

2020-01-15 Thread Robert Bradshaw
If an element is emitted with a timestamp, the window assignment is
re-applied at that time. At least that's how it is in Python. You can
emit the full windowed value (accepted without checking...), a
timestamped value (in which case the window will be computed), or a
plain old element (in which case the window and timestamp will be
computed (really, propagated)).
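A sketch of my own (hypothetical values) showing the three output forms described above in the Python SDK:

  import apache_beam as beam
  from apache_beam.transforms.window import GlobalWindow, TimestampedValue
  from apache_beam.utils.timestamp import Timestamp
  from apache_beam.utils.windowed_value import WindowedValue

  class EmitThreeWays(beam.DoFn):
      def process(self, element, ts=beam.DoFn.TimestampParam):
          # 1. Plain element: timestamp and window are propagated.
          yield element
          # 2. Timestamped value: window assignment is re-applied using
          #    the new timestamp.
          yield TimestampedValue(element, ts + 1)
          # 3. Full windowed value: accepted without checking.
          yield WindowedValue(element, Timestamp.of(0), [GlobalWindow()])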

On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka  wrote:
>
> Yup, This might result in unintended behavior as timestamp is changed after 
> the window assignment as elements in windows do not have timestamp in the 
> window time range.
>
> Shall we start validating atleast one window assignment between timestamp 
> assignment and GBK/triggers to avoid unintended behaviors mentioned above?
>
> On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik  wrote:
>>
>> Window assignment happens at the point in the pipeline the WindowInto 
>> transform was applied. So in this case the window would have been assigned 
>> using the original timestamp.
>>
>> Grouping is by key and window.
>>
>> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka  wrote:
>>>
>>> Hi,
>>>
>>> I am not sure about the effect of the order of element timestamp change and 
>>> window association has on a group by key.
>>> More specifically, what would be the behavior if we apply window -> change 
>>> element timestamp -> Group By key.
>>> I think we should always apply window function after changing the timestamp 
>>> of elements. Though this is neither checked nor a recommended practice in 
>>> Beam.
>>>
>>> Example pipeline would look like this:
>>>
>>>   def applyTimestamp(value):
>>> return window.TimestampedValue((key, value), int(time.time())
>>>
>>> p \
>>> | 'Create' >> beam.Create(range(0, 10)) \
>>> | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5)) \
>>> | 'Apply Timestamp' >> beam.Map(applyTimestamp) \ # Timestamp 
>>> is changed after windowing and before GBK
>>> | 'Group By Key' >> beam.GroupByKey() \
>>> | 'Print' >> beam.Map(print)
>>>
>>> Thanks,
>>> Ankur


Re: Ordering of element timestamp change and window function

2020-01-15 Thread Ankur Goenka
Yup, this might result in unintended behavior: the timestamp is changed after
the window assignment, so the elements in the windows do not have timestamps
within the window's time range.

Shall we start validating that at least one window assignment happens between
the timestamp assignment and GBK/triggers, to avoid the unintended behaviors
mentioned above?

On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik  wrote:

> Window assignment happens at the point in the pipeline the WindowInto
> transform was applied. So in this case the window would have been assigned
> using the original timestamp.
>
> Grouping is by key and window.
>
> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka  wrote:
>
>> Hi,
>>
>> I am not sure about the effect of the order of element timestamp change
>> and window association has on a group by key.
>> More specifically, what would be the behavior if we apply window ->
>> change element timestamp -> Group By key.
>> I think we should always apply window function after changing the
>> timestamp of elements. Though this is neither checked nor a recommended
>> practice in Beam.
>>
>> Example pipeline would look like this:
>>
>>   def applyTimestamp(value):
>> return window.TimestampedValue((key, value), int(time.time())
>>
>> p \
>> | 'Create' >> beam.Create(range(0, 10)) \
>> | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5)) \
>> | 'Apply Timestamp' >> beam.Map(applyTimestamp) \ # Timestamp
>> is changed after windowing and before GBK
>> | 'Group By Key' >> beam.GroupByKey() \
>> | 'Print' >> beam.Map(print)
>>
>> Thanks,
>> Ankur
>>
>


Re: Ordering of element timestamp change and window function

2020-01-15 Thread Luke Cwik
Window assignment happens at the point in the pipeline the WindowInto
transform was applied. So in this case the window would have been assigned
using the original timestamp.

Grouping is by key and window.

On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka  wrote:

> Hi,
>
> I am not sure about the effect of the order of element timestamp change
> and window association has on a group by key.
> More specifically, what would be the behavior if we apply window -> change
> element timestamp -> Group By key.
> I think we should always apply window function after changing the
> timestamp of elements. Though this is neither checked nor a recommended
> practice in Beam.
>
> Example pipeline would look like this:
>
>   def applyTimestamp(value):
> return window.TimestampedValue((key, value), int(time.time())
>
> p \
> | 'Create' >> beam.Create(range(0, 10)) \
> | 'Fixed Window' >> beam.WindowInto(window.FixedWindows(5)) \
> | 'Apply Timestamp' >> beam.Map(applyTimestamp) \ # Timestamp
> is changed after windowing and before GBK
> | 'Group By Key' >> beam.GroupByKey() \
> | 'Print' >> beam.Map(print)
>
> Thanks,
> Ankur
>
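For completeness, a corrected version of the snippet quoted above (a sketch of my own: the original left `key` undefined and dropped a closing parenthesis; the grouping key `value % 2` is an arbitrary choice):

  import time
  from apache_beam.transforms import window

  def applyTimestamp(value):
      # Pair each element with a key and set the current wall time as
      # its new event timestamp.
      return window.TimestampedValue((value % 2, value), time.time())

The rest of the pipeline can stay as quoted: Create, then WindowInto(FixedWindows(5)), then Map(applyTimestamp), then GroupByKey, then Map(print) - with the caveat discussed in this thread that the timestamp change happens after window assignment.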