Hi Reuven,

I didn't propose restricting the model. The model can (and should) allow multiple timers per key, including dynamic ones. The question was whether this can be done efficiently using a single timer (after all, the runner will probably have a single "timer service", so no matter what we expose at the API level, it will end up being multiplexed in the runner). It might also have the additional benefit of preventing bugs. I'm not proposing to make this change for existing timers; it was more a question of whether we really must force runners to implement dynamic timers, or whether we can do it in the translation layer generally, for all runners at once.
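
To make the single-timer idea concrete, here is a minimal sketch in plain Java (no Beam dependency) of multiplexing many named timers onto one underlying timer. The class TimerMultiplexer and all of its methods are hypothetical names invented for this illustration; a real runner or translation layer would have to keep this bookkeeping in fault-tolerant state rather than in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of Beam: multiplexes named logical timers
// onto one underlying timer by tracking (timestamp, name) pairs.
class TimerMultiplexer {
  // name -> timestamp currently set for that logical timer
  private final Map<String, Long> byName = new HashMap<>();
  // timestamp -> names due at that timestamp, sorted by timestamp
  private final TreeMap<Long, TreeSet<String>> byTime = new TreeMap<>();

  // Sets (or resets) the logical timer `name` to fire at `timestamp`.
  void set(String name, long timestamp) {
    clear(name);
    byName.put(name, timestamp);
    byTime.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(name);
  }

  // Clears the logical timer `name` if it is currently set.
  void clear(String name) {
    Long old = byName.remove(name);
    if (old == null) {
      return;
    }
    TreeSet<String> names = byTime.get(old);
    names.remove(name);
    if (names.isEmpty()) {
      byTime.remove(old);
    }
  }

  // Timestamp the single underlying timer should be set to, or null if
  // no logical timer is pending.
  Long nextFireTime() {
    return byTime.isEmpty() ? null : byTime.firstKey();
  }

  // Called when the underlying timer fires at `now`. Removes and returns
  // the names of all logical timers that are due, in timestamp order.
  List<String> fire(long now) {
    List<String> fired = new ArrayList<>();
    while (!byTime.isEmpty() && byTime.firstKey() <= now) {
      for (String name : byTime.pollFirstEntry().getValue()) {
        byName.remove(name);
        fired.add(name);
      }
    }
    return fired;
  }
}
```

After each set, clear, or fire, the caller would re-arm the one real timer at nextFireTime(). Each operation costs O(log n) in the number of pending logical timers, and because all firings go through one sorted structure, logical timers are delivered in timestamp order.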

Regarding the API - which is again a question independent of how it will be implemented - what do we need the @TimerFamily TimerSpec declaration for? I see two reasons:

 a) it holds the time domain

 b) it declares the DoFn as being stateful

Property a) looks like it could be specified when setting the timer, and b) could be inferred from @ProcessElement (or another method). What about:

class MyDoFn extends DoFn<String, String> {
  @ProcessElement
  // declares @TimerContext which implies stateful DoFn
  public void process(@Element String e, @TimerContext TimerContext timers) {
    Timer timer1 = timers.get("timer1", EVENT_TIME);
    Timer timer2 = timers.get("timer2", PROCESSING_TIME);
    timer1.set(...);
    timer2.set(...);
  }

  // empty name might be allowed iff the declaration contains @TimerContext,
  // which would declare the use of dynamic timers
  @OnTimer
  public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs, @TimerContext TimerContext timers) { ... }
}
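
As a rough illustration of how a lookup like timers.get(name, domain) could behave, here is a small in-memory sketch in plain Java. Domain, SketchTimer and SketchTimerContext are hypothetical stand-ins, not Beam classes, and the rule that a name cannot be reused with a different time domain is an assumption of this sketch, not something settled in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the types used in the proposal above;
// none of these are Beam classes.
enum Domain { EVENT_TIME, PROCESSING_TIME }

class SketchTimer {
  final String name;
  final Domain domain;
  Long timestamp; // null while the timer is unset

  SketchTimer(String name, Domain domain) {
    this.name = name;
    this.domain = domain;
  }

  void set(long ts) {
    this.timestamp = ts;
  }
}

class SketchTimerContext {
  private final Map<String, SketchTimer> timers = new HashMap<>();

  // Returns the timer registered under `name`, creating it on first use.
  // The time domain is supplied at lookup time, so no up-front
  // TimerSpec-style declaration is needed.
  // Assumption: reusing a name with a different domain is an error.
  SketchTimer get(String name, Domain domain) {
    SketchTimer t = timers.computeIfAbsent(name, n -> new SketchTimer(n, domain));
    if (t.domain != domain) {
      throw new IllegalArgumentException(
          "timer " + name + " already exists in domain " + t.domain);
    }
    return t;
  }
}
```

Repeated lookups with the same name and domain return the same timer object, which is what lets the timer be reset from either the element-processing method or the timer callback.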

I'm still seeking the analogy with dynamic state, because in this API, that might become

class MyDoFn extends DoFn<String, String> {
  @ProcessElement
  public void process(@Element String e, @StateContext StateContext states) {
    ValueState state = states.get("myDynamicState", StateSpec...);
    state.get(...);
    state.set(...);
  }
}

The point is that there seems to be no use for any declaration like @TimerFamily in case of dynamic state, because there is no domain. It would feel weird to have to declare something for dynamic timers and not have to do it for state.

Jan

On 10/29/19 6:56 AM, Reuven Lax wrote:
Just to circle back around, after the discussion on this thread I propose modifying the proposed API as follows:

class MyDoFn extends DoFn<String, String> {
  @TimerFamily("timers") TimerSpec timers = TimerSpecs.timerFamily(TimeDomain(EVENT_TIME));

  @ProcessElement
  public void process(@Element String e, @TimerFamily("timers") TimerMap timers) {
    timers.set("timer1", ...);
    timers.set("timer2", ...);
  }

  @OnTimer("timer") public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs, @TimerFamily("timers") TimerMap timers) { ... }
}

Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors are a bit independent (though not completely so, as it does relate), so I suggest splitting that into a separate discussion.

Reuven

On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:



    On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:

        Hi Reuven,

        yes, if this change is intended to be used by end users, then
        DoFnSignatures cannot be used, agreed on that. Regarding the
        relationship with dynamic state - I agree that this is a
        separate problem, but because it is close enough, we should
        know how we want to deal with it. Because state and timers
        share some functionality (after all, timers need state to be
        fault tolerant), these APIs should IMO share the same logic.
        Whatever solution is chosen to expose dynamic timers, it
        should extend to dynamic state.

        I'd like to pause a little on the premise that users want
        dynamic timers (that is, timers whose *name* - and therefore
        behavior - is determined by incoming data). Could this case be
        modeled so that the timer actually has one more (implicit)
        state variable that holds a collection of tuples
        (timestamp, name)? Then the timer would be invoked at the
        given (minimum of all currently set) timestamps with the
        respective name? The question here probably is - can this
        have a performance impact? That is to say - can any runner
        actually do anything different from this in the sense of time
        complexity of the algorithm?


    Yes - you could always multiplex many timers onto one. This is what
    some users do today, but it tends to be very inefficient and also
    complex. The Beam model requires runners to support dynamic timers
    per key (e.g. that's how windowing is implemented - each window has
    a separate timer), so there's no reason not to expose this to users.

        I'm a little afraid that letting users define data-driven
        timers might be too restrictive for some runners. Yes,
        runners that don't have this option would probably be able
        to resort to the logic described above, but if this work
        could reasonably be done for all runners at once, then we
        wouldn't force runners to actually implement it. And the API
        could still look as if the timers were actually dynamic.

        Jan

        P.S. If dynamic timers (and therefore any number of timers)
        can actually be implemented using a single timer, that might
        be an interesting pattern, because a single timer per
        (window, key) has many nice properties. For example, it
        implicitly avoids the situation where timer invocation is not
        ordered ([BEAM-7520]), which seems to be an issue for
        multiple runners (samza, portable flink).

    BEAM-7520 is simply an implementation bug. I don't think it makes
    sense to fix a bug by restricting the model.

        On 10/22/19 6:52 PM, Reuven Lax wrote:
        Kenn:
        +1 to using TimerFamily instead of TimerId and TimerMap.

        Jan:
        This is definitely not just for DSLs. I've definitely seen
        cases where the user wants different timers based on input
        data, so they cannot be defined statically. As a thought
        experiment: one stated goal of state + timers was to provide
        the low-level tools we use to implement windowing. However to
        implement windowing you need a dynamic set of timers, not
        just a single one. Now most users don't need to reimplement
        windowing (though we have had some users who had that need,
        when they wanted something slightly different than what
        native Beam windowing provided), however the need for dynamic
        timers is not unheard of.

        +1 to allowing dynamic state. However I think this is
        separate enough from timers that it doesn't need to be
        coupled in this discussion. Dynamic state also raises the
        wrinkle of pipeline compatibility (as you mentioned), which I
        think is a bit less of an issue for dynamic timers.

        Allowing a DSL to specify a DoFnSignature does not quite
        solve this problem. The DSL still needs a way to set and
        process the timers. It also does not solve the problem where
        the timers are based on input data elements, so cannot be
        known at pipeline construction time. However what might be
        more important is statically defining the timer families, and
        a DSL could do this by specifying a DoFnSignature (and
        something similar could be done with state). Also as
        mentioned above, this is useful to normal Beam users as well,
        and we shouldn't force normal users to start dealing with
        DoFnSignatures and DoFnInvokers.






        On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:

            Hi Max,

            wouldn't that be actually the same as

            class MyDoFn extends DoFn<String, String> {


               @ProcessElement
               public void process(
                   ProcessContext context) {
                 // "get" would register a new TimerSpec
                 Timer timer1 = context.getTimer("timer1");
                 Timer timer2 = context.getTimer("timer2");
                 timer1.set(...);
                 timer2.set(...);
               }
            }

            That is - no need to declare anything? One more concern
            about that - if we allow registration of timers (or even
            state) dynamically like that, it might be harder to
            validate the pipeline upon upgrades.

            Jan

            On 10/22/19 4:47 PM, Maximilian Michels wrote:
            > The idea makes sense to me. I really like that Beam gives upfront
            > specs for timer and state, but it is not flexible enough for
            > timer-based libraries or for users who want to dynamically generate
            > timers.
            >
            > I'm not sure about the proposed API yet. Shouldn't we separate the
            > timer specs from setting actual timers?
            >
            > Suggestion:
            >
            > class MyDoFn extends DoFn<String, String> {
            >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
            >
            >   @ProcessElement
            >   public void process(
            >       @Element String e,
            >       @TimerMap TimerMap timers) {
            >     // "get" would register a new TimerSpec
            >     Timer timer1 = timers.get("timer1");
            >     Timer timer2 = timers.get("timer2");
            >     timer1.set(...);
            >     timer2.set(...);
            >   }
            >
            >   // No args for "@OnTimer" => use generic TimerMap
            >   @OnTimer
            >   public void onTimer(
            >       @TimerId String timerFired,
            >       @Timestamp Instant timerTs,
            >       @TimerMap TimerMap timers) {
            >      // Timer firing
            >      ...
            >      // Set this timer (or another)
            >      Timer timer = timers.get(timerFired);
            >      timer.set(...);
            >   }
            > }
            >
            > What do you think?
            >
            > -Max
            >
            > On 22.10.19 10:35, Jan Lukavský wrote:
            >> Hi Kenn,
            >>
            >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
            >>> This seems extremely useful.
            >>>
            >>> I assume you mean `@OnTimer("timers")` in your example. I would
            >>> suggest that the parameter annotation be something other
            >>> than @TimerId since that annotation is already used for a very
            >>> similar but different purpose; they are close enough that it is
            >>> tempting to pun them, but it is clearer to keep them distinct IMO.
            >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
            >>> keep @TimerId in the parameter list and change the declaration
            >>> to @TimerFamily("timers"). I think "family" or "group" may be more
            >>> clear naming than "map".
            >>>
            >>> At the portability level, this API does seem to be pretty close to a
            >>> noop in terms of the messages that need to be sent over the Fn API,
            >>> so it makes sense to loosen the protos. By the time the Fn API is in
            >>> play, all of our desires to catch errors prior to execution are
            >>> irrelevant anyhow.
            >>>
            >>> On the other hand, I think DSLs have a different & bigger problem
            >>> than this, in that they want to programmatically adjust all the
            >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
            >>> another. Certainly some limited DSL use cases are addressed by this,
            >>> but I wouldn't take that as a primary use case for this feature.
            >>> Ultimately they are probably better served by being able to
            >>> explicitly author a DoFnInvoker and provide it to a variant of
            >>> beam:transforms:ParDo where the do_fn field is a serialized
            >>> DoFnInvoker. Now that I think about this, I cannot recall why we
            >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
            >>> That would allow maximum flexibility in utilizing the portability
            >>> framework.
            >>
            >> yes, exactly, but when DSLs are in question, we have to make sure
            >> that DSLs are not bound to portability - we have to be able to
            >> translate even in case of "legacy" runners as well. That might
            >> complicate things a bit maybe.
            >>
            >> Jan
            >>
            >>>
            >>> Kenn
            >>>
            >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com> wrote:
            >>>
            >>>     BEAM-6857 documents the need for dynamic timer support in the Beam
            >>>     API. I wanted to make a proposal for what this API would look
            >>>     like, and how to express it in the portability protos.
            >>>
            >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
            >>>     statically declare all timers it accesses at compile time. For
            >>>     example:
            >>>
            >>>     class MyDoFn extends DoFn<String, String> {
            >>>       @TimerId("timer1") TimerSpec timer1 = TimerSpecs.timer(TimeDomain(EVENT_TIME));
            >>>       @TimerId("timer2") TimerSpec timer2 = TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
            >>>
            >>>       @ProcessElement
            >>>       public void process(@Element String e, @TimerId("timer1") Timer timer1, @TimerId("timer2") Timer timer2) {
            >>>         timer1.set(...);
            >>>         timer2.set(...);
            >>>       }
            >>>
            >>>       @OnTimer("timer1") public void onTimer1() { ... }
            >>>       @OnTimer("timer2") public void onTimer2() { ... }
            >>>     }
            >>>
            >>>     This requires the author of a ParDo to know the full list of
            >>>     timers ahead of time, which has been problematic in many cases.
            >>>     One example where it causes issues is for DSLs such as Euphoria or
            >>>     Scio. DSL authors usually write ParDos to interpret the code
            >>>     written in the high-level DSL, and so don't know ahead of time the
            >>>     list of timers needed; alternatives today are quite ugly: physical
            >>>     code generation or creating a single timer that multiplexes all of
            >>>     the users' logical timers. There are also cases where a ParDo needs
            >>>     multiple distinct timers, but the set of distinct timers is
            >>>     controlled by the input data, and therefore not knowable in
            >>>     advance. The Beam timer API has been insufficient for these use
            >>>     cases.
            >>>
            >>>     I propose a new TimerMap construct, which allows a ParDo to
            >>>     dynamically set named timers. Its use in the Java API would look
            >>>     as follows:
            >>>
            >>>     class MyDoFn extends DoFn<String, String> {
            >>>       @TimerId("timers") TimerSpec timers = TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
            >>>
            >>>       @ProcessElement
            >>>       public void process(@Element String e, @TimerId("timers") TimerMap timers) {
            >>>         timers.set("timer1", ...);
            >>>         timers.set("timer2", ...);
            >>>       }
            >>>
            >>>       @OnTimer("timer") public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs) { ... }
            >>>     }
            >>>
            >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
            >>>     class itself allows dynamically setting multiple timers based on a
            >>>     String tag argument. Each TimerMap has a single callback which
            >>>     when called is given the id of the timer that is currently firing.
            >>>
            >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
            >>>     required if you want to have both processing-time and event-time
            >>>     timers in the same ParDo). Each TimerMap is its own logical
            >>>     namespace. i.e. if the user sets timers with the same string tag
            >>>     on different TimerMap objects the timers will not collide.
            >>>
            >>>     Currently the portability protos were written to mirror the Java
            >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
            >>>     suggest that we instead make TimerMap the default for portability,
            >>>     and model the current behavior on top of timer map. If this proves
            >>>     problematic for some runners, we could instead introduce a new
            >>>     TimerSpec proto to represent TimerMap.
            >>>
            >>>     Thoughts?
            >>>
            >>>     Reuven
            >>>
