[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16002370#comment-16002370
 ] 

Aljoscha Krettek commented on BEAM-2140:
----------------------------------------

I think this is mostly right, yes. 😃

1. Why should it not wrap {{StatefulDoFnRunner}}? I think it's the easiest way 
to get timers and state that expires.
2. Yes, I think this is a problem.
3. This is also a problem: when we shut down, we should check whether there 
are any outstanding processing-time timers and fire them before closing.
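
To make point 3 concrete, here is a rough sketch of draining outstanding processing-time timers at shutdown. It is only an illustration: {{PendingProcessingTimeTimers}}, {{register}}, {{hasPending}} and {{drain}} are hypothetical names invented for this example and are not part of the Flink runner or of Beam. The loop re-checks the queue after every callback because a fired timer may register a follow-up timer.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for an operator's processing-time timer queue (not a Beam/Flink class). */
class PendingProcessingTimeTimers {
  // Outstanding timers, ordered by firing timestamp in milliseconds.
  private final PriorityQueue<Long> timers = new PriorityQueue<>();

  /** Registers a timer that should fire at the given processing time. */
  void register(long timestampMillis) {
    timers.add(timestampMillis);
  }

  /** Returns true if at least one timer has not fired yet. */
  boolean hasPending() {
    return !timers.isEmpty();
  }

  /**
   * Fires every outstanding timer in timestamp order, intended to be called on shutdown.
   * Timers registered by the callback while draining are fired as well.
   * Returns the number of timers fired.
   */
  int drain(LongConsumer onTimer) {
    int fired = 0;
    while (!timers.isEmpty()) {
      long timestampMillis = timers.poll();
      onTimer.accept(timestampMillis);
      fired++;
    }
    return fired;
  }
}
```

In an operator's shutdown path one would call {{drain(...)}} before releasing resources, so that no registered timer is silently dropped. Whether firing the timers immediately (rather than waiting for their timestamps) is acceptable is a design question this sketch does not answer.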

I'm assuming you read the little analysis I did on Aviem's PR; pasting it here 
just in case:
{quote}
@jkff SplittableDoFnTest.testOutputAfterCheckpoint() fails on the Flink Runner. 
If you instrument the test (with a little sysout printing in a ParDo) you see 
that the SDF only emits elements up to 12344. If, in BoundedSourceWrapper 
(which executes the Create.of("foo") on Flink), you replace the end of the 
run() method by
{code}
    // emit final Long.MAX_VALUE watermark, just to be sure
    // ctx.emitWatermark(new Watermark(Long.MAX_VALUE));

    while (isRunning) {
      Thread.sleep(500);
    }
{code}
you see all values correctly emitted but the test never stops because the 
source is now unbounded. (Interestingly, if you replace that one line by 
{{ctx.emitWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));}} 
you get values up to 34567.)

The problem seems to be that the code running the SDF sees the high watermarks 
and then doesn't emit stuff anymore. Do you have an idea what could be going on 
there? If not we'll probably have to delve deeper into the code.
{quote}

So fiddling with the watermarks in {{BoundedSourceWrapper}} also seems to 
affect what is getting processed/emitted by the splittable DoFn (SDF).

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
