[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068782#comment-16068782 ]

Eugene Kirpichov commented on BEAM-2140:
----------------------------------------

Conceptually, watermarks belong to PCollections: a watermark is a lower bound 
on the timestamps of new elements that may still be added to the collection.
At the implementation level, however, watermarks are assigned to transforms: 
each transform has an "input watermark" and an "output watermark" (per input 
and per output, I suppose).
The difference between the output watermark of a transform producing a 
PCollection and the input watermark of a transform consuming that PCollection 
is this: the input watermark is additionally held by "pending elements" - 
elements that we know need to be processed but haven't been yet.
The input watermark is also held by the event-times of pending timers set by 
the transform. In other words, the transform's input is logically (the output 
of the producer of the input) + (the timers set by the transform itself), and 
the input watermark is held by both of these.
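As a rough sketch (not Beam's actual implementation - the function and its
parameter names are hypothetical), the holds described above combine into a
transform's input watermark as a simple minimum:

```python
def input_watermark(upstream_output_wm, pending_element_timestamps,
                    pending_timer_event_times):
    """Conceptual input watermark of a transform (timestamps as plain ints).

    It is held back by:
      - the output watermark of the upstream producer,
      - the timestamps of pending (known but not yet processed) elements,
      - the event-times of the transform's own pending timers.
    """
    holds = [upstream_output_wm]
    holds.extend(pending_element_timestamps)
    holds.extend(pending_timer_event_times)
    return min(holds)
```

For example, with an upstream output watermark of 100, a pending element at
timestamp 40, and a pending timer at event-time 60, the input watermark is
held at 40.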

Currently the input watermark of a transform is held only by _event-time_ 
timers; however, it makes sense to hold it by _processing-time_ timers as 
well. For that we need to assign them an event-time timestamp. Currently this 
isn't happening at all (except that an "effective timestamp" is assigned to 
the output of a timer firing, at the moment it fires, taken from the current 
input watermark). The suggestion in the case of SDF is to use the 
ProcessContinuation's output watermark as the event-time of the residual 
timer.
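A minimal sketch of that suggestion (the class and field names here are
hypothetical, not Beam's API): the residual processing-time timer carries an
event-time hold taken from the ProcessContinuation's output watermark, so it
can hold the input watermark like an event-time timer would.

```python
from dataclasses import dataclass

@dataclass
class ProcessContinuation:
    resume_delay: int      # processing-time delay before resuming (ms)
    output_watermark: int  # lower bound on timestamps of future output (ms)

@dataclass
class ProcessingTimeTimer:
    fire_at_processing_time: int  # wall-clock firing target (ms)
    event_time_hold: int          # event-time at which this timer holds
                                  # the transform's input watermark (ms)

def residual_timer(now, continuation):
    """Derive the residual processing-time timer from a ProcessContinuation,
    using its output watermark as the timer's event-time hold."""
    return ProcessingTimeTimer(
        fire_at_processing_time=now + continuation.resume_delay,
        event_time_hold=continuation.output_watermark,
    )
```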

We also discussed handling of processing-time timers in batch. The guiding 
principle is that things should work exactly the same way as in streaming: 
setting a processing-time timer in batch to fire in 5 minutes should actually 
fire it after 5 minutes, even if that means delaying bundle completion until 
the processing-time timers quiesce. The motivating use case is, say, an 
SDF-based continuously polling glob expander used in a batch pipeline - it 
should process the same set of files it would in a streaming pipeline.
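One way to picture "delaying the bundle until processing-time timers quiesce"
(a conceptual sketch only, with hypothetical names - not how a runner is
implemented): keep draining the timer queue, allowing each firing to schedule
further timers, until none remain.

```python
import heapq

def drain_processing_time_timers(initial_timers, fire):
    """Fire processing-time timers (ints, as firing times) in order until
    quiescence. `fire(t)` runs the user callback for timer t and returns any
    newly scheduled timers. In batch, the bundle would not complete until
    this loop terminates."""
    heap = list(initial_timers)
    heapq.heapify(heap)
    fired = []
    while heap:
        t = heapq.heappop(heap)
        fired.append(t)
        for new_timer in fire(t):  # firing may schedule more timers
            heapq.heappush(heap, new_timer)
    return fired
```

A polling SDF maps onto this directly: each firing re-polls and, if the work
is not exhausted, schedules the next poll.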

A few questions I still do not understand:
- Where exactly do processing-time timers get dropped, and under what 
condition? Kenn says that event-time timers don't get dropped: we just forbid 
setting them if they would already be "late".
- When can an input to the SDF, or a timer set by the SDF, be late at all, 
and should the SDF drop them? Technically a runner is free to drop late data 
at any point in the pipeline, but in practice this happens after GBKs; and 
semantically an SDF need not involve a GBK, so it should be allowed to simply 
not drop anything late, no? - like a regular DoFn would (as long as it 
doesn't leak state).

Seems like we also should file JIRAs for the following:
- state leakage
- handling processing-time timers in batch properly
- holding watermark by processing-time timers
- allowing the timer API (internal or user-facing) to specify the event-time 
of processing-time timers
- more?

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
