Yes, restarting the app with a clean state does seem to fix the issue, but
I think I may have found a bug in Flink.

Here's how we can replicate it:
- Create a simple application with KeyedProcessFunction (with onTimer())
- Send a few records with the same key. In processElement(), register a
timer for each of these records:
   ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
- The onTimer() gets triggered as expected for the above timer registrations
- Now for the same key, register a timer with a negative timestamp (say, when
value = some_special_value):
   ctx.timerService().registerProcessingTimeTimer(Long.MIN_VALUE);
- Now send more records with the same key and register regular timers:
   ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
- As originally reported in this thread, the newly registered timers don't
get triggered
- Now delete the only timer with a negative timestamp:
   ctx.timerService().deleteProcessingTimeTimer(Long.MIN_VALUE);
- Now send more records with the same key and register regular timers:
   ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
- The new timers still don't get triggered.

I expected the new timers to go off after deleting the bad timer, but it
looks like there is no way to recover once a bad timer is registered. Could
this be a Flink bug?
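
In the meantime, a guard on our side would at least keep such timestamps out
of timer state. Below is a minimal sketch, not a fix for the behaviour above:
TimerGuard and nextTriggerTime are hypothetical names of my own, not Flink
APIs. It also checks one observation from the snapshot quoted below: the bad
timestamps equal Long.MIN_VALUE + 60000, which is consistent with the 60000 ms
step duration having been added to Long.MIN_VALUE (an assumption on my part
about how they were produced).

```java
// Hypothetical helper (not part of Flink): validates a processing-time
// timer timestamp before it is handed to registerProcessingTimeTimer().
final class TimerGuard {

    private TimerGuard() {}

    /**
     * Returns nowMs + delayMs, or throws if either input is negative or the
     * sum would overflow a long.
     */
    static long nextTriggerTime(long nowMs, long delayMs) {
        if (nowMs < 0) {
            throw new IllegalArgumentException("current time is negative: " + nowMs);
        }
        if (delayMs < 0) {
            throw new IllegalArgumentException("delay is negative: " + delayMs);
        }
        // Math.addExact throws ArithmeticException instead of silently wrapping.
        return Math.addExact(nowMs, delayMs);
    }

    public static void main(String[] args) {
        // Values taken from the first processElement() log entry quoted below.
        System.out.println(nextTriggerTime(1647229483281L, 60000L));

        // The bad timers in the snapshot equal Long.MIN_VALUE + 60000.
        System.out.println(Long.MIN_VALUE + 60000L == -9223372036854715808L);

        // A negative base time is rejected rather than registered.
        try {
            nextTriggerTime(Long.MIN_VALUE, 60000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In processElement() this would wrap the existing call, e.g.
ctx.timerService().registerProcessingTimeTimer(TimerGuard.nextTriggerTime(ctx.timerService().currentProcessingTime(), 1000)).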

Thanks!

On Tue, Mar 22, 2022 at 11:38 PM yu'an huang <h.yuan...@gmail.com> wrote:

> After fixing your negative timestamp bug, can the timer be triggered?
>
> On 23 Mar 2022, at 2:39 AM, Binil Benjamin <bbenja...@splunk.com> wrote:
>
> Here are some more findings as I was debugging this. I peeked into the
> snapshot to see the current values in
> "_timer_state/processing_user-timers" and here is how they look:
>
> Timer{timestamp=-9223372036854715808, key=(FFX22...),
> namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22...),
> namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22...),
> namespace=VoidNamespace}
> Timer{timestamp=-9223372036854715808, key=(FFX22), namespace=VoidNamespace}
> Timer{timestamp=1644897305245, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1644998232084, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1645730447266, key=(FFX1...), namespace=VoidNamespace}
> Timer{timestamp=1645742358651, key=(FFX22...), namespace=VoidNamespace}
> Timer{timestamp=1645743288774, key=(FFX22...), namespace=VoidNamespace}
> ...
>
> As you can see, the timer priority queue contains some negative timestamps
> (a bug in our code at some point registered them). Could this be the root
> cause of why the timers are not getting triggered?
>
> Thanks!
>
> On Fri, Mar 18, 2022 at 6:50 PM Binil Benjamin <bbenja...@splunk.com>
> wrote:
>
>> Hi,
>>
>> Parallelism is currently set to 9 and it appears to be occurring for all
>> subtasks.
>>
>> We added logging to see the various timestamps. The following logs are
>> from the last 5 days.
>>
>> - logs from processElement() - logged immediately after timer
>> registration:
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647229483281, Current
>> step duration=60000, Current processing time=1647229483281, Next trigger
>> time=1647229543281,
>> CurrentKey=(FFX22OJAEAA,d7d337b6-9dbc-4898-9001-4d10f2cd9796e1b9ba96-b4b6-3472-94ae-8a1d3e3d9ce1:::d7d337b6-9dbc-4898-9001-4d10f2cd9796:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647229512107, Current
>> step duration=60000, Current processing time=1647229512107, Next trigger
>> time=1647229572107,
>> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a44bbe719-2ed4-33c9-aeb7-e7cb5f61dd91:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647229543475, Current
>> step duration=60000, Current processing time=1647229543475, Next trigger
>> time=1647229603475,
>> CurrentKey=(FFX22OJAEAA,40fe87cb-23ec-4d52-b90e-c0d27c84e47a0bea4f14-9f23-3368-b74f-531a67fbe9f0:::40fe87cb-23ec-4d52-b90e-c0d27c84e47a:::d11f4623-dab0-3232-92aa-341628e96330)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647229633747, Current
>> step duration=60000, Current processing time=1647229633747, Next trigger
>> time=1647229693746,
>> CurrentKey=(FFX22OJAEAA,0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd50fe795a7-64db-3350-b56e-37400b19ae07:::0cb3dbe5-6d1a-4fc5-874a-972c4bac1bd5:::743f32f2-4a6c-315b-9850-9992b88f2b67)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (9/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647236501795, Current
>> step duration=60000, Current processing time=1647236501795, Next trigger
>> time=1647236561795,
>> CurrentKey=(FFX22OJAEAA,4b6fbc31-5f41-45c3-aa08-4f865062e2a2dae46709-ff86-35d1-a830-1c01888a4cde:::4b6fbc31-5f41-45c3-aa08-4f865062e2a2:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647236513004, Current
>> step duration=60000, Current processing time=1647236513004, Next trigger
>> time=1647236573004,
>> CurrentKey=(FFX22OJAEAA,90ba0c88-0a1e-43b5-8e3a-65613ccd7943e4c9234f-5f83-3ef6-8b22-28224d070404:::90ba0c88-0a1e-43b5-8e3a-65613ccd7943:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647236561848, Current
>> step duration=60000, Current processing time=1647236561848, Next trigger
>> time=1647236621848,
>> CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9d0a3e195-56b2-3242-9d03-326ccbfbc040:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647236591875, Current
>> step duration=60000, Current processing time=1647236591875, Next trigger
>> time=1647236651875,
>> CurrentKey=(FFX22OJAEAA,b3b6bb3f-3c72-4378-bd94-a28aee3da1f9eb796957-ef3a-3b67-8e63-8ba136e1b86d:::b3b6bb3f-3c72-4378-bd94-a28aee3da1f9:::45f60cdb-7cc9-3e5a-ace7-b0ca50b6c230)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647236761584, Current
>> step duration=60000, Current processing time=1647236761584, Next trigger
>> time=1647236821584,
>> CurrentKey=(FFX22OJAEAA,585594a0-9421-4719-97bd-34920582cd260fea860d-93a3-3514-b470-4bd49670a298:::585594a0-9421-4719-97bd-34920582cd26:::e9e49df8-30d4-3dc1-93df-64024609acc3)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647241184750, Current
>> step duration=60000, Current processing time=1647241184750, Next trigger
>> time=1647241244750,
>> CurrentKey=(FFX22OJAEAA,45d4124e-675d-4d5c-a8fe-715038032bd8e920e91b-e4c2-310f-82d5-a64768f32035:::45d4124e-675d-4d5c-a8fe-715038032bd8:::9634ad13-c121-33b8-87b0-b71e8dfe4f77)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647241246553, Current
>> step duration=60000, Current processing time=1647241246553, Next trigger
>> time=1647241306553,
>> CurrentKey=(FFX22OJAEAA,d0488660-ad34-461a-9604-264d01feab31715e4fae-6311-3600-8076-43572ce41806:::d0488660-ad34-461a-9604-264d01feab31:::536866f8-8301-3e70-ae15-11ca1437c1ee)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647241269983, Current
>> step duration=60000, Current processing time=1647241269983, Next trigger
>> time=1647241329983,
>> CurrentKey=(FFX22OJAEAA,02393974-b0a4-4007-aadc-caeb7630cfcc0dc72753-e143-3930-91ce-735cc1d0cba4:::02393974-b0a4-4007-aadc-caeb7630cfcc:::f92d1828-7728-30db-8e14-b930eb3304e6)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647248706456, Current
>> step duration=60000, Current processing time=1647248706456, Next trigger
>> time=1647248766456,
>> CurrentKey=(FFX22OJAEAA,22926aad-76a7-49eb-9849-000a54c854a077746760-0831-362d-aac2-30360889abde:::22926aad-76a7-49eb-9849-000a54c854a0:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647248707045, Current
>> step duration=60000, Current processing time=1647248707045, Next trigger
>> time=1647248767045,
>> CurrentKey=(FFX22OJAEAA,447d704b-08fe-4f15-ba51-74215a182f77acdf7170-a063-3230-92ac-4a196bfc6916:::447d704b-08fe-4f15-ba51-74215a182f77:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647248771741, Current
>> step duration=60000, Current processing time=1647248771741, Next trigger
>> time=1647248831741,
>> CurrentKey=(FFX22OJAEAA,aba5ed2a-66a2-499e-aefd-f064af2ce641100bb46e-7545-3e3a-aae5-c0ad52f0ab94:::aba5ed2a-66a2-499e-aefd-f064af2ce641:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647248802844, Current
>> step duration=60000, Current processing time=1647248802844, Next trigger
>> time=1647248862844,
>> CurrentKey=(FFX22OJAEAA,aba5ed2a-66a2-499e-aefd-f064af2ce641e550d004-7fa4-3fe0-b30e-002957f32463:::aba5ed2a-66a2-499e-aefd-f064af2ce641:::d41fe893-56db-3a0b-9f2b-8a66f8ee1ad9)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647248954463, Current
>> step duration=60000, Current processing time=1647248954463, Next trigger
>> time=1647249014463,
>> CurrentKey=(FFX22OJAEAA,13dcb224-2b43-4e97-bc19-ba0293d8db4c7541dbe4-e73e-3987-aa49-41c36bde9939:::13dcb224-2b43-4e97-bc19-ba0293d8db4c:::2fbcd22a-18c7-33b1-a151-f3f2de89f463)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (5/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647250145814, Current
>> step duration=60000, Current processing time=1647250145814, Next trigger
>> time=1647250205814,
>> CurrentKey=(FFX22OJAEAA,73c1be15-8b13-492b-ab08-bae2dc667237144cef1d-7782-300a-bc59-cb58f28a51cf:::73c1be15-8b13-492b-ab08-bae2dc667237:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (9/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647250181941, Current
>> step duration=60000, Current processing time=1647250181941, Next trigger
>> time=1647250241941,
>> CurrentKey=(FFX22OJAEAA,feea7dcb-f689-4c2a-b88e-ef805ea959f5c295316d-56ca-3efc-9799-d23e145401af:::feea7dcb-f689-4c2a-b88e-ef805ea959f5:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647250236425, Current
>> step duration=60000, Current processing time=1647250236425, Next trigger
>> time=1647250296425,
>> CurrentKey=(FFX22OJAEAA,feea7dcb-f689-4c2a-b88e-ef805ea959f5101be295-b198-3324-8b1e-aaedb0f980e8:::feea7dcb-f689-4c2a-b88e-ef805ea959f5:::f350b0f2-289d-3c9a-ae5e-dd69cc461643)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647250367093, Current
>> step duration=60000, Current processing time=1647250367093, Next trigger
>> time=1647250427093,
>> CurrentKey=(FFX22OJAEAA,27c351d2-6c7a-4edf-8a6f-514b611f3562311fde41-4928-3fb2-a691-8cda5d44c67f:::27c351d2-6c7a-4edf-8a6f-514b611f3562:::543f4fc6-0360-3578-b261-43604c759b8d)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (9/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647250368844, Current
>> step duration=60000, Current processing time=1647250368844, Next trigger
>> time=1647250428844,
>> CurrentKey=(FFX22OJAEAA,68650ef4-e874-4f86-bd3e-9c71d130c46c40a3c91d-2090-3608-9935-cae9dba9afd3:::68650ef4-e874-4f86-bd3e-9c71d130c46c:::12f52ca9-f347-3fbd-a8ab-cda4c494716a)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647250408776, Current
>> step duration=60000, Current processing time=1647250408776, Next trigger
>> time=1647250468776,
>> CurrentKey=(FFX22OJAEAA,4d023027-fdac-4500-be13-2e040b71dc5f053151f1-dfb6-39db-8739-04a142ca0808:::4d023027-fdac-4500-be13-2e040b71dc5f:::251cacdc-0a87-3794-a666-908f298a4830)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647252600233, Current
>> step duration=60000, Current processing time=1647252600233, Next trigger
>> time=1647252660233,
>> CurrentKey=(FFX22OJAEAA,b4c34e13-6207-4398-aa49-da091f9c523e7d606256-e3e8-3825-a283-c683e3185377:::b4c34e13-6207-4398-aa49-da091f9c523e:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647252622501, Current
>> step duration=60000, Current processing time=1647252622501, Next trigger
>> time=1647252682500,
>> CurrentKey=(FFX22OJAEAA,b4c34e13-6207-4398-aa49-da091f9c523e2ade545e-0e7b-3c66-87bf-d292f6fbacc6:::b4c34e13-6207-4398-aa49-da091f9c523e:::5cb391e8-890c-3e2e-8b9b-6c562d366673)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647252688059, Current
>> step duration=60000, Current processing time=1647252688059, Next trigger
>> time=1647252748059,
>> CurrentKey=(FFX22OJAEAA,40cd467e-644b-4bde-abc4-6c4feed75d6b792833e6-f982-36da-8cea-a21bb35b1773:::40cd467e-644b-4bde-abc4-6c4feed75d6b:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (5/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647252771454, Current
>> step duration=60000, Current processing time=1647252771454, Next trigger
>> time=1647252831454,
>> CurrentKey=(FFX22OJAEAA,acbfb6f9-3471-4035-b010-ed6122ecc5fe36d3406a-f303-38b8-a690-8d3d8255f992:::acbfb6f9-3471-4035-b010-ed6122ecc5fe:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647252787446, Current
>> step duration=60000, Current processing time=1647252787446, Next trigger
>> time=1647252847446,
>> CurrentKey=(FFX22OJAEAA,54722594-ecab-45f9-9c4d-1a1130473c5641e6b495-105e-308f-8959-0cee330c8cd5:::54722594-ecab-45f9-9c4d-1a1130473c56:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647252830542, Current
>> step duration=60000, Current processing time=1647252830542, Next trigger
>> time=1647252890542,
>> CurrentKey=(FFX22OJAEAA,534f27a5-f34f-4a6b-bf6d-23c204c09b4e15562e6d-439d-3f0c-8c75-483b6a9d7471:::534f27a5-f34f-4a6b-bf6d-23c204c09b4e:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647252836311, Current
>> step duration=60000, Current processing time=1647252836311, Next trigger
>> time=1647252896311,
>> CurrentKey=(FFX22OJAEAA,534f27a5-f34f-4a6b-bf6d-23c204c09b4e7d767391-9776-3276-b90b-8fd7752b7c94:::534f27a5-f34f-4a6b-bf6d-23c204c09b4e:::c10633f6-ae0e-392e-a689-e23b30cd1b6e)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647252927982, Current
>> step duration=60000, Current processing time=1647252927982, Next trigger
>> time=1647252987982,
>> CurrentKey=(FFX22OJAEAA,b7887bff-8d13-4970-b85a-46c52802c68af167e117-009c-3573-b32e-4b32010d6353:::b7887bff-8d13-4970-b85a-46c52802c68a:::a2582905-860a-3b98-bf7a-cf126c3423bd)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647254250544, Current
>> step duration=60000, Current processing time=1647254250544, Next trigger
>> time=1647254310544,
>> CurrentKey=(FFX22OJAEAA,1e6069de-ee4c-40cd-92c4-0d99b1a73de0aa4f4e4d-d42f-3027-9ad5-97650e63eeed:::1e6069de-ee4c-40cd-92c4-0d99b1a73de0:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647254330591, Current
>> step duration=60000, Current processing time=1647254330591, Next trigger
>> time=1647254390591,
>> CurrentKey=(FFX22OJAEAA,11b1111b-9da4-49f4-9c82-a8bee7940ca13e11c712-3308-3abc-9514-98ef5fa7b66d:::11b1111b-9da4-49f4-9c82-a8bee7940ca1:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647254344103, Current
>> step duration=60000, Current processing time=1647254344103, Next trigger
>> time=1647254404103,
>> CurrentKey=(FFX22OJAEAA,1d5584da-350e-4e5b-b70d-459aa60a479664c4b4f2-98f9-3487-9746-5033aff36cc2:::1d5584da-350e-4e5b-b70d-459aa60a4796:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647254394638, Current
>> step duration=60000, Current processing time=1647254394638, Next trigger
>> time=1647254454638,
>> CurrentKey=(FFX22OJAEAA,884600de-08fe-4ce2-bcfc-df25f8d629845b6b6611-f163-3689-9543-a322a512089f:::884600de-08fe-4ce2-bcfc-df25f8d62984:::d5729e52-1fd5-3229-a9b1-07f33c8257e5)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647254400155, Current
>> step duration=60000, Current processing time=1647254400155, Next trigger
>> time=1647254460155,
>> CurrentKey=(FFX22OJAEAA,884600de-08fe-4ce2-bcfc-df25f8d62984cba8ce4f-3254-3989-a616-6f5b69e148fb:::884600de-08fe-4ce2-bcfc-df25f8d62984:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647254480310, Current
>> step duration=60000, Current processing time=1647254480310, Next trigger
>> time=1647254540310,
>> CurrentKey=(FFX22OJAEAA,95300a6c-35da-4d9f-b1bb-28ae81e32be70dc10b8b-3b5b-3455-9b0c-2ccfd60424f2:::95300a6c-35da-4d9f-b1bb-28ae81e32be7:::ae1983c6-b2aa-37a9-86db-7e46d460ff27)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255409300, Current
>> step duration=60000, Current processing time=1647255409300, Next trigger
>> time=1647255469300,
>> CurrentKey=(FFX22OJAEAA,44100466-ed87-4562-a007-2f6ff47a242f8d2b6b7f-2898-3708-b760-e2ecbdd591b0:::44100466-ed87-4562-a007-2f6ff47a242f:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255417581, Current
>> step duration=60000, Current processing time=1647255417581, Next trigger
>> time=1647255477581,
>> CurrentKey=(FFX22OJAEAA,44100466-ed87-4562-a007-2f6ff47a242f3e1b26ca-67ce-304e-9bd2-2c2abcfa256a:::44100466-ed87-4562-a007-2f6ff47a242f:::b4b4b34b-9f3d-3e1d-83cf-b4014a3f7324)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255481602, Current
>> step duration=60000, Current processing time=1647255481602, Next trigger
>> time=1647255541602,
>> CurrentKey=(FFX22OJAEAA,52ebbacb-2a90-4f5e-8147-af4327d19ddb4118bcf6-0e91-3efd-9c31-d0569e5b1809:::52ebbacb-2a90-4f5e-8147-af4327d19ddb:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (9/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255550611, Current
>> step duration=60000, Current processing time=1647255550611, Next trigger
>> time=1647255610611,
>> CurrentKey=(FFX22OJAEAA,de881d7e-e303-4975-b922-6655654bab9b7a07abff-2ffb-3fd1-aa46-62bd7e117c80:::de881d7e-e303-4975-b922-6655654bab9b:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255567769, Current
>> step duration=60000, Current processing time=1647255567769, Next trigger
>> time=1647255627769,
>> CurrentKey=(FFX22OJAEAA,2b0775ea-92af-4646-a2b8-c313794e5dcd4bd705b3-2161-309c-b27f-d4f5835a6820:::2b0775ea-92af-4646-a2b8-c313794e5dcd:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255599870, Current
>> step duration=60000, Current processing time=1647255599870, Next trigger
>> time=1647255659870,
>> CurrentKey=(FFX22OJAEAA,38afb166-f1d6-429b-afdb-4a2d28b96b28bc030018-0f3b-3f1e-b95d-7acd263ee074:::38afb166-f1d6-429b-afdb-4a2d28b96b28:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255607673, Current
>> step duration=60000, Current processing time=1647255607673, Next trigger
>> time=1647255667673,
>> CurrentKey=(FFX22OJAEAA,38afb166-f1d6-429b-afdb-4a2d28b96b282ebdab9d-d05c-3527-b092-6d87b4a87b24:::38afb166-f1d6-429b-afdb-4a2d28b96b28:::594e1118-3b6a-37f9-9cb6-326007a447fc)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (5/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255694416, Current
>> step duration=60000, Current processing time=1647255694416, Next trigger
>> time=1647255754416,
>> CurrentKey=(FFX22OJAEAA,f20879ca-2215-4a89-a0cc-0881878da9bdc7ec8795-7d4a-36e1-a8de-87eb74276107:::f20879ca-2215-4a89-a0cc-0881878da9bd:::f7d6e53e-0823-3766-8aa7-8a88da792a19)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647255994707, Current
>> step duration=60000, Current processing time=1647255994707, Next trigger
>> time=1647256054707,
>> CurrentKey=(FFX22OJAEAA,a2111695-fb2c-4da0-87c9-5cdb7f8f1506438e5a29-8653-301e-bd31-1a3e9ecc05aa:::a2111695-fb2c-4da0-87c9-5cdb7f8f1506:::31534c4f-83a2-3a93-a252-3ca50ad83969)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (5/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647315926250, Current
>> step duration=60000, Current processing time=1647315926250, Next trigger
>> time=1647315986250,
>> CurrentKey=(FFX22OJAEAA,09b90fad-554e-49fd-bc45-17562f6925d781588d82-5869-35eb-bce3-5e123f280763:::09b90fad-554e-49fd-bc45-17562f6925d7:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647316003432, Current
>> step duration=60000, Current processing time=1647316003432, Next trigger
>> time=1647316063432,
>> CurrentKey=(FFX22OJAEAA,9dbf1a42-4bbc-4890-a654-dbbb5b2fd9cb5b5e45bb-0842-3c63-b9ef-7f7840d15345:::9dbf1a42-4bbc-4890-a654-dbbb5b2fd9cb:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647316018509, Current
>> step duration=60000, Current processing time=1647316018509, Next trigger
>> time=1647316078509,
>> CurrentKey=(FFX22OJAEAA,0cfad1b7-9358-4d29-9624-6edd876265bc1cc86b93-a473-3cf3-818e-a6b7683e7e14:::0cfad1b7-9358-4d29-9624-6edd876265bc:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647316053839, Current
>> step duration=60000, Current processing time=1647316053839, Next trigger
>> time=1647316113839,
>> CurrentKey=(FFX22OJAEAA,29eb80e6-9a09-4794-8a46-4332f98f42ef9d3d591d-bc6b-308a-b95e-c2e8d3bb3dc2:::29eb80e6-9a09-4794-8a46-4332f98f42ef:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647316062159, Current
>> step duration=60000, Current processing time=1647316062159, Next trigger
>> time=1647316122159,
>> CurrentKey=(FFX22OJAEAA,29eb80e6-9a09-4794-8a46-4332f98f42ef5152c8b4-7bdf-384b-9d0f-bd0ddeeaeeea:::29eb80e6-9a09-4794-8a46-4332f98f42ef:::0e8f37dd-8682-3b3c-ae49-f842ee473d97)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647316113119, Current
>> step duration=60000, Current processing time=1647316113119, Next trigger
>> time=1647316173119,
>> CurrentKey=(FFX22OJAEAA,126d246c-9a02-421a-8e83-6a2b5de0dbf92bd1bd52-433a-3c6e-80dd-14a98b4f7d92:::126d246c-9a02-421a-8e83-6a2b5de0dbf9:::c7dd5036-6ac5-3c7d-b98b-9c6b9fb2e95e)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647320295002, Current
>> step duration=60000, Current processing time=1647320295002, Next trigger
>> time=1647320355002,
>> CurrentKey=(FFX22OJAEAA,93a19921-fd7d-4a0f-b121-2c078429a4d515e6a951-3487-37e5-a3df-3805a1a6778a:::93a19921-fd7d-4a0f-b121-2c078429a4d5:::887a0095-e765-3d00-8970-584b1d542695)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647339426984, Current
>> step duration=60000, Current processing time=1647339426984, Next trigger
>> time=1647339486984,
>> CurrentKey=(FFX22OJAEAA,611b4f94-7430-44ee-ad71-f5eaf818532d29fb069a-a162-3de1-9784-7bdc5b0cb712:::611b4f94-7430-44ee-ad71-f5eaf818532d:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647339507811, Current
>> step duration=60000, Current processing time=1647339507811, Next trigger
>> time=1647339567811,
>> CurrentKey=(FFX22OJAEAA,d3272dc4-c5c8-446a-b867-ac252c1ac138b45f422f-a91f-3e0e-8f29-f83b2e845996:::d3272dc4-c5c8-446a-b867-ac252c1ac138:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647339538912, Current
>> step duration=60000, Current processing time=1647339538912, Next trigger
>> time=1647339598912,
>> CurrentKey=(FFX22OJAEAA,a5272dd4-8776-4f78-ab7a-72adb5a0d2ee29181513-3ce0-3ef9-bf61-df2cefbdbfdd:::a5272dd4-8776-4f78-ab7a-72adb5a0d2ee:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647339546909, Current
>> step duration=60000, Current processing time=1647339546909, Next trigger
>> time=1647339606909,
>> CurrentKey=(FFX22OJAEAA,a5272dd4-8776-4f78-ab7a-72adb5a0d2eeefdd8c79-5f1a-3942-9ab4-c1fa2c016607:::a5272dd4-8776-4f78-ab7a-72adb5a0d2ee:::79e97f88-a1cb-30a2-a8d1-0e04f0dd7556)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647339627124, Current
>> step duration=60000, Current processing time=1647339627124, Next trigger
>> time=1647339687124,
>> CurrentKey=(FFX22OJAEAA,f2573a4b-3462-46f9-a48f-bb3bd3e96f18e65e9316-27c9-3584-9aaf-928663a1cca1:::f2573a4b-3462-46f9-a48f-bb3bd3e96f18:::f3ac69dd-6044-3876-a006-aa098c701421)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (5/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647365489254, Current
>> step duration=60000, Current processing time=1647365489254, Next trigger
>> time=1647365549254,
>> CurrentKey=(FN5Zm2aAEAA,22454d31-fa8d-40f2-a6e5-2d3b8615a94568b5bbd6-c0ce-3f86-9f05-8fa841cc45a1:::22454d31-fa8d-40f2-a6e5-2d3b8615a945:::9d003a17-390f-3583-a6c6-18c11fbf21ca)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (5/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647365751778, Current
>> step duration=60000, Current processing time=1647365751778, Next trigger
>> time=1647365811778,
>> CurrentKey=(FN5Zm2aAEAA,22454d31-fa8d-40f2-a6e5-2d3b8615a9457c15a19d-ffc1-357b-bd04-9068d11f270f:::22454d31-fa8d-40f2-a6e5-2d3b8615a945:::5a69ddf2-78e9-37d2-94ea-0f8039064faf)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (5/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647366305380, Current
>> step duration=60000, Current processing time=1647366305380, Next trigger
>> time=1647366365380,
>> CurrentKey=(FN5j5drAIAA,fcc2ee76-6274-4b72-977c-85f8243b443e81f9f3f0-4c95-3d2d-b470-58c9e84bac3f:::fcc2ee76-6274-4b72-977c-85f8243b443e:::896f5b37-5479-3cf5-848c-da433ab5f611)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647374712568, Current
>> step duration=60000, Current processing time=1647374712568, Next trigger
>> time=1647374772568,
>> CurrentKey=(FFX1q19AAAA,57e28ef6-1bf9-4876-9709-d0f58922ee6bd4320009-85cb-3017-baa2-945089d5ecb3:::57e28ef6-1bf9-4876-9709-d0f58922ee6b:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647374901073, Current
>> step duration=60000, Current processing time=1647374901073, Next trigger
>> time=1647374961073,
>> CurrentKey=(FFX1q19AAAA,57e28ef6-1bf9-4876-9709-d0f58922ee6b0a305d5f-0e0d-377d-9049-12c870127bfc:::57e28ef6-1bf9-4876-9709-d0f58922ee6b:::d8fff1a0-9708-39c2-aee3-0d9ceed6e10a)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647488849471, Current
>> step duration=60000, Current processing time=1647488849471, Next trigger
>> time=1647488909471,
>> CurrentKey=(FN5j5drAIAA,4c706b32-2276-4e6b-9496-084fb3f522f93fbbc6a4-6e24-35e4-b92c-1875073d0ec9:::4c706b32-2276-4e6b-9496-084fb3f522f9:::e847dad4-b1ba-3ba1-b2a5-86771137122f)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647488900860, Current
>> step duration=60000, Current processing time=1647488900860, Next trigger
>> time=1647488960860,
>> CurrentKey=(FN5j5drAIAA,583bb5a9-5aab-4737-b5b5-40f1ee7af5a5d963e038-53fa-3899-9b27-da39f16ba9ce:::583bb5a9-5aab-4737-b5b5-40f1ee7af5a5:::051ed497-3431-3ce2-9f3a-bef5637eb550)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (1/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647495362444, Current
>> step duration=60000, Current processing time=1647495362444, Next trigger
>> time=1647495422444,
>> CurrentKey=(FFX22OJAEAA,2800d89f-6a6b-4e96-9c01-6ccf23387ea7522fd9b6-7bd7-34af-896a-02a446781190:::2800d89f-6a6b-4e96-9c01-6ccf23387ea7:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647495374051, Current
>> step duration=60000, Current processing time=1647495374051, Next trigger
>> time=1647495434051,
>> CurrentKey=(FFX22OJAEAA,2800d89f-6a6b-4e96-9c01-6ccf23387ea7d6dbecb9-a572-3528-a14e-767f011cf3f9:::2800d89f-6a6b-4e96-9c01-6ccf23387ea7:::57ce813b-7b5b-357c-973f-a3d4c7b160ac)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647495440461, Current
>> step duration=60000, Current processing time=1647495440461, Next trigger
>> time=1647495500461,
>> CurrentKey=(FFX22OJAEAA,80f4ed3a-8577-4a3b-8d93-e6f4314f8a1704d32965-49cc-3865-ae68-aa80eec8e4bb:::80f4ed3a-8577-4a3b-8d93-e6f4314f8a17:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (5/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647495527584, Current
>> step duration=60000, Current processing time=1647495527584, Next trigger
>> time=1647495587584,
>> CurrentKey=(FFX22OJAEAA,96f421b3-2e1b-4124-b41d-48f6fbaf64f62008c84e-7651-3a64-985d-87eea60fe735:::96f421b3-2e1b-4124-b41d-48f6fbaf64f6:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647495573023, Current
>> step duration=60000, Current processing time=1647495573023, Next trigger
>> time=1647495633023,
>> CurrentKey=(FFX22OJAEAA,96f421b3-2e1b-4124-b41d-48f6fbaf64f6e6be0ba0-ed31-302d-911c-5c43abbd7256:::96f421b3-2e1b-4124-b41d-48f6fbaf64f6:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647495646747, Current
>> step duration=60000, Current processing time=1647495646747, Next trigger
>> time=1647495706746,
>> CurrentKey=(FFX22OJAEAA,899d7c5e-220a-4a2b-a145-b504b1fa7ecf76476c08-a271-3288-b118-451927ea494a:::899d7c5e-220a-4a2b-a145-b504b1fa7ecf:::28ea92a9-e9c2-3b7d-82a2-afa2de4e1d9e)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647495648631, Current
>> step duration=60000, Current processing time=1647495648631, Next trigger
>> time=1647495708631,
>> CurrentKey=(FFX22OJAEAA,2a46b874-a474-4f1c-b9ce-97ce1b930a21494c715b-ef7e-3ce5-bf9c-70ab5456005f:::2a46b874-a474-4f1c-b9ce-97ce1b930a21:::497691a6-4b8c-3a68-b4d8-c76958b709dc)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647499801058, Current
>> step duration=60000, Current processing time=1647499801058, Next trigger
>> time=1647499861058,
>> CurrentKey=(FFX22OJAEAA,92affaa1-52ef-4176-a7e8-905eec667d6281eff92c-0431-33d1-945b-0bbfedff940e:::92affaa1-52ef-4176-a7e8-905eec667d62:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647499805657, Current
>> step duration=60000, Current processing time=1647499805657, Next trigger
>> time=1647499865657,
>> CurrentKey=(FFX22OJAEAA,9a671b9c-1cda-4312-8bdd-84af28906ba15a098660-bbf3-3690-98e1-73a2cfa38bb8:::9a671b9c-1cda-4312-8bdd-84af28906ba1:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647499813004, Current
>> step duration=60000, Current processing time=1647499813004, Next trigger
>> time=1647499873004,
>> CurrentKey=(FFX22OJAEAA,92affaa1-52ef-4176-a7e8-905eec667d62645d195d-1b6c-3617-b78f-47a899b1088a:::92affaa1-52ef-4176-a7e8-905eec667d62:::c2791ca4-e3f6-3d37-b07d-b1552bf8f2a5)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647499875055, Current
>> step duration=60000, Current processing time=1647499875055, Next trigger
>> time=1647499935055,
>> CurrentKey=(FFX22OJAEAA,ae413c9a-ca64-4e4c-b85e-c1d711bbf052377f3f7e-f061-35c1-ad73-d16e102e4ca4:::ae413c9a-ca64-4e4c-b85e-c1d711bbf052:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647499929037, Current
>> step duration=60000, Current processing time=1647499929037, Next trigger
>> time=1647499989037,
>> CurrentKey=(FFX22OJAEAA,e58e7ca9-375d-4955-a40c-924988795152729e94cb-ee89-3e25-8871-413ad7a0c14b:::e58e7ca9-375d-4955-a40c-924988795152:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647499970500, Current
>> step duration=60000, Current processing time=1647499970500, Next trigger
>> time=1647500030500,
>> CurrentKey=(FFX22OJAEAA,e5edd1fa-51ed-4f5a-a617-2e32cd526f3d06de5f18-12b5-38d6-9a9f-1424bed15209:::e5edd1fa-51ed-4f5a-a617-2e32cd526f3d:::17c1f52d-acef-35b5-adf6-b0e491d7ce45)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647500042496, Current
>> step duration=60000, Current processing time=1647500042496, Next trigger
>> time=1647500102496,
>> CurrentKey=(FFX22OJAEAA,99fefc82-3e34-4550-a410-d93e498ea69994ecf8f2-d05f-37ac-bc2d-3977c278e2d8:::99fefc82-3e34-4550-a410-d93e498ea699:::885130bf-0247-39d9-a536-00e687683f64)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647502061573, Current
>> step duration=60000, Current processing time=1647502061573, Next trigger
>> time=1647502121573,
>> CurrentKey=(FFX22OJAEAA,f88e2327-01a7-49f3-9dfe-32b25b7f42a7c7c6bc04-0aae-3888-a358-bcedf8fbe5b1:::f88e2327-01a7-49f3-9dfe-32b25b7f42a7:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647502072791, Current
>> step duration=60000, Current processing time=1647502072791, Next trigger
>> time=1647502132791,
>> CurrentKey=(FFX22OJAEAA,f88e2327-01a7-49f3-9dfe-32b25b7f42a799967dc4-c117-36a6-b617-b9c3d66df6df:::f88e2327-01a7-49f3-9dfe-32b25b7f42a7:::c335cb4b-30c3-3361-a3a5-e112295aa0e6)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647502139029, Current
>> step duration=60000, Current processing time=1647502139029, Next trigger
>> time=1647502199029,
>> CurrentKey=(FFX22OJAEAA,f78f6997-51b1-4764-a9c1-12299b7cfe017c76fc2a-eccc-3bd5-91ea-9439b70b6d0b:::f78f6997-51b1-4764-a9c1-12299b7cfe01:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647502229437, Current
>> step duration=60000, Current processing time=1647502229437, Next trigger
>> time=1647502289437,
>> CurrentKey=(FFX22OJAEAA,473bd192-a782-4929-9d3a-0986b000cd916c115aed-1e2f-37b9-acf7-fe05ebe5a7ba:::473bd192-a782-4929-9d3a-0986b000cd91:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647502305562, Current
>> step duration=60000, Current processing time=1647502305562, Next trigger
>> time=1647502365562,
>> CurrentKey=(FFX22OJAEAA,c0370908-213e-4983-832b-07fc3c3a9023d982e42e-7065-398f-a780-bce666dd935d:::c0370908-213e-4983-832b-07fc3c3a9023:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647502362192, Current
>> step duration=60000, Current processing time=1647502362192, Next trigger
>> time=1647502422192,
>> CurrentKey=(FFX22OJAEAA,1c575870-90eb-428b-880f-27a22abfb8b604970e00-2e6f-3684-ba49-3f92c7b5f6a7:::1c575870-90eb-428b-880f-27a22abfb8b6:::2cb4fe86-6b39-3e3b-b0c7-bbf8c20849ef)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647505262166, Current
>> step duration=60000, Current processing time=1647505262166, Next trigger
>> time=1647505322166,
>> CurrentKey=(FFX22OJAEAA,2177eb9c-10e9-41c8-aa20-ec2d8e98001af6b0155a-48cb-3fab-81d6-db72baa94cb2:::2177eb9c-10e9-41c8-aa20-ec2d8e98001a:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (9/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647505345283, Current
>> step duration=60000, Current processing time=1647505345283, Next trigger
>> time=1647505405283,
>> CurrentKey=(FFX22OJAEAA,ecd00778-158e-4300-a7bc-03e8df0a03af5691be9f-7c86-355e-8cd4-a44aeffed08d:::ecd00778-158e-4300-a7bc-03e8df0a03af:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647505375658, Current
>> step duration=60000, Current processing time=1647505375658, Next trigger
>> time=1647505435649,
>> CurrentKey=(FFX22OJAEAA,14264224-e250-491f-bf94-9bf2dffeaf1ae6a734c0-9262-35a0-a027-98e0246d3aa6:::14264224-e250-491f-bf94-9bf2dffeaf1a:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647505384592, Current
>> step duration=60000, Current processing time=1647505384592, Next trigger
>> time=1647505444592,
>> CurrentKey=(FFX22OJAEAA,14264224-e250-491f-bf94-9bf2dffeaf1a399ae7e4-7fe6-3d63-8148-ef807331df4e:::14264224-e250-491f-bf94-9bf2dffeaf1a:::d018b328-4fae-35d2-b5da-5a94cc33bfff)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647505464829, Current
>> step duration=60000, Current processing time=1647505464829, Next trigger
>> time=1647505524829,
>> CurrentKey=(FFX22OJAEAA,60a77fd9-da23-4bcd-9f4b-cc6726c66f6dd43a98e2-2e31-3303-a960-1549bbe33c52:::60a77fd9-da23-4bcd-9f4b-cc6726c66f6d:::eb704291-f184-3129-838f-a9672cb3106f)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647532905516, Current
>> step duration=60000, Current processing time=1647532905516, Next trigger
>> time=1647532965516,
>> CurrentKey=(FN5j5drAIAA,fd004bca-c964-40f6-8fa1-ed87550046a5b087ac94-272d-350e-b5f8-14e763d591bf:::fd004bca-c964-40f6-8fa1-ed87550046a5:::9d7e18cb-36dd-3539-ab11-45cf7cc58186)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647533504948, Current
>> step duration=60000, Current processing time=1647533504948, Next trigger
>> time=1647533564948,
>> CurrentKey=(FFX22OJAEAA,c4aea181-ac4a-4601-aaae-e76434f58a33f08ab4f2-f848-3ddb-a205-326af143eead:::c4aea181-ac4a-4601-aaae-e76434f58a33:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647533598124, Current
>> step duration=60000, Current processing time=1647533598124, Next trigger
>> time=1647533658124,
>> CurrentKey=(FFX22OJAEAA,c4aea181-ac4a-4601-aaae-e76434f58a3350c78e69-4e5f-38aa-98f5-2e736f0b2c8d:::c4aea181-ac4a-4601-aaae-e76434f58a33:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647533875472, Current
>> step duration=60000, Current processing time=1647533875472, Next trigger
>> time=1647533935472,
>> CurrentKey=(FFX22OJAEAA,6dc26572-3d52-45e9-80a6-acbb2961dbaaf2c2019d-fec9-31ad-8ea3-b3cdfbaca789:::6dc26572-3d52-45e9-80a6-acbb2961dbaa:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (4/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647533885077, Current
>> step duration=60000, Current processing time=1647533885077, Next trigger
>> time=1647533945077,
>> CurrentKey=(FFX22OJAEAA,6dc26572-3d52-45e9-80a6-acbb2961dbaae0c54f68-7cb1-3bfe-8250-7879cb107dca:::6dc26572-3d52-45e9-80a6-acbb2961dbaa:::81be5dad-757a-39d1-93d9-580c78f25ee5)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647533953284, Current
>> step duration=60000, Current processing time=1647533953285, Next trigger
>> time=1647534013284,
>> CurrentKey=(FFX22OJAEAA,843941a0-86af-44d9-8789-dc1349966db715a12169-026e-3d42-a611-9e65132b1d74:::843941a0-86af-44d9-8789-dc1349966db7:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647534024448, Current
>> step duration=60000, Current processing time=1647534024448, Next trigger
>> time=1647534084448,
>> CurrentKey=(FFX22OJAEAA,ad90b960-af3c-4f54-b4d6-27d9255d682be89a8d2a-4768-317d-9cdc-173df0254d46:::ad90b960-af3c-4f54-b4d6-27d9255d682b:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (2/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647534039240, Current
>> step duration=60000, Current processing time=1647534039240, Next trigger
>> time=1647534099240,
>> CurrentKey=(FFX22OJAEAA,ca0bd4f8-768e-4957-a9d7-5df47830f9e664101080-f3bc-3e31-9384-97d600f1c5b6:::ca0bd4f8-768e-4957-a9d7-5df47830f9e6:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (9/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647534141458, Current
>> step duration=60000, Current processing time=1647534141458, Next trigger
>> time=1647534201458,
>> CurrentKey=(FFX22OJAEAA,8f772d2c-54c9-457f-a794-5bb1d040fc64a6cd8388-207e-3f6d-a202-f8f25618faef:::8f772d2c-54c9-457f-a794-5bb1d040fc64:::f2ebcf18-c6b6-3aea-9b44-9ccfaa102b04)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647534212124, Current
>> step duration=60000, Current processing time=1647534212124, Next trigger
>> time=1647534272124,
>> CurrentKey=(FFX22OJAEAA,d79fdb02-7a4b-4314-8b72-d0c562881855c793dad3-8d8b-3915-9f23-0ae86796b21e:::d79fdb02-7a4b-4314-8b72-d0c562881855:::00b1fa4a-5418-36b5-8574-c72b2f746468)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647550774162, Current
>> step duration=60000, Current processing time=1647550774162, Next trigger
>> time=1647550834162,
>> CurrentKey=(FN5j5drAIAA,b8a1b626-fe8c-4237-abd6-151cbcb2123d2dd78241-0254-3bd8-9c4d-1fcfde2fe84c:::b8a1b626-fe8c-4237-abd6-151cbcb2123d:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647550924203, Current
>> step duration=60000, Current processing time=1647550924203, Next trigger
>> time=1647550984202,
>> CurrentKey=(FN5j5drAIAA,b8a1b626-fe8c-4237-abd6-151cbcb2123d15c50eb2-29cb-32e5-9739-aca3f6a56e40:::b8a1b626-fe8c-4237-abd6-151cbcb2123d:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647575168443, Current
>> step duration=60000, Current processing time=1647575168443, Next trigger
>> time=1647575228443,
>> CurrentKey=(FFX22OJAEAA,71e2884e-0449-490c-b9e8-0b19b9dc69bbd579c1e2-4fd8-3f50-a58a-c1359826f35e:::71e2884e-0449-490c-b9e8-0b19b9dc69bb:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (7/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647575241464, Current
>> step duration=60000, Current processing time=1647575241464, Next trigger
>> time=1647575301464,
>> CurrentKey=(FFX22OJAEAA,e3c144af-ca10-45e7-aad1-3e94207403b33505a124-9d4e-3223-8207-d179e8b2cd54:::e3c144af-ca10-45e7-aad1-3e94207403b3:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647575257221, Current
>> step duration=60000, Current processing time=1647575257221, Next trigger
>> time=1647575317221,
>> CurrentKey=(FFX22OJAEAA,3abbcf42-2c11-4295-8c5e-44370f5b68266b0456a1-8822-343a-9b8e-8f5837d0decb:::3abbcf42-2c11-4295-8c5e-44370f5b6826:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (9/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647575292500, Current
>> step duration=60000, Current processing time=1647575292500, Next trigger
>> time=1647575352499,
>> CurrentKey=(FFX22OJAEAA,454f09b4-aff3-4ece-a2f4-3ea27b8775241a37a0bd-c3de-3478-b1d2-ac04e1fb269c:::454f09b4-aff3-4ece-a2f4-3ea27b877524:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (3/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647575293520, Current
>> step duration=60000, Current processing time=1647575293520, Next trigger
>> time=1647575353520,
>> CurrentKey=(FFX22OJAEAA,454f09b4-aff3-4ece-a2f4-3ea27b8775249ec3dbb5-a42a-39cd-a970-c859c84a7f20:::454f09b4-aff3-4ece-a2f4-3ea27b877524:::0262adef-7cc7-3350-84b4-0ad975a4dc46)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (9/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647575377885, Current
>> step duration=60000, Current processing time=1647575377885, Next trigger
>> time=1647575437885,
>> CurrentKey=(FFX22OJAEAA,fde5a0e4-3d6e-408e-b06a-8eec6b0ed1e8e1b39c5a-eb5d-332c-95b0-919656677a57:::fde5a0e4-3d6e-408e-b06a-8eec6b0ed1e8:::e4919759-6a79-35c6-ac82-8f89b775cefa)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::processElement,
>> FunctionMessage=\"Time values\", Current system time=1647650433962, Current
>> step duration=60000, Current processing time=1647650433962, Next trigger
>> time=1647650493962,
>> CurrentKey=(FFX1q19AAAA,f9d6df71-4a27-4f0e-94b2-0020f622e489b71148e7-b197-34c4-867c-4f7c058e4dd6:::f9d6df71-4a27-4f0e-94b2-0020f622e489:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (8/9)#0",
>>
>>
>> - logs from onTimer() - logged as the very first line in the method:
>>         "message": "FunctionName=WfProcessFunction::onTimer::start,
>> Status=Started, Timestamp=1647252682500, TimerService
>> CurrentProcessingTime=1647252682501,
>> CurrentKey=(FFX22OJAEAA,b4c34e13-6207-4398-aa49-da091f9c523e2ade545e-0e7b-3c66-87bf-d292f6fbacc6:::b4c34e13-6207-4398-aa49-da091f9c523e:::5cb391e8-890c-3e2e-8b9b-6c562d366673)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::onTimer::start,
>> Status=Started, Timestamp=1647316113839, TimerService
>> CurrentProcessingTime=1647316113840,
>> CurrentKey=(FFX22OJAEAA,29eb80e6-9a09-4794-8a46-4332f98f42ef9d3d591d-bc6b-308a-b95e-c2e8d3bb3dc2:::29eb80e6-9a09-4794-8a46-4332f98f42ef:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::onTimer::start,
>> Status=Started, Timestamp=1647502121573, TimerService
>> CurrentProcessingTime=1647502121574,
>> CurrentKey=(FFX22OJAEAA,f88e2327-01a7-49f3-9dfe-32b25b7f42a7c7c6bc04-0aae-3888-a358-bcedf8fbe5b1:::f88e2327-01a7-49f3-9dfe-32b25b7f42a7:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::onTimer::start,
>> Status=Started, Timestamp=1647505435649, TimerService
>> CurrentProcessingTime=1647505435650,
>> CurrentKey=(FFX22OJAEAA,14264224-e250-491f-bf94-9bf2dffeaf1ae6a734c0-9262-35a0-a027-98e0246d3aa6:::14264224-e250-491f-bf94-9bf2dffeaf1a:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>     "message": "FunctionName=WfProcessFunction::onTimer::start,
>> Status=Started, Timestamp=1647575301464, TimerService
>> CurrentProcessingTime=1647575301466,
>> CurrentKey=(FFX22OJAEAA,e3c144af-ca10-45e7-aad1-3e94207403b33505a124-9d4e-3223-8207-d179e8b2cd54:::e3c144af-ca10-45e7-aad1-3e94207403b3:::service:none:)",
>>     "threadName": "WF Processor -> (Sink: act sink, Sink: wf log sink)
>> (6/9)#0",
>>
>> (Interestingly, all the onTimer() logs are from just one thread: 6/9 in
>> this case.)
>>
>> Thanks!
>>
>> On Fri, Mar 18, 2022 at 5:02 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi Binil,
>>>
>>> I think the code itself looks good to me. Could I double-check a few
>>> details of the issue:
>>> 1. What is the parallelism of this operator, and does the issue occur
>>> for all the subtasks?
>>> 2. Have you already added logs in processElement and onTimer to print
>>> the registered processing-time timer timestamps and the times of the
>>> callbacks? Could you also share that part of the output?
>>>
>>> Best,
>>> Yun Gao
>>>
>>> ------------------------------------------------------------------
>>> From:Binil Benjamin <bbenja...@splunk.com>
>>> Send Time:2022 Mar. 18 (Fri.) 16:07
>>> To:"yu'an huang" <h.yuan...@gmail.com>
>>> Cc:user <user@flink.apache.org>
>>> Subject:Re: onTimer() of a KeyedProcessFunction stops getting triggered
>>> after a while
>>>
>>> Hi,
>>>
>>> Unfortunately, I cannot share the entire code, but the
>>> class roughly looks like this:
>>>
>>> public class WfProcessFunction extends
>>>         KeyedProcessFunction<Tuple2<String, String>, Map<String, Object>, Map<String, Object>> {
>>>
>>>     @Override
>>>     public void processElement(Map<String, Object> inputRecord,
>>>             Context context, Collector<Map<String, Object>> collector) throws Exception {
>>>         ...
>>>         context.timerService().registerProcessingTimeTimer(
>>>                 context.timerService().currentProcessingTime() + 5 * TimeUnit.SECONDS.toMillis(1L));
>>>         ...
>>>     }
>>>
>>>     @Override
>>>     public void onTimer(long timestamp, OnTimerContext ctx,
>>>             Collector<Map<String, Object>> out) throws Exception {
>>>         ...
>>>     }
>>> }
>>> Thanks!
>>>
>>> On Thu, Mar 17, 2022 at 9:24 PM yu'an huang <h.yuan...@gmail.com> wrote:
>>>
>>> Hi, can you share your code so we can check whether it is written
>>> correctly?
>>>
>>>
>>>
>>> > On 18 Mar 2022, at 7:54 AM, Binil Benjamin <bbenja...@splunk.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > We have a class that extends KeyedProcessFunction and overrides the
>>> > onTimer() method. In processElement(), we register a timer callback
>>> > using
>>> > context.timerService().registerProcessingTimeTimer(<some-future-time>).
>>> > For a while, onTimer() is called back and everything works as
>>> > expected; however, after some time, onTimer() stops receiving any
>>> > callbacks from Flink (registering the timer via
>>> > registerProcessingTimeTimer() still works fine). Does anyone know
>>> > what could be wrong here and how we can debug this?
>>> >
>>> > Flink version is 1.13.2 (running on AWS KDA)
>>> >
>>> > Thanks!
>>>
>>>
>>>
>>>
>
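
For anyone hitting the same problem: since the quoted logs compute "Next trigger time" as the current processing time plus the step duration, one defensive option is to validate that sum before handing it to registerProcessingTimeTimer(). Below is a minimal sketch of such a check in plain Java. TimerGuard and nextTriggerTime are hypothetical names made up for illustration; they are not part of Flink, and this does not explain or fix the behaviour inside Flink itself. It only keeps a negative or overflowed timestamp from being registered in the first place.

```java
// Hypothetical helper (not a Flink API): computes and validates a
// processing-time trigger before it is passed to the timer service.
final class TimerGuard {

    private TimerGuard() {}

    /**
     * Returns nowMillis + delayMillis.
     * Throws IllegalArgumentException if either input is negative, and
     * ArithmeticException if the sum would overflow a long.
     */
    static long nextTriggerTime(long nowMillis, long delayMillis) {
        if (nowMillis < 0 || delayMillis < 0) {
            throw new IllegalArgumentException(
                    "negative input: now=" + nowMillis + ", delay=" + delayMillis);
        }
        // Math.addExact throws ArithmeticException on long overflow
        // instead of silently wrapping around to a negative value.
        return Math.addExact(nowMillis, delayMillis);
    }
}
```

In processElement() the call would then look roughly like ctx.timerService().registerProcessingTimeTimer(TimerGuard.nextTriggerTime(ctx.timerService().currentProcessingTime(), stepDurationMillis)), where stepDurationMillis is an assumed variable holding the step duration. Failing fast here is a design choice: an exception in processElement() is easier to spot than a timer that never fires.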
