Thanks, Aljoscha! Manually draining processing-time timers during operator.close() is my current workaround as well. It's just not efficient for me: I may have set a processing-time timer whose callback should run 5 minutes later, but now I have to fire it immediately.
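For illustration, the draining idea can be sketched as below. This is a minimal standalone model, not Flink's or Beam's actual API: the class PendingTimers and its methods register, advanceTo and drain are hypothetical names made up for this sketch. It only shows the difference between firing timers that are due and firing everything when the operator closes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for an operator's pending processing-time timers.
// None of these names come from Flink or Beam.
class PendingTimers {
    // One pending timer: its scheduled fire time and the callback to run.
    private static final class TimerEntry implements Comparable<TimerEntry> {
        final long fireAtMillis;
        final Runnable callback;

        TimerEntry(long fireAtMillis, Runnable callback) {
            this.fireAtMillis = fireAtMillis;
            this.callback = callback;
        }

        @Override
        public int compareTo(TimerEntry other) {
            return Long.compare(fireAtMillis, other.fireAtMillis);
        }
    }

    // Ordered by scheduled fire time, earliest first.
    private final PriorityQueue<TimerEntry> queue = new PriorityQueue<>();

    void register(long fireAtMillis, Runnable callback) {
        queue.add(new TimerEntry(fireAtMillis, callback));
    }

    // Normal path: fire only the timers whose time has come.
    int advanceTo(long nowMillis) {
        int fired = 0;
        while (!queue.isEmpty() && queue.peek().fireAtMillis <= nowMillis) {
            queue.poll().callback.run();
            fired++;
        }
        return fired;
    }

    // Drain path, as a close() method might call it: fire every pending
    // timer in timestamp order, ignoring its scheduled time. Timers that
    // a callback registers while draining are picked up by the same loop.
    int drain() {
        int fired = 0;
        while (!queue.isEmpty()) {
            queue.poll().callback.run();
            fired++;
        }
        return fired;
    }

    int pending() {
        return queue.size();
    }
}
```

The cost mentioned above shows up in drain(): a timer scheduled for 5 minutes from now runs right away, so whatever the delay was meant to achieve is lost.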
https://issues.apache.org/jira/browse/FLINK-18647 is really helpful, and I'm looking forward to the solution.

Thanks for your help!

On Wed, Nov 11, 2020 at 8:13 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi!
>
> This is an interesting topic and we recently created a Jira issue about
> this: https://issues.apache.org/jira/browse/FLINK-18647.
>
> In Beam we even have a workaround for this:
>
> https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L581
>
> Maybe it's time to finally address this in Flink as well.
>
> Best,
> Aljoscha
>
>
> On 11.11.20 01:02, Boyuan Zhang wrote:
> > Hi team,
> >
> > I'm writing my custom Operator as a high fan-out operation and I use
> > processing time timers to defer processing some inputs. When timers are
> > firing, the operator will continue to process the deferred elements. One
> > typical use case for my Operator is like:
> > ImpulseOperator -> my Operator -> downstream where the watermark of
> > ImpulseOperator advances to MAX_TIMESTAMP immediately.
> >
> > One problem I have is that after my operator.close() is called, it's still
> > possible for me to set processing time timers and wait for these timers to
> > be fired. But it seems like Flink pauses invoking processing timers once
> > one operator.close() is called in the new version. I'm curious why Flink
> > decides to do so and any workaround I can do for my operator?
> >
> > Thanks for your help!
> >