Yes, restarting the app with a clean state does seem to fix the issue, but
I think I may have found a bug in Flink.
Here's how we can replicate it:
- Create a simple application with KeyedProcessFunction (with onTimer())
- Send a few records with the same key. In processElement(), register a
timer
After fixing your negative timestamp bug, can the timer be triggered?
> On 23 Mar 2022, at 2:39 AM, Binil Benjamin wrote:
>
> Here are some more findings as I was debugging this. I peeked into the
> snapshot to see the current values in "_timer_state/processing_user-timers"
> and here is ho
Here are some more findings as I was debugging this. I peeked into the
snapshot to see the current values in
"_timer_state/processing_user-timers" and here is how they look:
Timer{timestamp=-9223372036854715808, key=(FFX22...),
namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FF
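A note on the value above: the timestamp shown in the snapshot is exactly 60000 ms above Long.MIN_VALUE. The small check below only demonstrates that arithmetic. The class and method names are made up for illustration, and the reading that a 60-second delay was added to a Long.MIN_VALUE base is an assumption that the thread does not confirm.

```java
public class TimerTimestampCheck {
    // Distance of a timestamp from Long.MIN_VALUE, in milliseconds.
    // Hypothetical helper, not part of Flink.
    public static long offsetFromMinValue(long timestamp) {
        return timestamp - Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Value copied from the timer entries in the snapshot.
        long observed = -9223372036854715808L;
        System.out.println(offsetFromMinValue(observed)); // prints 60000
    }
}
```

If that reading is right, the base time used when registering the timer would be the first thing to check.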
Hi,
Parallelism is currently set to 9 and it appears to be occurring for all
subtasks.
We did put logs to see the various timestamps. The following logs are from
the last 5 days.
- logs from processElement() - logged immediately after timer registration:
"message": "FunctionName=WfProcessFun
Subject: Re: onTimer() of a KeyedProcessFunction stops getting triggered after a
while
Hi,
Unfortunately, I cannot share the entire code, but the class roughly looks
like this:
public class WfProcessFunction extends KeyedProcessFunction, Map, Map> {
@Override
public void processElement(Map inputRecord,
Context context, Collector> collector) throws
Exception {
Hi, can you share your code so we can check whether it is written correctly?
> On 18 Mar 2022, at 7:54 AM, Binil Benjamin wrote:
>
> Hi,
>
> We have a class that extends KeyedProcessFunction and overrides onTimer()
> method. During processElement(), we register a timer callback using
> cont
Hi,
We have a class that extends KeyedProcessFunction and overrides the onTimer()
method. During processElement(), we register a timer callback using
context.timerService().registerProcessingTimeTimer(). For
a while, we see that the onTimer() method is getting called back and
everything works as expec
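For anyone hitting the same symptom, here is a minimal sketch of a guard around the timestamp computation. It is plain Java with no Flink dependency. The class and method names are hypothetical, and it assumes the base time would come from context.timerService().currentProcessingTime() before the result is passed to registerProcessingTimeTimer(). The original code is not shown in full, so this is not a description of what it does.

```java
public class TimerTimes {
    // Hypothetical helper: computes the time at which a processing-time
    // timer should fire, rejecting bases that cannot be a wall-clock time.
    public static long nextFireTime(long nowMillis, long delayMillis) {
        if (nowMillis < 0) {
            // A negative base (e.g. Long.MIN_VALUE) is not a valid
            // processing time, so fail loudly instead of registering it.
            throw new IllegalArgumentException("negative base time: " + nowMillis);
        }
        if (delayMillis < 0) {
            throw new IllegalArgumentException("negative delay: " + delayMillis);
        }
        // addExact throws ArithmeticException on overflow instead of wrapping.
        return Math.addExact(nowMillis, delayMillis);
    }

    public static void main(String[] args) {
        // In a real job the base would be read from the timer service.
        long now = System.currentTimeMillis();
        System.out.println(nextFireTime(now, 60_000L) - now); // prints 60000
    }
}
```

Logging the computed value next to the timer registration should make a bad base time visible immediately.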