Everyone on this thread seems to agree on the general idea, and remaining
discussions seem to be about details pertaining to the interface. Those are
easier to discuss on a PR, so I suggest we move ahead with a PR, and if
people want to they are welcome to comment on the PR.

Reuven

On Fri, Nov 1, 2019 at 11:07 AM Reuven Lax <re...@google.com> wrote:

> Hi Jan,
>
> Your proposal has merit, but I think using the TimerFamily specification
> is more consistent with the existing API. I think that a StateFamily can
> also have domains just like timers.
>
> Luke's suggestion for the proto changes sounds good.
>
> Reuven
>
> On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> I didn't propose to restrict the model. The model can (and should) have
>> multiple timers per key, even dynamic ones. The question was whether this
>> can be done efficiently using a single timer (after all, the runner will
>> probably have a single "timer service", so no matter what we expose at the
>> API level, this will end up being multiplexed in the runner). It might also
>> have the additional benefit of preventing bugs. But I'm not proposing to
>> make this change for existing timers; it was more a question of whether we
>> really must force runners to implement dynamic timers, or whether we can do
>> it in the translation layer for all runners at once.
>>
>> Regarding the API - which is again a question independent of how it will be
>> implemented - what do we need the @TimerFamily TimerSpec declaration for? I
>> see two reasons:
>>
>>  a) it holds the time domain
>>
>>  b) it declares the DoFn as being stateful
>>
>> Property a) looks like it can be specified when setting the timer. b)
>> could be inferred from @ProcessElement (or another method). What about:
>>
>> class MyDoFn extends DoFn<String, String> {
>>   @ProcessElement
>>   // declares @TimerContext which implies stateful DoFn
>>   public void process(@Element String e, @TimerContext TimerContext timers) {
>>     Timer timer1 = timers.get("timer1", EVENT_TIME);
>>     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
>>     timer1.set(...);
>>     timer2.set(...);
>>   }
>>
>>   // empty name might be allowed iff the declaration contains @TimerContext,
>>   // so that declares using dynamic timers
>>   @OnTimer
>>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>>       @TimerContext TimerContext timers) { ... }
>> }
>>
>> I'm still seeking the analogy with dynamic state, because in this API,
>> that might become
>>
>> class MyDoFn extends DoFn<String, String> {
>>   @ProcessElement
>>   public void process(@Element String e, @StateContext StateContext states) {
>>     ValueState state = states.get("myDynamicState", StateSpec...);
>>     state.get(...);
>>     state.set(...);
>>   }
>> }
>>
>> The point is that there seems to be no use for any declaration like
>> @TimerFamily in case of dynamic state, because there is no domain. It would
>> feel weird to have to declare something for dynamic timers and not have to
>> do it for state.
>>
>> Jan
>>
>> On 10/29/19 6:56 AM, Reuven Lax wrote:
>>
>> Just to circle back around, after the discussion on this thread I propose
>> modifying the proposed API as follows:
>>
>> class MyDoFn extends DoFn<String, String> {
>>   @TimerFamily("timers") TimerSpec timers =
>>       TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(@Element String e,
>>       @TimerFamily("timers") TimerMap timers) {
>>     timers.set("timer1", ...);
>>     timers.set("timer2", ...);
>>   }
>>
>>   @OnTimer("timers")
>>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>>       @TimerFamily("timers") TimerMap timers) { ... }
>> }
>>
>> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
>> are a bit independent (though not completely so, as it does relate), so I
>> suggest splitting that into a separate discussion.
>>
>> Reuven
>>
>> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> yes, if this change is intended to be used by end users, then
>>>> DoFnSignatures cannot be used, agreed. Regarding the relationship
>>>> with dynamic state - I agree that this is a separate problem, but because it
>>>> is close enough, we should know how we want to deal with it. Because
>>>> state and timers share some functionality (after all, timers need state to
>>>> be fault tolerant), these APIs should IMO share the same logic. Whatever
>>>> solution is chosen to expose dynamic timers, it should extend to dynamic
>>>> state.
>>>>
>>>> I'd like to pause a little on the premise that users want dynamic
>>>> timers (that is, timers whose *name* - and therefore behavior - is
>>>> determined by incoming data). Could this case be modeled so that the timer
>>>> has one more (implicit) state variable that holds a collection of
>>>> (timestamp, name) tuples? The timer would then be invoked at the minimum
>>>> of all currently set timestamps, with the respective name. The
>>>> question here probably is - can this have a performance impact? That is to
>>>> say - can any runner actually do anything different from this in the sense
>>>> of time complexity of the algorithm?
>>>>
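The multiplexing idea described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Beam code: the class `TimerMultiplexer` and its methods are hypothetical names, timestamps are plain millisecond longs, and the map stands in for the implicit state variable holding (timestamp, name) pairs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical sketch: many named logical timers multiplexed onto one physical timer. */
class TimerMultiplexer {
  // The "implicit state variable": logical timer name -> timestamp in millis.
  private final Map<String, Long> pending = new HashMap<>();

  /** Sets (or overwrites) the logical timer with the given name. */
  void set(String name, long timestampMillis) {
    pending.put(name, timestampMillis);
  }

  /** When the single physical timer should fire next: the minimum pending timestamp. */
  OptionalLong nextFireTime() {
    return pending.values().stream().mapToLong(Long::longValue).min();
  }

  /**
   * Called when the physical timer fires. Removes every logical timer due at or
   * before nowMillis and returns their names ordered by timestamp, then by name.
   */
  List<String> fire(long nowMillis) {
    List<Map.Entry<String, Long>> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : pending.entrySet()) {
      if (e.getValue() <= nowMillis) {
        due.add(Map.entry(e.getKey(), e.getValue()));
      }
    }
    due.sort((a, b) -> {
      int byTime = Long.compare(a.getValue(), b.getValue());
      return byTime != 0 ? byTime : a.getKey().compareTo(b.getKey());
    });
    List<String> names = new ArrayList<>();
    for (Map.Entry<String, Long> e : due) {
      names.add(e.getKey());
      pending.remove(e.getKey());
    }
    return names;
  }
}
```

In this sketch both finding the next fire time and collecting due timers scan every pending entry, so the cost grows linearly with the number of logical timers per key. A sorted structure would reduce that, which is the kind of time-complexity trade-off the question above is asking about.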
>>>
>>> Yes - you could always multiplex many timers onto one. This is what some
>>> users do today, but it tends to be very inefficient and also complex. The
>>> Beam model requires runners to support dynamic timers per key (e.g. that's
>>> how windowing is implemented - each window has a separate timer), so
>>> there's no reason not to expose this to users.
>>>
>>>> I'm a little afraid that letting users define data-driven timers
>>>> might be too restrictive for some runners. Yes, runners that don't have
>>>> this option would probably be able to resort to the logic described above,
>>>> but if this work could reasonably be done for all runners, then we wouldn't
>>>> force runners to actually implement it. And the API could look as if the
>>>> timers were actually dynamic.
>>>>
>>>> Jan
>>>>
>>>> P.S. If dynamic timers (and therefore any number of them) can actually be
>>>> implemented using a single timer, that might be an interesting pattern,
>>>> because a single timer per (window, key) has many nice properties, e.g. it
>>>> implicitly avoids situations where timer invocation is not ordered
>>>> ([BEAM-7520]), which seems to be an issue for multiple runners (Samza,
>>>> portable Flink).
>>>>
>>> BEAM-7520 is simply an implementation bug. I don't think it makes sense
>>> to fix a bug by restricting the model.
>>>
>>>
>>>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>>>
>>>> Kenn:
>>>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>>>
>>>> Jan:
>>>> This is definitely not just for DSLs. I've definitely seen cases where
>>>> the user wants different timers based on input data, so they cannot be
>>>> defined statically. As a thought experiment: one stated goal of state +
>>>> timers was to provide the low-level tools we use to implement windowing.
>>>> However to implement windowing you need a dynamic set of timers, not just a
>>>> single one. Now most users don't need to reimplement windowing (though we
>>>> have had some users who had that need, when they wanted something slightly
>>>> different than what native Beam windowing provided), however the need for
>>>> dynamic timers is not unheard of.
>>>>
>>>> +1 to allowing dynamic state. However I think this is separate enough
>>>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>>>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>>>> which I think is a bit less of an issue for dynamic timers.
>>>>
>>>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>>>> problem. The DSL still needs a way to set and process the timers. It also
>>>> does not solve the problem where the timers are based on input data
>>>> elements, so cannot be known at pipeline construction time. However what
>>>> might be more important is statically defining the timer families, and a
>>>> DSL could do this by specifying a DoFnSignature (and something similar
>>>> could be done with state). Also as mentioned above, this is useful to
>>>> normal Beam users as well, and we shouldn't force normal users to start
>>>> dealing with DoFnSignatures and DoFnInvokers.
>>>>
>>>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> wouldn't that be actually the same as
>>>>>
>>>>> class MyDoFn extends DoFn<String, String> {
>>>>>
>>>>>    @ProcessElement
>>>>>    public void process(
>>>>>        ProcessContext context) {
>>>>>      // "get" would register a new TimerSpec
>>>>>      Timer timer1 = context.getTimer("timer1");
>>>>>      Timer timer2 = context.getTimer("timer2");
>>>>>      timer1.set(...);
>>>>>      timer2.set(...);
>>>>>    }
>>>>> }
>>>>>
>>>>> That is - no need to declare anything? One more concern about that - if
>>>>> we allow registration of timers (or even state) dynamically like that it
>>>>> might be harder to perform validation of pipeline upon upgrades.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>>>> > The idea makes sense to me. I really like that Beam gives upfront
>>>>> > specs for timer and state, but it is not flexible enough for
>>>>> > timer-based libraries or for users who want to dynamically generate
>>>>> > timers.
>>>>> >
>>>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>>>> > timer specs from setting actual timers?
>>>>> >
>>>>> > Suggestion:
>>>>> >
>>>>> > class MyDoFn extends DoFn<String, String> {
>>>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>>>> >
>>>>> >   @ProcessElement
>>>>> >   public void process(
>>>>> >       @Element String e,
>>>>> >       @TimerMap TimerMap timers) {
>>>>> >     // "get" would register a new TimerSpec
>>>>> >     Timer timer1 = timers.get("timer1");
>>>>> >     Timer timer2 = timers.get("timer2");
>>>>> >     timer1.set(...);
>>>>> >     timer2.set(...);
>>>>> >   }
>>>>> >
>>>>> >   // No args for "@OnTimer" => use generic TimerMap
>>>>> >   @OnTimer
>>>>> >   public void onTimer(
>>>>> >       @TimerId String timerFired,
>>>>> >       @Timestamp Instant timerTs,
>>>>> >       @TimerMap TimerMap timers) {
>>>>> >      // Timer firing
>>>>> >      ...
>>>>> >      // Set this timer (or another)
>>>>> >      Timer timer = timers.get(timerFired);
>>>>> >      timer.set(...);
>>>>> >   }
>>>>> > }
>>>>> >
>>>>> > What do you think?
>>>>> >
>>>>> > -Max
>>>>> >
>>>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>>>> >> Hi Kenn,
>>>>> >>
>>>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>>>> >>> This seems extremely useful.
>>>>> >>>
>>>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>>>> >>> suggest that the parameter annotation be something other
>>>>> >>> than @TimerId since that annotation is already used for a very
>>>>> >>> similar but different purpose; they are close enough that it is
>>>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>>>> >>> keep @TimerId in the parameter list and change the declaration
>>>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>>>> >>> clear naming than "map".
>>>>> >>>
>>>>> >>> At the portability level, this API does seem to be pretty close to a
>>>>> >>> noop in terms of the messages that need to be sent over the Fn API,
>>>>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>>>>> >>> play, all of our desires to catch errors prior to execution are
>>>>> >>> irrelevant anyhow.
>>>>> >>>
>>>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>>>> >>> than this, in that they want to programmatically adjust all the
>>>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>>>> >>> another. Certainly some limited DSL use cases are addressed by this,
>>>>> >>> but I wouldn't take that as a primary use case for this feature.
>>>>> >>> Ultimately they are probably better served by being able to
>>>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>>>> >>> That would allow maximum flexibility in utilizing the portability
>>>>> >>> framework.
>>>>> >>
>>>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>>>> >> that DSLs are not bound to portability - we have to be able to
>>>>> >> translate even in case of "legacy" runners as well. That might
>>>>> >> complicate things a bit maybe.
>>>>> >>
>>>>> >> Jan
>>>>> >>
>>>>> >>>
>>>>> >>> Kenn
>>>>> >>>
>>>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com> wrote:
>>>>> >>>
>>>>> >>>     BEAM-6857 documents the need for dynamic timer support in the Beam
>>>>> >>>     API. I wanted to make a proposal for what this API would look
>>>>> >>>     like, and how to express it in the portability protos.
>>>>> >>>
>>>>> >>>     Background: Today Beam (especially Beam Java) requires a ParDo to
>>>>> >>>     statically declare all timers it accesses at compile time. For
>>>>> >>>     example:
>>>>> >>>
>>>>> >>>     class MyDoFn extends DoFn<String, String> {
>>>>> >>>       @TimerId("timer1") TimerSpec timer1 =
>>>>> >>>           TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>> >>>       @TimerId("timer2") TimerSpec timer2 =
>>>>> >>>           TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>> >>>
>>>>> >>>       @ProcessElement
>>>>> >>>       public void process(@Element String e,
>>>>> >>>           @TimerId("timer1") Timer timer1,
>>>>> >>>           @TimerId("timer2") Timer timer2) {
>>>>> >>>         timer1.set(...);
>>>>> >>>         timer2.set(...);
>>>>> >>>       }
>>>>> >>>
>>>>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>>>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>>>> >>>     }
>>>>> >>>
>>>>> >>>     This requires the author of a ParDo to know the full list of
>>>>> >>>     timers ahead of time, which has been problematic in many cases.
>>>>> >>>     One example where it causes issues is for DSLs such as Euphoria or
>>>>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>>>>> >>>     written in the high-level DSL, and so don't know ahead of time the
>>>>> >>>     list of timers needed; alternatives today are quite ugly: physical
>>>>> >>>     code generation or creating a single timer that multiplexes all of
>>>>> >>>     the user's logical timers. There are also cases where a ParDo needs
>>>>> >>>     multiple distinct timers, but the set of distinct timers is
>>>>> >>>     controlled by the input data, and therefore not knowable in
>>>>> >>>     advance. The Beam timer API has been insufficient for these use
>>>>> >>>     cases.
>>>>> >>>
>>>>> >>>     I propose a new TimerMap construct, which allows a ParDo to
>>>>> >>>     dynamically set named timers. Its use in the Java API would look
>>>>> >>>     as follows:
>>>>> >>>
>>>>> >>>     class MyDoFn extends DoFn<String, String> {
>>>>> >>>       @TimerId("timers") TimerSpec timers =
>>>>> >>>           TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>> >>>
>>>>> >>>       @ProcessElement
>>>>> >>>       public void process(@Element String e,
>>>>> >>>           @TimerId("timers") TimerMap timers) {
>>>>> >>>         timers.set("timer1", ...);
>>>>> >>>         timers.set("timer2", ...);
>>>>> >>>       }
>>>>> >>>
>>>>> >>>       @OnTimer("timer")
>>>>> >>>       public void onTimer(@TimerId String timerFired,
>>>>> >>>           @Timestamp Instant timerTs) { ... }
>>>>> >>>     }
>>>>> >>>
>>>>> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>>>> >>>     class itself allows dynamically setting multiple timers based on a
>>>>> >>>     String tag argument. Each TimerMap has a single callback which
>>>>> >>>     when called is given the id of the timer that is currently firing.
>>>>> >>>
>>>>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>>>>> >>>     required if you want to have both processing-time and event-time
>>>>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>>>>> >>>     namespace, i.e. if the user sets timers with the same string tag
>>>>> >>>     on different TimerMap objects the timers will not collide.
>>>>> >>>
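The namespace rule above (the same string tag on two different TimerMap objects does not collide) can be modeled with a two-level map. This is a minimal in-memory sketch, not Beam's implementation: `TimerFamilies`, `set`, and `get` are hypothetical names, and timestamps are plain millisecond longs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of per-TimerMap namespaces; not Beam's implementation. */
class TimerFamilies {
  // TimerMap id -> (timer tag -> timestamp in millis).
  private final Map<String, Map<String, Long>> families = new HashMap<>();

  /** Sets the timer named by tag inside the given TimerMap's namespace. */
  void set(String timerMapId, String tag, long timestampMillis) {
    families.computeIfAbsent(timerMapId, k -> new HashMap<>()).put(tag, timestampMillis);
  }

  /** Returns the timestamp for (timerMapId, tag), or null if no such timer is set. */
  Long get(String timerMapId, String tag) {
    Map<String, Long> family = families.get(timerMapId);
    return family == null ? null : family.get(tag);
  }
}
```

Because the lookup key is effectively the pair (TimerMap id, tag), setting "timer1" in an event-time TimerMap leaves a "timer1" in a processing-time TimerMap untouched.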
>>>>> >>>     Currently the portability protos were written to mirror the Java
>>>>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>>>>> >>>     suggest that we instead make TimerMap the default for portability,
>>>>> >>>     and model the current behavior on top of timer map. If this proves
>>>>> >>>     problematic for some runners, we could instead introduce a new
>>>>> >>>     TimerSpec proto to represent TimerMap.
>>>>> >>>
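One possible way to model the current static behavior on top of a timer map is to treat each statically declared timer as a map that only ever holds one entry. The thread does not specify the encoding, so everything here is an assumption made for illustration: the class `StaticTimerAdapter`, its methods, and the choice of the empty string as the single tag are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: a statically declared timer expressed as a one-entry timer map. */
class StaticTimerAdapter {
  // Assumed convention: the only timer in a "static" map uses the empty string as its tag.
  static final String SINGLETON_TAG = "";

  // Timer map id -> (timer tag -> timestamp in millis).
  private final Map<String, Map<String, Long>> timerMaps = new HashMap<>();

  /** Dynamic path: any tag may be set within a timer map. */
  void setDynamic(String timerMapId, String tag, long timestampMillis) {
    timerMaps.computeIfAbsent(timerMapId, k -> new HashMap<>()).put(tag, timestampMillis);
  }

  /** Static path: a timer declared with a fixed id becomes a timer map with one fixed tag. */
  void setStatic(String timerId, long timestampMillis) {
    setDynamic(timerId, SINGLETON_TAG, timestampMillis);
  }

  /** Returns the timestamp for (timerMapId, tag), or null if it is not set. */
  Long lookup(String timerMapId, String tag) {
    Map<String, Long> timers = timerMaps.get(timerMapId);
    return timers == null ? null : timers.get(tag);
  }
}
```

Under this assumed encoding a runner would only need to understand the timer map form, and the existing one-spec-per-timer declarations would translate into it mechanically.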
>>>>> >>>     Thoughts?
>>>>> >>>
>>>>> >>>     Reuven
>>>>> >>>
>>>>>
>>>>
