Hello!

What runner are you using? Does this reproduce on multiple runners? (It is
very quick to try out your pipeline on the DirectRunner and on local
versions of open source runners like Flink, Spark, etc.)

If you can produce a complete working reproduction, it will be easier for
someone to debug. I do not see anything wrong with your code. I assumed you
got the `window` variable out of the ProcessContext (you can also make it
a parameter to @ProcessElement).
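
For anyone putting together a reproduction, the timestamps in the quoted
message below can be sanity-checked with plain arithmetic. This is only a
minimal sketch: it uses java.time as a stand-in for the Joda-Time types in
the snippet, and the class and method names (TimerArithmetic, timerTarget,
spread) are made up for illustration. The input values are copied from the
quoted logs.

```java
import java.time.Duration;
import java.time.Instant;

public class TimerArithmetic {
    // Mirrors window.maxTimestamp().plus(Duration.standardMinutes(1)) from the
    // quoted snippet, with java.time.Instant standing in for the Joda Instant.
    static Instant timerTarget(Instant windowMaxTimestamp, Duration offset) {
        return windowMaxTimestamp.plus(offset);
    }

    // Wall-clock distance between two log entries.
    static Duration spread(Instant first, Instant last) {
        return Duration.between(first, last);
    }

    public static void main(String[] args) {
        // Window max timestamp as printed in the quoted logs.
        Instant windowMax = Instant.parse("2020-07-15T13:04:59.999Z");
        // Prints: timer target: 2020-07-15T13:05:59.999Z
        System.out.println("timer target: " + timerTarget(windowMax, Duration.ofMinutes(1)));

        // Earliest and latest firing times from the quoted logs.
        Instant first = Instant.parse("2020-07-15T23:07:33.925Z");
        Instant last = Instant.parse("2020-07-15T23:08:38.938Z");
        // Prints: spread ms: 65013
        System.out.println("spread ms: " + spread(first, last).toMillis());
    }
}
```

So every firing reports the same window, the timer target is exactly one
minute past that window's max timestamp, and the seven firings span roughly
65 seconds of processing time.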

Kenn

On Wed, Jul 15, 2020 at 4:38 PM Zhiheng Huang <sylvon.w...@gmail.com> wrote:

> Hi,
>
> I am trying to set a timer at window expiration time for my use case and
> expect it to fire just once per key per window.
> But instead I observe that the onTimer() method gets called multiple times
> almost all the time.
>
> Here's the relevant code snippet:
>
> @TimerId(WIN_EXP)
> private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
> @StateId(COUNTS)
> private final StateSpec<ValueState<Map<String, Integer>>> counts =
>     StateSpecs.value();
>
> @ProcessElement
> public void process(
>     ProcessContext context,
>     @StateId(COUNTS) ValueState<Map<String, Integer>> countsState,
>     @TimerId(WIN_EXP) Timer winExpTimer) {
>
>   ...
>   Map<String, Integer> counts = countsState.read();
>   if (counts == null) {
>     counts = new HashMap<>();
>     // Only place where I set the timer
>     winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
>   }
>   ... // no return here and I do not observe exception in the pipeline
>   countsState.write(counts);
>   ...
> }
>
> I tried adding logs in OnTimer:
>
> String key = keyState.read();
> if (key != null && key.equals("xxx")) {
>   logger.error(String.format("fired for %s.",
>       context.window().maxTimestamp().toDateTime()));
> }
>
> Output:
>
> E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
> E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.
>
> This does not seem to be due to contention: the first and last log
> entries are about 1 minute apart. BTW, my allowed lateness is also set
> to 1 minute.
>
> Can anyone let me know if I am missing something here? I am using Beam
> 2.22 and the Dataflow runner.
>
> Thanks!
>
>
