[ https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069802#comment-16069802 ]
Aljoscha Krettek commented on BEAM-2140:
----------------------------------------

Yep, in the Flink Runner the processing path for {{ProcessFn}} includes {{StatefulDoFnRunner}}, and that's why timers were dropped once the input watermark went to +Inf. I fixed this in the branch I posted earlier by changing {{SplittableDoFnOperator}} so that it no longer uses that code path and instead uses completely custom code for processing a splittable DoFn: https://github.com/apache/beam/blob/10b1b598100541ff37734a04850ada45fc362b99/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java#L73-L73. This fixed the problem of dropped processing-time timers.

The other problem (in the Flink Runner) was that processing-time timers are simply dropped when the pipeline is shutting down. I'm getting around this by setting a "last resort" event-time timer that fires when the watermark goes to +Inf. In that timer I process the remaining restrictions until they're exhausted.

Splittable DoFn processing in {{SplittableDoFnOperator}} is now split (hehe) into three methods:
* {{processElement()}}: seed state and process the restriction once, set the next processing-time timer and set the last-resort event-time timer
* {{onProcessingTime()}}: process the restriction once and set the next processing-time timer; clean up all state if the restriction is exhausted
* {{onEventTime()}}: process the restriction until it is exhausted, then clean up all state

There is no way of getting around Flink dropping processing-time timers, so if we want the Flink Runner to use {{ProcessFn}} directly, we should add this "last resort" timer there as well. I think it makes sense to have this in general anyway. [~jkff] what do you think about this?

Regarding state leakage: AFAIK a splittable DoFn is not allowed to have any custom state or timers, right?
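The three-method scheme with a "last resort" event-time timer can be illustrated with a small, runner-independent simulation. This is a hypothetical sketch, not the actual {{SplittableDoFnOperator}} code and not Flink's or Beam's timer API: the class {{LastResortTimerSketch}}, the helpers {{processOnce()}}, {{cleanup()}} and {{shutdown()}}, the constant {{MAX_PER_CALL}}, and the use of {{Long.MAX_VALUE}} to stand in for the +Inf watermark are all assumptions made for illustration. The restriction is modeled as an offset range, and {{shutdown()}} mimics Flink dropping pending processing-time timers before the watermark jumps to +Inf.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of SDF processing with a last-resort event-time timer (not Beam/Flink API). */
class LastResortTimerSketch {
  /** Assumed cap on how many offsets one "process restriction once" call may claim. */
  static final int MAX_PER_CALL = 3;
  /** Stand-in for the +Inf watermark in this sketch. */
  static final long END_OF_TIME = Long.MAX_VALUE;

  // Per-key restriction state: {next offset to claim, end offset (exclusive)}.
  final Map<String, long[]> restrictionState = new HashMap<>();
  final Set<String> processingTimers = new HashSet<>();
  final Map<String, Long> eventTimers = new HashMap<>();
  final List<Long> output = new ArrayList<>();

  /** Seed state, process once, then set the next processing-time timer and the last-resort timer. */
  void processElement(String key, long from, long to) {
    restrictionState.put(key, new long[] {from, to});
    processOnce(key);
    if (restrictionState.containsKey(key)) { // restriction not yet exhausted
      processingTimers.add(key);
      eventTimers.put(key, END_OF_TIME);
    }
  }

  /** Process once and re-arm the processing-time timer; processOnce cleans up when exhausted. */
  void onProcessingTime(String key) {
    processingTimers.remove(key);
    if (!restrictionState.containsKey(key)) {
      return;
    }
    processOnce(key);
    if (restrictionState.containsKey(key)) {
      processingTimers.add(key);
    }
  }

  /** Last resort: drain the restriction until exhausted, then clean up all state. */
  void onEventTime(String key) {
    while (restrictionState.containsKey(key)) {
      processOnce(key);
    }
    cleanup(key);
  }

  /** Claims up to MAX_PER_CALL offsets and cleans up all state once the range is exhausted. */
  private void processOnce(String key) {
    long[] r = restrictionState.get(key);
    for (int i = 0; i < MAX_PER_CALL && r[0] < r[1]; i++) {
      output.add(r[0]++);
    }
    if (r[0] >= r[1]) {
      cleanup(key);
    }
  }

  private void cleanup(String key) {
    restrictionState.remove(key);
    processingTimers.remove(key);
    eventTimers.remove(key);
  }

  /** Simulated shutdown: processing-time timers are dropped, then the watermark goes to +Inf. */
  void shutdown() {
    processingTimers.clear();
    for (String key : new ArrayList<>(eventTimers.keySet())) {
      onEventTime(key); // every registered timer is at END_OF_TIME, so all of them fire
    }
  }

  public static void main(String[] args) {
    LastResortTimerSketch op = new LastResortTimerSketch();
    op.processElement("a", 0, 10); // claims 0, 1, 2
    op.onProcessingTime("a");      // claims 3, 4, 5
    op.shutdown();                 // pending processing-time timer dropped; last resort claims 6..9
    System.out.println(op.output + " remainingState=" + op.restrictionState.size());
  }
}
```

Running {{main}} prints {{[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] remainingState=0}}. Without the event-time timer set in {{processElement()}}, offsets 6 to 9 would never be processed in this model, because the only timer that would have continued the work is the processing-time timer that {{shutdown()}} drops.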
Also, {{ProcessFn}} makes sure to clean up the element state and restriction state when a restriction is exhausted, so there should be no state leakage, right?

[~jkff] You mentioned that "the input watermark is held by 'pending elements'". Is this true? I thought that only the output watermark is held by pending elements.

> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled
> the tests to unblock the open PR for BEAM-1763.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)