[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068782#comment-16068782 ]
Eugene Kirpichov commented on BEAM-2140:
----------------------------------------

Conceptually, watermarks are for PCollections: a lower bound on the timestamps of new elements that may get added to the collection. However, at the implementation level, watermarks are assigned to transforms: each transform has an "input watermark" and an "output watermark" (I suppose, per input and per output).

The difference between the output watermark of a transform producing PC and the input watermark of a transform consuming PC is as follows: the input watermark is held by "pending elements" that we know need to be processed but haven't been yet. The input watermark is also held by the event times of pending timers set by the transform. In other words, logically the transform's input is (output of the producer of the input) + (timers set by the transform itself), and the input watermark is held by both of these.

Currently the input watermark of a transform is held only by _event-time_ timers; however, it makes sense to hold it also by _processing-time_ timers. For that we need to assign them an event-time timestamp. Currently this isn't happening at all (except that an "effective timestamp" is assigned to the output of a timer firing when it fires - it is taken from the current input watermark). The suggestion in the case of SDF is to use the ProcessContinuation's output watermark as the event time of the residual timer.

We also discussed handling of processing-time timers in batch, coming from the point of view that things should work exactly the same way in batch: setting a processing-time timer in batch to fire in 5 minutes should actually fire it after 5 minutes, possibly including delaying the bundle until processing-time timers quiesce. A motivating use case is using an SDF-based polling continuous glob expander in a batch pipeline - it should process the same set of files it would in a streaming pipeline.
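To make the "held by" relationship above concrete, here is a toy model (not Beam's actual implementation - the function and parameter names are made up for illustration): a transform's input watermark cannot advance past the upstream output watermark, past any pending element's timestamp, or past any pending timer's event time. Timestamps are plain integers for simplicity.

```python
# Toy model of a transform's input watermark (hypothetical, not Beam code).
# The input watermark is the minimum of:
#   - the output watermark of the producer of the input,
#   - the timestamps of pending (delivered but unprocessed) elements,
#   - the event times of pending timers set by the transform itself.
MAX_TIMESTAMP = float("inf")

def input_watermark(upstream_output_wm, pending_element_ts, pending_timer_event_ts):
    """Compute the held input watermark of a transform.

    upstream_output_wm: output watermark of the producing transform.
    pending_element_ts: timestamps of elements not yet processed.
    pending_timer_event_ts: event times assigned to pending timers.
    """
    earliest_hold = min(
        list(pending_element_ts) + list(pending_timer_event_ts),
        default=MAX_TIMESTAMP,
    )
    return min(upstream_output_wm, earliest_hold)

# A pending timer with event time 20 holds the input watermark at 20,
# even though all pending elements have timestamps of 30 or later.
print(input_watermark(50, [30, 40], [20]))  # 20
```

In this model, giving processing-time timers an event-time timestamp (e.g. the ProcessContinuation's output watermark, per the suggestion above) is what lets them participate in `pending_timer_event_ts` and hold the watermark at all.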
A few questions I still do not understand:
- Where exactly do processing-time timers get dropped, and on what condition? Kenn says that event-time timers don't get dropped: we just forbid setting them if they would already be "late".
- When can an input to the SDF, or a timer set by the SDF, be late at all, and should the SDF drop them? Technically a runner is free to drop late data at any point in the pipeline, but in practice it happens after GBKs; and semantically an SDF need not involve a GBK, so it should be allowed to just not drop anything late, no? - like a regular DoFn would (as long as it doesn't leak state).

Seems like we should also file JIRAs for the following:
- state leakage
- handling processing-time timers in batch properly
- holding the watermark by processing-time timers
- allowing the timer API (internal or user-facing) to specify the event time of processing-time timers
- more?

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>           Key: BEAM-2140
>           URL: https://issues.apache.org/jira/browse/BEAM-2140
>       Project: Beam
>    Issue Type: Bug
>    Components: runner-flink
>      Reporter: Aljoscha Krettek
>      Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)