[ https://issues.apache.org/jira/browse/BEAM-7428?focusedWorklogId=262395&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-262395 ]
ASF GitHub Bot logged work on BEAM-7428:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jun/19 15:45
            Start Date: 18/Jun/19 15:45
    Worklog Time Spent: 10m
      Work Description: lukecwik commented on issue #8741: [BEAM-7428] Output the timestamp on elements in ReadAllViaFileBasedSource
URL: https://github.com/apache/beam/pull/8741#issuecomment-503194658

In the Beam model, the watermark propagates forward, since the input watermark >= output watermark for each transform. The input watermark is a bound saying that all data arriving before it will be considered late, while the output watermark says that all data output before it will be considered late. So let's say T1 provides data to T2, then

```
... >= input_watermark(T1) >= output_watermark(T1) >= input_watermark(T2) >= output_watermark(T2) >= ...
```

Note that the input/output watermarks are computed by the runner and aren't ever exposed to the SDK. Whether something is late or on time is figured out as part of the GroupByKey. The runner also automatically advances the output watermark when all the data for a given input watermark has been consumed (a watermark hold, which is a special timer that isn't meant to be exposed to SDKs and will be modeled properly with the resolution of https://issues.apache.org/jira/browse/BEAM-2535, does allow the SDK to hold back the watermark).

This all seems great, but what about the roots of the pipeline? This is where the `UnboundedSource` has the ability to report what it thinks the watermark should be, and the runner computes the watermark by taking the min across all UnboundedSource instances for a particular root.

Another property of the Beam model is that if you output elements with a timestamp that is before the output watermark, the data will be classified as late (and, depending on the trigger, may be dropped).

Combining these two properties of the Beam model, we get to the issue of what timestamp we should output with.
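The two properties above can be sketched with a toy model. This is plain Python, not Beam's API: the names `Element`, `root_watermark`, `is_late`, and `chain_is_valid` are hypothetical, timestamps are bare integers, and a real runner computes all of this internally without exposing it to the SDK.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Element:
    value: str
    timestamp: int  # event time as a plain integer (assumption for this sketch)


def root_watermark(source_watermarks):
    """Watermark of a pipeline root: the min over what each source instance reports."""
    return min(source_watermarks)


def is_late(element, output_watermark):
    """An element output with a timestamp before the output watermark is late."""
    return element.timestamp < output_watermark


def chain_is_valid(watermarks):
    """Check that watermarks listed upstream to downstream never increase."""
    return all(up >= down for up, down in zip(watermarks, watermarks[1:]))


# Three hypothetical source instances report different watermarks.
wm = root_watermark([120, 95, 130])

# An element stamped before the watermark is late; one stamped at it is not.
old_record = Element("record read from an old file", 90)
fresh_record = Element("record stamped at the watermark", 95)
late_flags = (is_late(old_record, wm), is_late(fresh_record, wm))
```

In this model, a read transform that stamps its output with a timestamp older than the current output watermark produces late data unless something holds the watermark back, which is the situation the rest of the comment is concerned with.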
My concern is that the proposed solution, using getCurrentTimestamp and falling back to the element timestamp if it is unknown, can lead to data being marked late by default when ReadAllViaFileBasedSource is used in a streaming pipeline.

The reason I suggest modeling this as an SDF is that I believe SDFs should be able to hold the output watermark, just like an UnboundedSource, by invoking ProcessContext.updateWatermark(). That would allow ReadAllViaFileBasedSource to output data without any of it being marked late. Eugene believes that ProcessContext.updateWatermark() should only be used to advance the output watermark faster than the input watermark and should not be able to hold the output watermark, in which case the data could be marked as late.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 262395)
    Time Spent: 5.5h  (was: 5h 20m)

> ReadAllViaFileBasedSource does not output the timestamps of the read elements
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-7428
>                 URL: https://issues.apache.org/jira/browse/BEAM-7428
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Ismaël Mejía
>            Assignee: Ismaël Mejía
>            Priority: Minor
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> This differs from the implementation of JavaReadViaImpulse that tackles a
> similar problem but does output the timestamps correctly.