Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-23 Thread Binil Benjamin
Yes, restarting the app with a clean state does seem to fix the issue, but
I think I may have found a bug in Flink.

Here's how we can reproduce it:
- Create a simple application with a KeyedProcessFunction that implements
  onTimer().
- Send a few records with the same key. In processElement(), register a
  timer for each of these records:
    ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
- onTimer() is triggered as expected for the above timer registrations.
- Now, for the same key, register a timer with a negative timestamp (say,
  when value == some_special_value):
    ctx.timerService().registerProcessingTimeTimer(Long.MIN_VALUE);
- Send more records with the same key and register regular timers:
    ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
- As originally reported in this thread, the newly registered timers don't
  fire.
- Now delete the only timer with a negative timestamp:
    ctx.timerService().deleteProcessingTimeTimer(Long.MIN_VALUE);
- Send more records with the same key and register regular timers:
    ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
- The new timers still don't fire.

I expected the new timers to go off after deleting the bad timer, but it
looks like there is no way to recover once a bad timer is registered. Could
this be a Flink bug?
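The steps above can be mimicked with a toy model, which may help reason about why deleting the bad timer does not recover anything. This is a simplified sketch, not Flink's timer service; ToyTimerService and its methods are hypothetical. It assumes that (1) a single wake-up is scheduled for the earliest timer in a priority queue, (2) the wake-up delay is computed as Math.max(timestamp - now, 0) + 1 with plain long arithmetic, (3) the wake-up is only re-scheduled when a new timer is earlier than the current queue head, and (4) deleting a timer only removes it from the queue. Under these assumptions, Long.MIN_VALUE - now wraps around to a huge positive delay, pushing the wake-up hundreds of millions of years out; after the bad timer is deleted, the head is an older overdue timer, so later registrations never re-schedule the wake-up either.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of a processing-time timer service (hypothetical, NOT Flink's implementation). */
public class ToyTimerService {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    private long armedAt;     // processing time at which the current wake-up was scheduled
    private long delay = -1;  // -1 means no wake-up is scheduled

    public void register(long timestamp, long now) {
        Long oldHead = queue.peek();
        queue.add(timestamp);
        long nextTriggerTime = (oldHead != null) ? oldHead : Long.MAX_VALUE;
        // Assumption (3): only re-schedule when the new timer becomes the earliest one.
        if (timestamp < nextTriggerTime) {
            arm(timestamp, now);
        }
    }

    public void delete(long timestamp) {
        // Assumption (4): removes the queue entry but leaves the scheduled wake-up untouched.
        queue.remove(timestamp);
    }

    /** Advances the clock to 'now' and returns the timers fired by a due wake-up, if any. */
    public List<Long> advanceTo(long now) {
        List<Long> fired = new ArrayList<>();
        if (delay < 0 || now - armedAt < delay) {
            return fired; // no wake-up is due yet
        }
        delay = -1;
        while (!queue.isEmpty() && queue.peek() <= now) {
            fired.add(queue.poll());
        }
        if (!queue.isEmpty()) {
            arm(queue.peek(), now);
        }
        return fired;
    }

    private void arm(long timestamp, long now) {
        armedAt = now;
        // Assumption (2): for timestamp == Long.MIN_VALUE, (timestamp - now) overflows
        // and wraps around to a huge positive number, so Math.max does not clamp it.
        delay = Math.max(timestamp - now, 0) + 1;
    }

    public static void main(String[] args) {
        ToyTimerService svc = new ToyTimerService();
        long now = 1_647_229_483_281L;
        svc.register(now + 1000, now);
        System.out.println(svc.advanceTo(now + 1500));   // prints [1647229484281]: the regular timer fires
        svc.register(Long.MIN_VALUE, now + 2000);        // the "bad" timer
        svc.register(now + 3000, now + 2000);
        System.out.println(svc.advanceTo(now + 10_000)); // prints []: the wake-up is far in the future
        svc.delete(Long.MIN_VALUE);
        svc.register(now + 11_000, now + 10_000);
        System.out.println(svc.advanceTo(now + 60_000)); // prints []: still stuck after the delete
    }
}
```

Whether Flink 1.13.2 computes the delay exactly this way is an assumption worth checking against its SystemProcessingTimeService and InternalTimerServiceImpl sources.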

Thanks!

On Tue, Mar 22, 2022 at 11:38 PM yu'an huang  wrote:

> After fixing your negative timestamp bug, can the timer be triggered?

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-23 Thread yu'an huang
After fixing your negative timestamp bug, can the timer be triggered?





Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-22 Thread Binil Benjamin
Here are some more findings from debugging this. I inspected the
snapshot to see the current values in
"_timer_state/processing_user-timers", and here is what they look like:

Timer{timestamp=-9223372036854715808, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=-9223372036854715808, key=(FFX22), namespace=VoidNamespace}
Timer{timestamp=1644897305245, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1644998232084, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1645730447266, key=(FFX1...), namespace=VoidNamespace}
Timer{timestamp=1645742358651, key=(FFX22...), namespace=VoidNamespace}
Timer{timestamp=1645743288774, key=(FFX22...), namespace=VoidNamespace}
...

As you can see, the priority queue contains some negative timestamps (a bug
in our code at some point registered timers with these negative values).
Could this be the root cause of the timers not being triggered?
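A quick arithmetic check on the dump above: the negative timestamp is exactly Long.MIN_VALUE + 60,000, and 60,000 ms is also the gap between "Current processing time" and "Next trigger time" in the 18 March logs. That is consistent with, but does not prove, the buggy code adding the usual 60-second delay to a base value of Long.MIN_VALUE. The class and method names below are hypothetical.

```java
public class SnapshotTimestampCheck {
    /** Distance of a timestamp from Long.MIN_VALUE; exact only while the result fits in a long. */
    static long offsetFromLongMin(long timestamp) {
        return timestamp - Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long snapshotTimestamp = -9223372036854715808L;      // value from the snapshot dump
        long loggedDelay = 1647229543281L - 1647229483281L;  // "Next trigger time" minus "Current processing time"
        System.out.println(offsetFromLongMin(snapshotTimestamp)); // 60000
        System.out.println(loggedDelay);                          // 60000
    }
}
```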

Thanks!


Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Binil Benjamin
Hi,

Parallelism is currently set to 9, and the issue appears to occur for all
subtasks.

We added logging to print the various timestamps. The following logs are
from the last 5 days.

- logs from processElement() - logged immediately after timer registration:
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647229483281, Current
step duration=6, Current processing time=1647229483281, Next trigger
time=1647229543281,
CurrentKey=(FFX22OJAEAA,d7d337b6-9dbc-4898-9001-4d10f2cd9796e1b9ba96-b4b6-3472-94ae-8a1d3e3d9ce1:::d7d337b6-9dbc-4898-9001-4d10f2cd9796:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(7/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647229512107, Current
step duration=6, Current processing time=1647229512107, Next trigger
time=1647229572107,
CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a44bbe719-2ed4-33c9-aeb7-e7cb5f61dd91:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(2/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647229543475, Current
step duration=6, Current processing time=1647229543475, Next trigger
time=1647229603475,
CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a0bea4f14-9f23-3368-b74f-531a67fbe9f0:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::d11f4623-dab0-3232-92aa-341628e96330)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(8/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647229633747, Current
step duration=6, Current processing time=1647229633747, Next trigger
time=1647229693746,
CurrentKey=(FFX22OJAEAA,0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd50fe795a7-64db-3350-b56e-37400b19ae07:::0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd5:::743f32f2-4a6c-315b-9850-9992b88f2b67)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(9/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236501795, Current
step duration=6, Current processing time=1647236501795, Next trigger
time=1647236561795,
CurrentKey=(FFX22OJAEAA,4b6fbc31-5f41-45c3-aa08-4f865062e2a2dae46709-ff86-35d1-a830-1c01888a4cde:::4b6fbc31-5f41-45c3-aa08-4f865062e2a2:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(3/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236513004, Current
step duration=6, Current processing time=1647236513004, Next trigger
time=1647236573004,
CurrentKey=(FFX22OJAEAA,90ba0c88-0a1e-43b5-8e3a-65613ccd7943e4c9234f-5f83-3ef6-8b22-28224d070404:::90ba0c88-0a1e-43b5-8e3a-65613ccd7943:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(2/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236561848, Current
step duration=6, Current processing time=1647236561848, Next trigger
time=1647236621848,
CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9d0a3e195-56b2-3242-9d03-326ccbfbc040:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::service:none:)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(4/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236591875, Current
step duration=6, Current processing time=1647236591875, Next trigger
time=1647236651875,
CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9eb796957-ef3a-3b67-8e63-8ba136e1b86d:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::45f60cdb-7cc9-3e5a-ace7-b0ca50b6c230)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(4/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647236761584, Current
step duration=6, Current processing time=1647236761584, Next trigger
time=1647236821584,
CurrentKey=(FFX22OJAEAA,585594a0-9421-4719-97bd-34920582cd260fea860d-93a3-3514-b470-4bd49670a298:::585594a0-9421-4719-97bd-34920582cd26:::e9e49df8-30d4-3dc1-93df-64024609acc3)",
"threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
(2/9)#0",
"message": "FunctionName=WfProcessFunction::processElement,
FunctionMessage=\"Time values\", Current system time=1647241184750, Current
step duration=6, Current processing time=1647241184750, Next trigger
time=1647241244750,
CurrentKey=(FFX22OJAEAA,45d4124e-675d-4d5c-a8fe-715038032bd8e920e91b-e4c2-310f-82d5-a64768f32035:::45d4124e-675d-4d5c-a8fe-715038032bd8:::9634ad13-c121-33b8-87b0-b71e8dfe4f77)",

Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Yun Gao
Hi Binil,

I think the code itself looks good to me as well. Could you confirm a few
details of the issue?
1. What is the parallelism of this operator, and does the issue occur for
all the subtasks?
2. Have you already added logs in processElement() and onTimer() to print
the timestamp of each registered processing-time timer and the time of each
callback? If so, could you share that output?
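For point 2, one way to pair registrations with callbacks is a small in-memory tracker. This is a sketch under assumptions: TimerAudit is a hypothetical helper (not a Flink API), keys are assumed to be stringified (for example via ctx.getCurrentKey().toString()), and the tracker lives per subtask and is not checkpointed, so it starts empty after a restart. It keeps a set of timestamps per key because Flink keeps at most one timer per key and timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical debugging aid: tracks timers registered but not yet seen in onTimer(). */
public class TimerAudit {
    // key -> timestamps registered for that key that have not fired yet
    private final Map<String, TreeSet<Long>> pending = new HashMap<>();

    /** Call right after registerProcessingTimeTimer(timestamp). */
    public void registered(String key, long timestamp) {
        pending.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    /** Call at the start of onTimer(timestamp, ...). */
    public void fired(String key, long timestamp) {
        TreeSet<Long> timestamps = pending.get(key);
        if (timestamps != null) {
            timestamps.remove(timestamp);
            if (timestamps.isEmpty()) {
                pending.remove(key);
            }
        }
    }

    /** Returns "key@timestamp" for timers that should have fired more than graceMillis ago. */
    public List<String> overdue(long now, long graceMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, TreeSet<Long>> entry : pending.entrySet()) {
            for (long timestamp : entry.getValue().headSet(now - graceMillis, true)) {
                result.add(entry.getKey() + "@" + timestamp);
            }
        }
        return result;
    }
}
```

Logging overdue(currentProcessingTime, graceMillis) periodically would show which timers never got a callback. Memory grows with every timer that never fires, so this is meant for debugging only.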

Best,
Yun Gao



Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-18 Thread Binil Benjamin
Hi,

Unfortunately, I cannot share the entire code, but the class roughly looks
like this:

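import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Type parameters below are assumed (the archived message lost them).
public class WfProcessFunction
        extends KeyedProcessFunction<Tuple2<String, String>, Map<String, Object>, Map<String, Object>> {

    @Override
    public void processElement(Map<String, Object> inputRecord,
            Context context, Collector<Map<String, Object>> collector) throws Exception {
        ...
        // Register a processing-time timer 5 seconds from now
        context.timerService().registerProcessingTimeTimer(
                context.timerService().currentProcessingTime() + 5 * TimeUnit.SECONDS.toMillis(1L));
        ...
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Map<String, Object>> out) throws Exception {
        ...
    }
}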
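Since the reproduction at the top of this thread suggests a bad timer cannot be undone by deleting it, a cheap safeguard is to validate every timer timestamp before registering it. The thread does not say how the negative timestamps were produced, so this minimal sketch simply rejects negative inputs and overflow; TimerTimestamps and checkedTimerTimestamp are hypothetical names, not Flink APIs.

```java
import java.util.concurrent.TimeUnit;

public final class TimerTimestamps {
    private TimerTimestamps() {}

    /**
     * Hypothetical helper (not part of Flink): returns currentProcessingTime + delayMillis,
     * rejecting negative inputs and throwing instead of silently wrapping on overflow.
     */
    public static long checkedTimerTimestamp(long currentProcessingTime, long delayMillis) {
        if (currentProcessingTime < 0 || delayMillis < 0) {
            throw new IllegalArgumentException(
                    "refusing to register timer: now=" + currentProcessingTime + ", delay=" + delayMillis);
        }
        return Math.addExact(currentProcessingTime, delayMillis); // throws ArithmeticException on overflow
    }

    public static void main(String[] args) {
        long now = 1_647_229_483_281L;
        System.out.println(checkedTimerTimestamp(now, TimeUnit.SECONDS.toMillis(5))); // 1647229488281
        try {
            checkedTimerTimestamp(Long.MIN_VALUE, 60_000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In processElement() this would wrap the existing call, e.g. context.timerService().registerProcessingTimeTimer(TimerTimestamps.checkedTimerTimestamp(context.timerService().currentProcessingTime(), TimeUnit.SECONDS.toMillis(5))). Throwing fails the task and surfaces the bug immediately; logging and skipping the registration is the gentler alternative if losing that one timer is acceptable.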

Thanks!



Re: onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-17 Thread yu'an huang
Hi, can you share your code so we can check whether it is written correctly.






onTimer() of a KeyedProcessFunction stops getting triggered after a while

2022-03-17 Thread Binil Benjamin
Hi,

We have a class that extends KeyedProcessFunction and overrides the onTimer()
method. In processElement(), we register a timer callback using
context.timerService().registerProcessingTimeTimer(). For a while, onTimer()
is called back and everything works as expected; however, after some time,
onTimer() stops receiving any callbacks from Flink (registering the timer via
registerProcessingTimeTimer() still works fine). Does anyone know what could
be wrong here and how we can debug this?

Flink version is 1.13.2 (running on AWS KDA)

Thanks!