[ 
https://issues.apache.org/jira/browse/BEAM-7428?focusedWorklogId=262395&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-262395
 ]

ASF GitHub Bot logged work on BEAM-7428:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jun/19 15:45
            Start Date: 18/Jun/19 15:45
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on issue #8741: [BEAM-7428] Output 
the timestamp on elements in ReadAllViaFileBasedSource
URL: https://github.com/apache/beam/pull/8741#issuecomment-503194658
 
 
   In the Beam model, the watermark propagates forward since the input 
watermark >= output watermark of each transform. The input watermark is a bound 
saying that all data arriving with a timestamp before it will be considered 
late, while the output watermark says that all data output with a timestamp 
before it will be considered late. So let's say T1 provides data to T2, then
   ```
   ... >= input_watermark(T1) >= output_watermark(T1) >= input_watermark(T2) >= output_watermark(T2) >= ...
   ```
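   As a rough illustration of that ordering (not Beam code; the class and 
method names are hypothetical and watermarks are modeled as plain epoch-millis 
longs), a check that a chain of watermarks never increases downstream could 
look like this:

```java
import java.util.List;

// Hypothetical sketch: not part of the Beam SDK or any runner.
final class WatermarkChain {
    /**
     * Returns true if the watermarks, listed from upstream to downstream,
     * satisfy w[0] >= w[1] >= ... (each one bounds the next).
     */
    public static boolean isConsistent(List<Long> upstreamToDownstream) {
        for (int i = 1; i < upstreamToDownstream.size(); i++) {
            if (upstreamToDownstream.get(i - 1) < upstreamToDownstream.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Order: input(T1), output(T1), input(T2), output(T2).
        System.out.println(isConsistent(List.of(400L, 350L, 350L, 300L))); // true
        // input(T2) is ahead of output(T1), which the model does not allow.
        System.out.println(isConsistent(List.of(400L, 350L, 360L, 300L))); // false
    }
}
```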
   Note that the input/output watermarks are computed by the runner and are 
never exposed to the SDK. Whether something is late or on time is figured out 
as part of the GroupByKey. The runner also automatically advances the output 
watermark when all the data for a given input watermark has been consumed. 
(Watermark holds, which are a special timer that isn't meant to be exposed to 
SDKs and will be modeled properly with the resolution of 
https://issues.apache.org/jira/browse/BEAM-2535, do allow the SDK to hold back 
the watermark.)
   
   This all seems great, but what about the roots of the pipeline? This is 
where the `UnboundedSource` has the ability to report what it thinks the 
watermark should be, and the runner computes the watermark by taking the min 
across all UnboundedSource instances for a particular root.
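   
   A minimal sketch of that min computation, assuming each source instance's 
reported watermark is available as an epoch-millis long (the class and method 
names are hypothetical and this is not how any particular runner implements 
it):

```java
import java.util.List;

// Hypothetical sketch: models only the "min across instances" rule.
final class RootWatermark {
    /** Returns the smallest watermark reported by the source instances of one root. */
    public static long of(List<Long> reportedBySourceInstances) {
        if (reportedBySourceInstances.isEmpty()) {
            // Assumption: a root with no source instances is treated as an error here.
            throw new IllegalArgumentException("at least one source instance is required");
        }
        long min = Long.MAX_VALUE;
        for (long watermark : reportedBySourceInstances) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        // The slowest instance (200) determines the root's watermark.
        System.out.println(of(List.of(500L, 200L, 350L))); // 200
    }
}
```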
   
   Another property of the Beam model is that if you output elements with a 
timestamp that is before the output watermark, the data will be classified as 
late (and depending on the trigger, may be dropped).
   
   Combining these two properties of the Beam model, we get to the issue of 
what timestamp we should output with.
   
   My concern is that the proposed solution of using getCurrentTimestamp and, 
if it is unknown, falling back to the element timestamp can lead to data being 
marked late by default in the case where ReadAllViaFileBasedSource is used in a 
streaming pipeline.
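   
   One way this concern could play out, sketched with made-up numbers and 
hypothetical names (this is not the code from the PR, and lateness is 
simplified to "timestamp before the output watermark" as described above):

```java
import java.util.OptionalLong;

// Hypothetical sketch: timestamps and watermarks are plain epoch-millis longs.
final class FallbackTimestampSketch {
    /** Prefers the reader-reported timestamp; otherwise uses the input element's timestamp. */
    public static long chooseTimestamp(OptionalLong readerTimestamp, long inputElementTimestamp) {
        return readerTimestamp.orElse(inputElementTimestamp);
    }

    /** Simplified lateness rule: a timestamp strictly before the output watermark is late. */
    public static boolean isLate(long timestamp, long outputWatermark) {
        return timestamp < outputWatermark;
    }

    public static void main(String[] args) {
        long outputWatermark = 450L;       // already advanced by the runner
        long inputElementTimestamp = 500L; // timestamp of the element naming the file

        // The reader knows the record's own (older) timestamp: the output is late.
        long fromReader = chooseTimestamp(OptionalLong.of(100L), inputElementTimestamp);
        System.out.println(isLate(fromReader, outputWatermark)); // true

        // The reader timestamp is unknown: the fallback keeps the output on time.
        long fromFallback = chooseTimestamp(OptionalLong.empty(), inputElementTimestamp);
        System.out.println(isLate(fromFallback, outputWatermark)); // false
    }
}
```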
   
   The reason I suggest modeling this as an SDF is that I believe SDFs 
should be able to hold the output watermark just like an UnboundedSource by 
invoking ProcessContext.updateWatermark(), which would then allow 
ReadAllViaFileBasedSource to output data without any of it being marked late.
   
   Eugene believes that ProcessContext.updateWatermark() should only be used to 
advance the output watermark faster than the input, not to hold the output 
watermark back, and hence the data could be marked as late.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 262395)
    Time Spent: 5.5h  (was: 5h 20m)

> ReadAllViaFileBasedSource does not output the timestamps of the read elements
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-7428
>                 URL: https://issues.apache.org/jira/browse/BEAM-7428
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Ismaël Mejía
>            Assignee: Ismaël Mejía
>            Priority: Minor
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> This differs from the implementation of JavaReadViaImpulse that tackles a 
> similar problem but does output the timestamps correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
