Hi Reuven,

First of all, a big +1 for this.

Next, a couple of questions arise. Are you targeting DSLs only, or do you expect this to be used by end users as well? If only DSLs are the concern, then I think:

 a) it is not only about timers; state has to be managed in the same way (although the mentioned JIRA currently talks about timers only, so we can start with that)

 b) an alternative approach (also discussed several times on the mailing lists) might be more flexible: expose the possibility of providing runners directly with a DoFnSignature, which would enable DSLs to hook in "closer" to the runners

I don't currently have in mind what this "lower level" API would look like, but as far as Euphoria is concerned, this would be the preferred solution.

Although the TimerMap looks general and flexible enough, the general solution must include state (at least as a thought experiment), just to make sure that we don't enable dynamic setup of timers only to find that exposing the same functionality for state would again mean we have to expose the complete DoFnSignature (and therefore make the TimerMap a somewhat redundant feature).

Jan

On 10/22/19 12:23 AM, Reuven Lax wrote:
BEAM-6857 documents the need for dynamic timer support in the Beam API. I wanted to make a proposal for what this API would look like, and how to express it in the portability protos.

Background: Today Beam (especially Beam Java) requires a ParDo to statically declare, at compile time, all timers it accesses. For example:

class MyDoFn extends DoFn<String, String> {
  @TimerId("timer1") private final TimerSpec timer1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
  @TimerId("timer2") private final TimerSpec timer2 = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(@Element String e, @TimerId("timer1") Timer timer1, @TimerId("timer2") Timer timer2) {
    timer1.set(...);
    timer2.set(...);
  }

  @OnTimer("timer1") public void onTimer1() { ... }
  @OnTimer("timer2") public void onTimer2() { ... }
}

This requires the author of a ParDo to know the full list of timers ahead of time, which has been problematic in many cases. One example where it causes issues is DSLs such as Euphoria or Scio. DSL authors usually write ParDos to interpret the code written in the high-level DSL, and so don't know ahead of time the list of timers needed; the alternatives today are quite ugly: physical code generation, or creating a single timer that multiplexes all of the user's logical timers. There are also cases where a ParDo needs multiple distinct timers, but the set of distinct timers is controlled by the input data and is therefore not knowable in advance. The Beam timer API has been insufficient for these use cases.
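
To make the multiplexing workaround concrete, here is a rough sketch in plain Java with no Beam dependency. The class and method names (TimerMultiplexer, set, onPhysicalTimer, nextPhysicalFiring) are hypothetical and only illustrate the bookkeeping; in a real DoFn the map would have to live in Beam state, and the returned time would be used to set the single declared timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: many logical timers multiplexed over one physical timer. */
class TimerMultiplexer {
  // Logical timer name -> requested firing time (epoch millis).
  // Assumption: in a real DoFn this map would be kept in state, not in a field.
  private final Map<String, Long> logicalTimers = new TreeMap<>();

  /** Registers or overwrites a logical timer; returns when the physical timer should fire next. */
  long set(String name, long fireAtMillis) {
    logicalTimers.put(name, fireAtMillis);
    return nextPhysicalFiring();
  }

  /** Called when the physical timer fires: removes and returns the logical timers that are due. */
  List<String> onPhysicalTimer(long nowMillis) {
    List<String> due = new ArrayList<>();
    logicalTimers.entrySet().removeIf(entry -> {
      if (entry.getValue() <= nowMillis) {
        due.add(entry.getKey());
        return true;
      }
      return false;
    });
    return due;
  }

  /** Earliest pending firing time, or Long.MAX_VALUE if no logical timer is pending. */
  long nextPhysicalFiring() {
    long earliest = Long.MAX_VALUE;
    for (long fireAt : logicalTimers.values()) {
      earliest = Math.min(earliest, fireAt);
    }
    return earliest;
  }
}
```

After each firing the caller would have to re-arm the physical timer with nextPhysicalFiring(), which is exactly the kind of boilerplate every DSL currently has to carry itself.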

I propose a new TimerMap construct, which allows a ParDo to dynamically set named timers. Its use in the Java API would look as follows:

class MyDoFn extends DoFn<String, String> {
  @TimerId("timers") private final TimerSpec timers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(@Element String e, @TimerId("timers") TimerMap timers) {
    timers.set("timer1", ...);
    timers.set("timer2", ...);
  }

  @OnTimer("timers") public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs) { ... }
}

There is a new TimerSpec type to specify a TimerMap. The TimerMap class itself allows dynamically setting multiple timers based on a String tag argument. Each TimerMap has a single callback which, when called, is given the id of the timer that is currently firing.

It is allowed to have multiple TimerMap objects in a ParDo (and required if you want to have both processing-time and event-time timers in the same ParDo). Each TimerMap is its own logical namespace. That is, if the user sets timers with the same string tag on different TimerMap objects, the timers will not collide.
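
The namespacing rule can be pictured with a small in-memory model. This is only an illustration in plain Java, not the proposed Beam class: NamespacedTimers and its methods are hypothetical names, and the overwrite-on-same-tag behavior is an assumption carried over from how a regular timer behaves when it is set twice.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: timers are keyed by (TimerMap id, tag), so equal tags in different maps never collide. */
class NamespacedTimers {
  // TimerMap id (the @TimerId value) -> tag -> firing time in epoch millis.
  private final Map<String, Map<String, Long>> byTimerMap = new HashMap<>();

  /** Sets a timer in the given TimerMap; assumed to overwrite an existing timer with the same tag. */
  void set(String timerMapId, String tag, long fireAtMillis) {
    byTimerMap.computeIfAbsent(timerMapId, id -> new HashMap<>()).put(tag, fireAtMillis);
  }

  /** Returns the pending firing time for (timerMapId, tag), or null if no such timer is set. */
  Long get(String timerMapId, String tag) {
    Map<String, Long> timers = byTimerMap.get(timerMapId);
    return timers == null ? null : timers.get(tag);
  }
}
```

With this model, setting the tag "flush" on an event-time TimerMap and on a processing-time TimerMap yields two independent entries.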

The portability protos are currently written to mirror the Java API, expecting one TimerSpec per timer accessed by the ParDo. I suggest that we instead make TimerMap the default for portability, and model the current behavior on top of TimerMap. If this proves problematic for some runners, we could instead introduce a new TimerSpec proto to represent TimerMap.

Thoughts?

Reuven
