Thanks to a lot of hard work by Rehman, Beam now supports dynamic timers.
As a reminder, this was discussed on the dev list some time back.
As background, previously you had to statically declare all timers in your
code. So if you wanted to have two timers, you needed to create two timer
variables and two callbacks - one for each timer. A number of users kept
hitting stumbling blocks where they needed a dynamic set of timers (often
based on the element), which was not supported in Beam. The workarounds
were quite ugly and complicated.
The new support lets you declare a TimerMap, which is a map of timers keyed
by a timer ID chosen at runtime. Each TimerMap is scoped by a family name, so
you can create multiple TimerMaps, each with its own callback. Usage looks as
follows:
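    class MyDoFn extends DoFn<...> {
      // Declares a family of event-time timers named "timers".
      @TimerFamily("timers")
      private final TimerSpec timerMap =
          TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          @TimerFamily("timers") TimerMap timers, @Element Type e) {
        // timestamp is a placeholder for the Instant at which to fire.
        timers.set("mainTimer", timestamp);
        // Timer IDs can be computed per element at runtime.
        timers.set("actionType" + e.getActionType(), timestamp);
      }

      // One callback handles every timer in the "timers" family.
      @OnTimerFamily("timers")
      public void onTimer(@TimerId String timerId) {
        System.out.println("Timer fired. id: " + timerId);
      }
    }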
This currently works for the Flink and the Dataflow runners.
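If it helps to see the idea without a runner, here is a rough, Beam-free sketch of what one timer family does conceptually: timer IDs are created at runtime, setting an ID that already exists resets it, and a single callback receives the ID of whichever timer fired. The names TimerFamilyModel, advanceTo, and TimerFamilyDemo are made up for illustration and are not part of the Beam API. The firing order within one advanceTo call is simply ID order in this toy and says nothing about how a real runner orders timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Toy model of one timer family (hypothetical, not the Beam API). */
class TimerFamilyModel {
  // Pending timers: timer ID -> firing time in epoch millis.
  private final Map<String, Long> pending = new TreeMap<>();
  // Single callback shared by every timer in the family.
  private final Consumer<String> callback;

  TimerFamilyModel(Consumer<String> callback) {
    this.callback = callback;
  }

  /** Sets the timer with the given ID, replacing any earlier setting. */
  void set(String timerId, long fireAtMillis) {
    pending.put(timerId, fireAtMillis);
  }

  /** Fires and removes every timer due at or before nowMillis. */
  void advanceTo(long nowMillis) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> entry : pending.entrySet()) {
      if (entry.getValue() <= nowMillis) {
        due.add(entry.getKey());
      }
    }
    for (String timerId : due) {
      pending.remove(timerId);
      callback.accept(timerId);
    }
  }

  int pendingCount() {
    return pending.size();
  }
}

class TimerFamilyDemo {
  public static void main(String[] args) {
    List<String> fired = new ArrayList<>();
    TimerFamilyModel timers = new TimerFamilyModel(fired::add);

    timers.set("mainTimer", 100L);
    // IDs derived from data, like "actionType" + e.getActionType().
    for (String action : new String[] {"click", "purchase"}) {
      timers.set("actionType" + action, 200L);
    }

    timers.advanceTo(150L);
    System.out.println(fired); // [mainTimer]
    timers.advanceTo(250L);
    System.out.println(fired); // [mainTimer, actionTypeclick, actionTypepurchase]
  }
}
```

With statically declared timers, the two "actionType" timers above would each have needed their own declaration and callback, which is not possible when the set of action types is only known from the data.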
Thank you Rehman for getting this done! Beam users will find it very
valuable.
Reuven