Hi Reuven,
first of all big +1 for this.
Next, couple of questions that arise. Do you target DSLs only, or do you
suppose that this would be used by end-users as well? If only DSLs would
be in concern, then I think:
a) it is not only about timers, but state has to be managed in the
same way (although the mentioned JIRA talks about timers only now, so we
can start with that)
b) maybe an alternative approach (also discussed several times in
mailing lists) would be more flexible - expose the possibility to
provide runners directly with DoFnSignature, which would enable DSLs to
hook "closer" to runners
I don't currently have in mind how would this "lower level" API look
like, but concerning Euphoria, this would be a preferred solution.
Although the TimerMap looks general enough and also flexible enough, the
general solution must include state (at least as thought experiment),
just to make sure that we don't enable dynamic setup of timers, but
exposing the same functionality for state would again mean we have to
expose the complete DoFnSignature (and therefore make the TimerMap a
somewhat redundant feature).
Jan
On 10/22/19 12:23 AM, Reuven Lax wrote:
BEAM-6857 documents the need for dynamic timer support in the Beam
API. I wanted to make a proposal for what this API would look like,
and how to express it in the portability protos.
Background: Today Beam (especially BeamJava) requires a ParDo to
statically declare all timers it accesses at compile time. For example:
class MyDoFn extends DoFn<String, String> {
@TimerId("timer1") TimerSpec timer1 =
TimerSpecs.timer(TimeDomain(EVENT_TIME));
@TimerId("timer2") TimerSpec timer2 =
TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
@ProcessElement
public void process(@Element String e, @TimerId("timer1") Timer
timer1, @TimerId("timer2") Timer timer2)) {
timer1.set(...);
timer2.set(...);
}
@OnTimer("timer1") public void onTimer1() { ... }
@OnTimer("timer2") public void onTimer2() { ... }
}
This requires the author of a ParDo to know the full list of timers
ahead of time, which has been problematic in many cases. One example
where it causes issues is for DSLs such as Euphoria or Scio. DSL
authors usually write ParDos to interpret the code written in the
high-level DSL, and so don't know ahead of time the list of timers
needed; alternatives today are quite ugly: physical code generation or
creating a single timer that multiplexes all of the users logical
timers. There are also cases where a ParDo needs multiple distinct
timers, but the set of distinct timers is controlled by the input
data, and therefore not knowable in advance. The Beam timer API has
been insufficient for these use cases.
I propose a new TimerMap construct, which allow a ParDo to dynamically
set named timers. It's use in the Java API would look as follows:
class MyDoFn extends DoFn<String, String> {
@TimerId("timers") TimerSpec timers =
TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
@ProcessElement
public void process(@Element String e, @TimerId("timers") TimerMap
timer)) {
timers.set("timer1", ...);
timers.set("timer2", ...);
}
@OnTimer("timer") public void onTimer(@TimerId String timerFired,
@Timestamp Instant timerTs) { ... }
}
There is a new TimerSpec type to specify a TimerMap. The TimerMap
class itself allows dynamically setting multiple timers based on a
String tag argument. Each TimerMap has a single callback which when
called is given the id of the timer that is currently firing.
It is allowed to have multiple TimerMap objects in a ParDo (and
required if you want to have both processing-time and event-time
timers in the same ParDo). Each TimerMap is its own logical namespace.
i.e. if the user sets timers with the same string tag on different
TimerMap objects the timers will not collide.
Currently the portability protos were written to mirror the Java API,
expecting one TimerSpec per timer accessed by the ParDo. I suggest
that we instead make TimerMap the default for portability, and model
the current behavior on top of timer map. If this proves problematic
for some runners, we could instead introduce a new TimerSpec proto to
represent TimerMap.
Thoughts?
Reuven