One thing to keep in mind - any bundle can be executed multiple times, and
that includes timer firings. The runners will guarantee that only one
execution of the timer will "win" (i.e. if you output to another
PCollection from the timer, only one output will end up in that
PCollection). However if you are logging from the timer callback, you can
see repeated logs. The same hold true for any side-effect operation - e.g.
if you call an external RPC from the timer callback, those calls may be
repeatetd.

On Wed, Jul 15, 2020 at 8:23 PM Kenneth Knowles <k...@apache.org> wrote:

> Hello!
>
> What runner are you using? Does this reproduce on multiple runners? (it is
> very quick to try out your pipeline on DirectRunner and local versions of
> open source runners like Flink, Spark, etc)
>
> If you can produce a complete working reproduction it will be easier for
> someone to debug. I do not see anything wrong with your code. I assumed you
> got the `window` variable out of the ProcessContext\ (you can also make it
> a parameter to @ProcessElement)
>
> Kenn
>
> On Wed, Jul 15, 2020 at 4:38 PM Zhiheng Huang <sylvon.w...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to set a timer at window expiration time for my use case and
>> expect it to fire just once per key per window.
>> But instead I observe that the onTimer() method gets called multiple
>> times almost all the time.
>>
>> Here's the relevant code snippet:
>>
>> @TimerId(WIN_EXP)
>> private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>> @StateId(COUNTS)
>> private final StateSpec<ValueState<Map<String, Integer>>> counts =
>> StateSpecs.value();
>>
>> @ProcessElement
>> public void process(
>>                     ProcessContext context,
>>                     @StateId(COUNTS) ValueState<Map<String, Integer>>
>> countsState,
>>                     @TimerId(WIN_EXP) Timer winExpTimer) {
>>
>>   ...
>>   Map<String, Integer> counts = countsState.read();
>>   if (counts == null) {
>>     counts = new HashMap<>();
>>     // Only place where I set the timer
>>
>> winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
>>   }
>>   ... // no return here and I do not observe exception in the pipeline
>>   countsState.write(counts);
>>   ...
>> }
>>
>> I tried adding logs in OnTimer:
>>
>> String key = keyState.read();
>> if (key != null && key.equals("xxx")) {
>>   logger.error(String.format("fired for %s.",
>> context.window().maxTimestamp().toDateTime()));
>> }
>>
>> Output:
>>
>> E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
>> E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.
>>
>> Seems like this is not due to some contention, the first log and the last
>> is ~1minute apart. BTW, my allowed
>> lateness is also set to 1 minute.
>>
>> Anyone can let me know if I am missing something here? I am using beam
>> 2.22 and dataflow runner.
>>
>> Thanks!
>>
>>

Reply via email to