[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16065661#comment-16065661 ]

Eugene Kirpichov commented on BEAM-2140:
----------------------------------------

Working backwards from that, in the "read Pubsub topic names from Kafka" case: 
let "topicsPC" be the PCollection of topic names, and "recordsPC" be the 
PCollection of records read from these topics.

We want the watermark of "recordsPC" to be a lower bound on timestamps of new 
records that will ever be read from any of the current or future topics.
Currently known elements of topicsPC already provide this bound for their 
records via the watermark hold, but for elements of recordsPC that will be 
produced from future elements of topicsPC, the only information we have is the 
watermark of topicsPC.

So, the ideal observable watermark behavior is as follows:
- elements in topicsPC have timestamps, and the timestamp of an element in 
topicsPC is a reasonable starting watermark for the records produced from that 
topic into recordsPC.
- the watermark of recordsPC should be min(current watermark holds set by 
currently pending element/restriction pairs from topicsPC, watermark of 
topicsPC itself).
- the first term bounds the records that can still arrive from currently read 
topics; the second term bounds the records from future topics, because of 
bullet 1.
- in the special case where topicsPC is bounded (e.g. Create.of()) and its 
watermark has advanced to infinity, this reduces to just the current watermark 
holds, which is correct.
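The bullets above can be sketched as a small Python model. It is a hypothetical illustration, not Beam or Flink runner code. The function name `records_watermark`, the plain numeric timestamps and the use of `math.inf` for a watermark that has advanced to infinity are all assumptions.

```python
import math
from typing import Iterable

# Assumption: timestamps are plain numbers (e.g. epoch millis) and math.inf
# stands for a watermark that has advanced to "infinity".
END_OF_TIME = math.inf


def records_watermark(pending_holds: Iterable[float], topics_watermark: float) -> float:
    """Hypothetical model of the ideal watermark of recordsPC.

    pending_holds: watermark holds set by element/restriction pairs of topicsPC
        that are still being processed (bounds records from current topics).
    topics_watermark: watermark of topicsPC itself (bounds records from topics
        not seen yet, whose records start at or after their element timestamp).
    """
    # With no pending pairs, only future topics can still produce records.
    current_topics_bound = min(pending_holds, default=END_OF_TIME)
    return min(current_topics_bound, topics_watermark)


# Unbounded topicsPC: a future topic may still start at 150.
print(records_watermark([200, 250], 150))          # 150
# Bounded topicsPC whose watermark reached infinity: only the holds matter.
print(records_watermark([200, 250], END_OF_TIME))  # 200
# Nothing pending and no future topics: recordsPC is done.
print(records_watermark([], END_OF_TIME))          # inf
```

The last two calls correspond to the bounded special case in the final bullet: once the watermark of topicsPC is infinite, the result is just the minimum of the current holds.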

Now, our problem is that if the watermark of topicsPC advances to infinity (e.g. 
because it was bounded and we've processed the initial ProcessElement calls for 
its element/restriction pairs), the runner takes it as a promise that "likely 
nothing new will appear in this PCollection", which does not hold, because the 
processing-time timer set by the SDF will still fire.

On the other hand, if we hold the watermark of topicsPC at the original ancient 
timestamp of the element/restriction pair, the runner will interpret it as "I 
can only promise you that new elements/timers in topicsPC will have a timestamp 
later than this ancient timestamp", which is unnecessarily restrictive. In 
reality, new elements/timers in topicsPC will come either from the transform 
that produces topicsPC or from new processing-time timers scheduled by the 
currently read topics, and the watermark should be the min of these two.

How do we make a promise about future event-time timestamps of processing-time 
timers? You say "Currently processing time timers are treated as inputs with a 
timestamp equal to the input watermark at the moment of their arrival". That 
would be the current watermark of "topicsPC", I suppose? I think that would be 
consistent with the desired behavior above.
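One reading of the quoted rule is sketched below as a hypothetical Python model. It is not Flink runner or Beam code, and `TimerHoldingWatermark` and its methods are invented for illustration. Each processing-time timer is stamped with the input watermark at the moment it is set. Like any other pending input, it then holds the output watermark at that stamp until it fires. Here the input watermark plays the role of the watermark of topicsPC, as suggested above.

```python
import math
from typing import Dict

END_OF_TIME = math.inf


class TimerHoldingWatermark:
    """Hypothetical model: processing-time timers are treated as inputs whose
    event-time timestamp is the input watermark when the timer was set."""

    def __init__(self) -> None:
        self.input_watermark = -math.inf
        # timer id -> event-time stamp assigned when the timer was set
        self._pending: Dict[str, float] = {}

    def advance_input(self, watermark: float) -> None:
        # Watermarks only move forward.
        self.input_watermark = max(self.input_watermark, watermark)

    def set_timer(self, timer_id: str) -> None:
        self._pending[timer_id] = self.input_watermark

    def fire_timer(self, timer_id: str) -> None:
        # A fired timer no longer holds the watermark.
        self._pending.pop(timer_id, None)

    def output_watermark(self) -> float:
        # Pending timers hold the output just like pending inputs would.
        return min([self.input_watermark, *self._pending.values()])


w = TimerHoldingWatermark()
w.advance_input(100)
w.set_timer("resume-topic-a")   # stamped with event time 100
w.advance_input(END_OF_TIME)    # bounded input is exhausted
print(w.output_watermark())     # 100: the pending timer still holds it
w.fire_timer("resume-topic-a")
print(w.output_watermark())     # inf
```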

(Clearly more thought is needed, but I thought I'd dump this anyway.)

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
