Hello all :)!
I'm having trouble creating a tick service.
Goal: register a TableSource that emits a Row roughly every 200 ms in
processing time. Each Row contains a single column, "counter", which is
incremented by 1 with every Row emitted.
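To make the intended behavior concrete, here is a minimal plain-Java sketch of the tick semantics, with no Flink involved. The class name TickSketch and the method collectTicks are hypothetical and exist only for this illustration; a ScheduledExecutorService stands in for the processing-time timer, and the sketch stops after a fixed number of ticks only so that it terminates.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration class, not part of Flink.
public class TickSketch {

    /**
     * Emits offset + 1, offset + 2, ... at roughly one value per intervalMs
     * and returns the first `ticks` values. Blocks until all are emitted.
     */
    static List<Long> collectTicks(long offset, long intervalMs, int ticks) {
        AtomicLong counter = new AtomicLong(offset);
        List<Long> out = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(ticks);
        // A single timer thread, so ticks are emitted strictly in order.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        timer.scheduleAtFixedRate(() -> {
            // Ignore any tick that fires after the requested number was reached.
            if (done.getCount() > 0) {
                out.add(counter.incrementAndGet());
                done.countDown();
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ticks", e);
        } finally {
            timer.shutdownNow();
        }
        return out;
    }

    public static void main(String[] args) {
        // Five ticks, 200 ms apart, starting from offset 0.
        System.out.println(collectTicks(0L, 200L, 5));
    }
}
```

In the real TableSource each emitted value would be wrapped in a Row and the stream would never stop.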
Current attempt: using the TimerService.
A TableSource with the following getDataStream:
public DataStream<Long> getDataStream(StreamExecutionEnvironment execEnv) {
    return execEnv
        .fromElements((Long) offset) // default 0L, one element
        .keyBy(new NullByteKeySelector<>())
        .process(new TickKeyedProcessFunction(200L))
        .forceNonParallel();
}
And a KeyedProcessFunction whose onTimer does the heavy lifting:
public void processElement(Long value, Context context,
        Collector<Long> collector) throws IOException {
    // called once
    counter.update(value);
    Long now = System.currentTimeMillis();
    context.timerService().registerProcessingTimeTimer(now);
}

public void onTimer(long timestamp, OnTimerContext ctx,
        Collector<Long> out) throws Exception {
    Long then = timestamp + interval;
    Long current = counter.value();
    current++;
    counter.update(current);
    ctx.timerService().registerProcessingTimeTimer(then);
    out.collect(current);
}
Now, the runtime tells me the source is in FINISHED status, so I assume
there is some limitation on re-registering a timer for the same key inside
onTimer.
Is there a way to use the TimerService that works around this?
Also, how would you implement this tick service by other means?
Cheers
Ben