[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15980645#comment-15980645 ]

Pei He commented on BEAM-644:
-----------------------------

I think this also applies to a more general issue: out-of-order or delayed data at the user application level. For example, suppose an upstream system collects data and injects it into Kafka, and a Beam pipeline then processes it. How far out of order the data is at the application level depends on the SLA of the upstream system. I think that issue is similar to the one discussed here. (Out-of-orderness at the Beam system level could be handled by a Kafka source that estimates watermarks.)

> Primitive to shift the watermark while assigning timestamps
> -----------------------------------------------------------
>
>                 Key: BEAM-644
>                 URL: https://issues.apache.org/jira/browse/BEAM-644
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model, beam-model-runner-api
>            Reporter: Kenneth Knowles
>
> There is a general need, especially important in the presence of
> SplittableDoFn, to be able to assign new timestamps to elements without
> making them late or droppable.
>
> - DoFn.withAllowedTimestampSkew is inadequate, because it simply allows one
>   to produce late data, but does not allow one to shift the watermark so the
>   new data is on-time.
> - For a SplittableDoFn, one may receive an element such as the name of a log
>   file that contains elements for the day preceding the log file. The timestamp
>   on the filename must currently be the beginning of the log. If such elements
>   are constantly flowing, it may be OK, but since we don't know that an element
>   is coming, in the absence of data the watermark may advance. We need a way to
>   keep it far enough back even in the absence of data holding it back.
>
> One idea is a new primitive ShiftWatermark / AdjustTimestamps with the
> following pieces:
>
> - A constant duration (positive or negative) D by which to shift the
>   watermark.
> - A function from TimestampedElement to new timestamp that is >= t + D
>
> So, for example, AdjustTimestamps(<-60 minutes>, f) would allow f to make
> timestamps up to 60 minutes earlier.
>
> With this primitive added, outputWithTimestamp and withAllowedTimestampSkew
> could be removed, simplifying DoFn.
>
> Alternatively, all of this functionality could be bolted on to DoFn.
>
> This ticket is not a proposal, but a record of the issue and ideas that were
> mentioned.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
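
The AdjustTimestamps idea in the quoted description can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Beam code: `TimestampedElement`, `adjust_timestamps`, and the integer "minutes" time unit are hypothetical names invented here, `t` is assumed to be the element's original timestamp, and the watermark is modelled as a single integer that is shifted by the same duration D.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Tuple


@dataclass(frozen=True)
class TimestampedElement:
    """Hypothetical stand-in for an element paired with its event time."""
    value: str
    timestamp: int  # event time in minutes (illustrative unit)


def adjust_timestamps(
    elements: Iterable[TimestampedElement],
    input_watermark: int,
    shift: int,
    f: Callable[[TimestampedElement], int],
) -> Tuple[List[TimestampedElement], int]:
    """Toy model of AdjustTimestamps(D, f): re-stamp elements, shift the watermark by D."""
    output = []
    for element in elements:
        new_timestamp = f(element)
        # Constraint from the issue description: new timestamp >= t + D.
        if new_timestamp < element.timestamp + shift:
            raise ValueError(
                f"new timestamp {new_timestamp} is below the allowed minimum "
                f"{element.timestamp + shift}"
            )
        output.append(TimestampedElement(element.value, new_timestamp))
    # Assumption: the output watermark is the input watermark shifted by D, so an
    # element that was on time before re-stamping is still on time afterwards.
    return output, input_watermark + shift


# A filename stamped at minute 600 whose contents are 45 minutes older.
log_file = TimestampedElement("log-file", 600)
shifted, output_watermark = adjust_timestamps(
    [log_file], input_watermark=590, shift=-60, f=lambda e: e.timestamp - 45
)

# Without the shift, minute 555 would be behind the watermark (590) and late.
is_on_time = shifted[0].timestamp >= output_watermark
```

In this model, any element with t at or after the input watermark ends up with a new timestamp of at least t + D, which is at or after the shifted watermark, so it is not late. A function f that moved a timestamp back further than D would be rejected rather than silently producing late data.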
[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957959#comment-15957959 ]

Eugene Kirpichov commented on BEAM-644:
---------------------------------------

I don't think there's an SDF-specific issue here: it applies just as well to a non-splittable DoFn that would take a filename as input, and potentially produce elements whose timestamps are behind the timestamp of the input element (filename).

Speaking of updates - no, I don't think there've been any updates.

Perhaps in practice this can be solved by feeding the SDF elements with a timestamp of "infinite past", and from then on, relying on the SDF's own output watermark reporting.
[jira] [Commented] (BEAM-644) Primitive to shift the watermark while assigning timestamps
[ https://issues.apache.org/jira/browse/BEAM-644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953812#comment-15953812 ]

Daniel Halperin commented on BEAM-644:
--------------------------------------

[~jkff] any update here w.r.t. {{SplittableDoFn}}?