[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2016-09-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15522966#comment-15522966
 ] 

Aljoscha Krettek commented on BEAM-644:
---

[~kenn] I was referring to "two clusters of elements from two separate input 
elements" but that's somewhat beside the point because I was thinking about 
how a Kafka source would be implemented as a combination of {{DoFn}} plus 
{{SplittableDoFn}}. There you need to manage the watermark at the 
{{SplittableDoFn}} which would be responsible for reading from topics.

I think we might be talking about different things here. As I said, the 
proposed changes are very good in how they simplify the API of {{DoFn}} and 
also clean up stuff around allowed time skew.

What I was thinking about is a general problem with watermarks. I thought 
that the proposal here was meant to fix that, but I don't think we can. What I 
was trying to get at essentially boils down to this: if we want our watermark 
to be 100% correct then we can never advance it, because we never know what 
timestamps future elements will have (in the general case, where any data 
with any timestamp can arrive at any point in processing time). I was just 
pondering that and I'm afraid it derailed the discussion a bit.

> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that an element is 
> coming, in the absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that is >= t + D
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make 
> timestamps up to 60 minutes earlier.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.
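
To make the contract in the description above concrete, here is a minimal Java sketch. It is not a Beam API: the class AdjustTimestampsSketch and its methods are hypothetical names invented for illustration, and it assumes the output watermark is simply the input watermark plus D. It only shows the two pieces from the ticket: a constant shift D and a function f whose result must be >= t + D.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BiFunction;

// Hypothetical stand-in for the ShiftWatermark / AdjustTimestamps idea.
// None of these names are Beam APIs; they only illustrate the contract.
final class AdjustTimestampsSketch<T> {
  private final Duration shift;                            // D, may be negative
  private final BiFunction<T, Instant, Instant> assigner;  // f

  AdjustTimestampsSketch(Duration shift, BiFunction<T, Instant, Instant> assigner) {
    this.shift = shift;
    this.assigner = assigner;
  }

  // Assumption: the output watermark is the input watermark shifted by D.
  Instant outputWatermark(Instant inputWatermark) {
    return inputWatermark.plus(shift);
  }

  // Applies f and enforces the contract: new timestamp >= t + D.
  Instant adjust(T element, Instant timestamp) {
    Instant adjusted = assigner.apply(element, timestamp);
    Instant earliestAllowed = timestamp.plus(shift);
    if (adjusted.isBefore(earliestAllowed)) {
      throw new IllegalArgumentException(
          "new timestamp " + adjusted + " is earlier than t + D = " + earliestAllowed);
    }
    return adjusted;
  }

  public static void main(String[] args) {
    // D = -60 minutes; f moves each element back by the number of minutes it carries.
    AdjustTimestampsSketch<Long> adjuster =
        new AdjustTimestampsSketch<Long>(
            Duration.ofMinutes(-60),
            (minutesBack, ts) -> ts.minus(Duration.ofMinutes(minutesBack)));
    Instant t = Instant.parse("2016-09-22T12:00:00Z");
    System.out.println(adjuster.adjust(30L, t));      // 30 minutes earlier: allowed
    System.out.println(adjuster.outputWatermark(t));  // watermark shifted by D
    try {
      adjuster.adjust(90L, t);                        // 90 minutes earlier: violates t + D
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With D = -60 minutes, f may move a timestamp up to 60 minutes earlier; anything further back is rejected because it could end up behind the shifted watermark.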



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2016-09-23 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15516981#comment-15516981
 ] 

Kenneth Knowles commented on BEAM-644:
--

[~aljoscha], are you referring to two clusters of output arising from the same 
input element, or two clusters of output from two separate input elements, with 
no data in between? Let me try to write this up again; I am just rephrasing 
your question and Ben's answer to see if I got it right.

In the case where the two clusters around {{t}} and {{t+100}} arise from the 
same input element (for example, the same subscription being read by the 
{{SplittableDoFn}}) then the {{futureOutputWatermark()}} method of the 
{{ProcessContinuation}} manages the watermark in a manner identical to 
{{UnboundedSource}} today. This is #1 from [~bchambers] if I understand 
correctly. This method will be polled regularly in the absence of data, so you 
just have to do the best you can to not move it forward too fast, the same as 
today.

In the case where there are two input elements that result in initial clusters 
of output that are respectively around {{t}} and {{t+100}} (if they are 
non-initial clusters, then I believe it reduces to the above case) then either 
the timestamps of those elements can give a close enough approximation so the 
input watermark is in the right place, or an explicit shift can be added using 
this new proposed primitive, or some similar capability. This is #2 from 
[~bchambers].

Do you have a different case in mind?



[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2016-09-23 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15515875#comment-15515875
 ] 

Aljoscha Krettek commented on BEAM-644:
---

Yes, as a replacement for {{outputWithTimestamp}} and 
{{withAllowedTimestampSkew}} this new proposal is perfect.

[~kenn], I was just thinking about {{SplittableDoFn}} and what happens in the 
absence of data. Say you have some data that you emit from the {{DoFn}} that is 
clustered around timestamp {{t}}, then you have no data for a while and then 
you get data that is clustered around {{t + 100}}. In order for that data to 
not be late the watermark has to be held at {{t + 100}} but you cannot know 
that until you actually see the newer data. Holding back by some constant {{D}} 
would not help in that case. Or I might be missing something, of course.
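
The scenario above can be sketched with plain integers. This is only an illustration under a stated assumption: while the source is idle, the watermark trails processing time by a constant hold-back. That policy, and the names idleWatermark and isLate, are hypothetical and not taken from Beam.

```java
// Toy model of the idle-gap scenario; all names and the watermark policy are assumptions.
final class IdleWatermarkSketch {

  // Assumed policy: with no data, the watermark trails processing time by holdBack.
  static long idleWatermark(long processingTime, long holdBack) {
    return processingTime - holdBack;
  }

  // An element is late if its event timestamp is behind the current watermark.
  static boolean isLate(long eventTimestamp, long watermark) {
    return eventTimestamp < watermark;
  }

  public static void main(String[] args) {
    long holdBack = 60;         // the constant D
    long eventTimestamp = 100;  // second cluster, around t + 100 with t = 0

    // The cluster shows up after a short idle gap: the watermark is still behind it.
    System.out.println(isLate(eventTimestamp, idleWatermark(150, holdBack)));

    // The cluster shows up after a longer idle gap: the watermark has passed it.
    System.out.println(isLate(eventTimestamp, idleWatermark(200, holdBack)));
  }
}
```

In this model a constant hold-back only covers gaps up to that constant; the size of the gap is not known until the newer data actually arrives.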



[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2016-09-22 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513859#comment-15513859
 ] 

Kenneth Knowles commented on BEAM-644:
--

I don't quite understand the comment, [~aljoscha]. Can you say more? I have 
incorporated the correction by [~bchambers] and also added one example.



[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps

2016-09-20 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15507379#comment-15507379
 ] 

Ben Chambers commented on BEAM-644:
---

Minor note on "A function from TimestampedElement to new timestamp that 
always falls within D of the original timestamp."

Rather than "within D" I think the requirement is that for an input with 
timestamp t, the output timestamp is >= t+D. This ensures that the output 
timestamp's relation to the output watermark is no later than the input 
timestamp's relation to the input watermark.
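
The reasoning can be written out as a short inequality. The symbols W_in, W_out and t' are introduced here for illustration, and the sketch assumes the output watermark is exactly the input watermark shifted by D:

```latex
% Assumptions: W_out = W_in + D (watermark shifted by the constant D),
% and the new timestamp t' satisfies t' >= t + D.
\begin{align*}
  t' - W_{\mathrm{out}}
    &\ge (t + D) - (W_{\mathrm{in}} + D) \\
    &=   t - W_{\mathrm{in}}
\end{align*}
% So if the input element was on time (t >= W_in),
% the output element is on time as well (t' >= W_out).
```

A bound of the form "within D" would also allow t' < t + D when D is positive, which could place the output behind the shifted watermark.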

> Primitive to shift the watermark while assigning timestamps
> ---
>
> Key: BEAM-644
> URL: https://issues.apache.org/jira/browse/BEAM-644
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>
> There is a general need, especially important in the presence of 
> SplittableDoFn, to be able to assign new timestamps to elements without 
> making them late or droppable.
>  - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one 
> to produce late data, but does not allow one to shift the watermark so the 
> new data is on-time.
>  - For a SplittableDoFn, one may receive an element such as the name of a log 
> file that contains elements for the day preceding the log file. The timestamp 
> on the filename must currently be the beginning of the log. If such elements 
> are constantly flowing, it may be OK, but since we don't know that an element is 
> coming, in the absence of data, the watermark may advance. We need a way to 
> keep it far enough back even in the absence of data holding it back.
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the 
> following pieces:
>  - A constant duration (positive or negative) D by which to shift the 
> watermark.
>  - A function from TimestampedElement to new timestamp that always falls 
> within D of the original timestamp.
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew 
> could be removed, simplifying DoFn.
> Alternatively, all of this functionality could be bolted on to DoFn.
> This ticket is not a proposal, but a record of the issue and ideas that were 
> mentioned.


