Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-23 Thread Binil Benjamin
Yes, restarting the app with a clean state does seem to fix the issue, but I think I may have found a bug in Flink. Here's how we can replicate it:
- Create a simple application with KeyedProcessFunction (with onTimer())
- Send a few records with the same key. In processElement(), register a

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-23 Thread yu'an huang
After fixing your negative timestamp bug, can the timer be triggered?
> On 23 Mar 2022, at 2:39 AM, Binil Benjamin wrote:
>
> Here are some more findings as I was debugging this. I peeked into the snapshot to see the current values in "_timer_state/processing_user-timers" and here is

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-22 Thread Binil Benjamin
Here are some more findings as I was debugging this. I peeked into the snapshot to see the current values in "_timer_state/processing_user-timers" and here is how they look:
Timer{timestamp=-9223372036854715808, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808,
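A note on the value above: -9223372036854715808 is exactly Long.MIN_VALUE + 60000. One possible reading, which is an assumption and not something the thread confirms, is that a 60-second delay was added to a base value that was still Long.MIN_VALUE (for example a watermark that had not advanced yet) instead of to the current processing time. The sketch below only checks that arithmetic in plain Java, with no Flink dependency; the class and method names are hypothetical.

```java
public class TimerTimestampCheck {

    /**
     * Distance of a timer timestamp from Long.MIN_VALUE, in milliseconds.
     * Only meaningful for timestamps close to Long.MIN_VALUE; for large
     * positive inputs the subtraction would overflow.
     */
    static long offsetFromMinValue(long timerTimestamp) {
        return timerTimestamp - Long.MIN_VALUE;
    }

    /**
     * Hypothetical guard: derive a timer timestamp from a base time plus a
     * delay, rejecting a negative base so that a Long.MIN_VALUE-style
     * sentinel cannot silently turn into a timer timestamp.
     */
    static long timerTimestamp(long baseMillis, long delayMillis) {
        if (baseMillis < 0) {
            throw new IllegalArgumentException("negative base time: " + baseMillis);
        }
        return Math.addExact(baseMillis, delayMillis);
    }

    public static void main(String[] args) {
        long fromSnapshot = -9223372036854715808L;
        // Shows how far the snapshot value sits above Long.MIN_VALUE.
        System.out.println(offsetFromMinValue(fromSnapshot));

        // A timer derived from the wall clock lands in the future, as expected.
        long now = System.currentTimeMillis();
        System.out.println(timerTimestamp(now, 60_000L) > now);
    }
}
```

In a real job the base time would presumably come from the timer service's current processing time; that wiring is left out here because the original code is not shown in full.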

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Binil Benjamin
Hi, Parallelism is currently set to 9 and it appears to be occurring for all subtasks. We did put logs to see the various timestamps. The following logs are from the last 5 days. - logs from processElement() - logged immediately after timer registration: "message":

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Yun Gao
Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while Hi, Unfortunately, I cannot share the entire code, but the class roughly looks like this: public class WfProcessFunction extends KeyedProcessFunction, Map, Map> { @Override public void processElement(Ma

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Binil Benjamin
Hi, Unfortunately, I cannot share the entire code, but the class roughly looks like this: public class WfProcessFunction extends KeyedProcessFunction, Map, Map> { @Override public void processElement(Map inputRecord, Context context, Collector> collector) throws Exception {

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-17 Thread yu'an huang
Hi, can you share your code so we can check whether it is written correctly?
> On 18 Mar 2022, at 7:54 AM, Binil Benjamin wrote:
>
> Hi,
>
> We have a class that extends KeyedProcessFunction and overrides onTimer() method. During processElement(), we register a timer callback using

onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-17 Thread Binil Benjamin
Hi, We have a class that extends KeyedProcessFunction and overrides onTimer() method. During processElement(), we register a timer callback using context.timerService().registerProcessingTimeTimer(). For a while, we see that the onTimer() method is getting called back and everything works as