Thanks, Aljoscha!

Manually draining processing time timers during operator.close() is my
current workaround as well. It's just not efficient for me: a timer may be
set to fire its callback 5 minutes later, but when draining I have to fire
it immediately.
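
For anyone reading this in the archive, here is a minimal sketch of what I
mean by draining, written in plain Java rather than against Flink's timer
classes. Everything in it (DrainableTimers, register, fireDue, drain) is a
hypothetical name made up for illustration, not a Flink API. The idea is only
that the operator keeps its own record of pending timers, so that close() can
run the deferred callbacks right away instead of waiting for their due time.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/**
 * Hypothetical stand-in for an operator's own bookkeeping of pending
 * processing time timers. Not a Flink class; names are illustrative only.
 */
class DrainableTimers {

    /** One pending timer: when it is due and the deferred work to run. */
    private static final class PendingTimer {
        final long fireAtMillis;
        final Runnable callback;

        PendingTimer(long fireAtMillis, Runnable callback) {
            this.fireAtMillis = fireAtMillis;
            this.callback = callback;
        }
    }

    // Ordered by due time so the earliest timer is always at the head.
    private final PriorityQueue<PendingTimer> pending =
            new PriorityQueue<>(Comparator.comparingLong((PendingTimer t) -> t.fireAtMillis));

    /** Registers a callback to run once processing time reaches fireAtMillis. */
    void register(long fireAtMillis, Runnable callback) {
        pending.add(new PendingTimer(fireAtMillis, callback));
    }

    /** Normal path: fires only the timers due at or before nowMillis. */
    int fireDue(long nowMillis) {
        int fired = 0;
        while (!pending.isEmpty() && pending.peek().fireAtMillis <= nowMillis) {
            pending.poll().callback.run();
            fired++;
        }
        return fired;
    }

    /**
     * Shutdown path, intended to be called from close(): fires every
     * remaining timer immediately, in due-time order, ignoring the clock.
     * Timers registered by a callback during the drain are fired as well.
     */
    int drain() {
        int fired = 0;
        while (!pending.isEmpty()) {
            pending.poll().callback.run();
            fired++;
        }
        return fired;
    }

    /** Number of timers still waiting to fire. */
    int size() {
        return pending.size();
    }
}
```

In this sketch the operator would call fireDue(now) from its regular timer
callback and drain() from close(). The cost I mentioned above is visible
here: a timer registered for 5 minutes from now runs as soon as drain() is
called, so the deferral is lost.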

https://issues.apache.org/jira/browse/FLINK-18647 is really helpful, and
I'm looking forward to the solution.

Thanks for your help!


On Wed, Nov 11, 2020 at 8:13 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi!
>
> This is an interesting topic and we recently created a Jira issue about
> this: https://issues.apache.org/jira/browse/FLINK-18647.
>
> In Beam we even have a workaround for this:
>
> https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L581
>
> Maybe it's time to finally address this in Flink as well.
>
> Best,
> Aljoscha
>
>
> On 11.11.20 01:02, Boyuan Zhang wrote:
> > Hi team,
> >
> > I'm writing a custom operator that performs a high fan-out operation, and
> > I use processing time timers to defer processing some inputs. When the
> > timers fire, the operator continues processing the deferred elements. One
> > typical use case for my operator looks like:
> > ImpulseOperator -> my Operator -> downstream where the watermark of
> > ImpulseOperator advances to MAX_TIMESTAMP immediately.
> >
> > One problem I have is that after my operator.close() is called, it's still
> > possible for me to set processing time timers and wait for these timers to
> > be fired. But it seems that, in the new version, Flink stops invoking
> > processing time timers once operator.close() is called. I'm curious why
> > Flink decides to do so, and whether there is any workaround for my operator.
> >
> > Thanks for your help!
> >
>
>
