Everyone on this thread seems to agree on the general idea, and the remaining discussion seems to be about details of the interface. Those are easier to discuss on a PR, so I suggest we move ahead with one; anyone who wants to is welcome to comment there.
Reuven

On Fri, Nov 1, 2019 at 11:07 AM Reuven Lax <re...@google.com> wrote:

> Hi Jan,
>
> Your proposal has merit, but I think using the TimerFamily specification
> is more consistent with the existing API. I think that a StateFamily can
> also have domains just like timers.
>
> Luke's suggestion for the proto changes sounds good.
>
> Reuven
>
> On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> I didn't propose to restrict the model. The model can (and should) have
>> multiple timers per key, even dynamic ones. The question was whether this can be
>> done efficiently using a single timer (after all, the runner will probably
>> have a single "timer service", so no matter what we expose at the API level,
>> this will end up being multiplexed in the runner). It might also have the
>> additional benefit of preventing bugs. But I'm not proposing to make this
>> change for existing timers; it was more a question of whether we really
>> must force runners to implement dynamic timers, or whether we can do it once
>> in the translation layer for all runners.
>>
>> Regarding the API (which, again, is independent of how it will be
>> implemented): what do we need the @TimerFamily TimerSpec declaration for? I
>> see two reasons:
>>
>> a) it holds the time domain
>>
>> b) it declares the DoFn as being stateful
>>
>> Property a) looks like it can be specified when setting the timer. b)
>> could be inferred from @ProcessElement (or another method).
>> What about:
>>
>> class MyDoFn extends DoFn<String, String> {
>>   @ProcessElement
>>   // declares @TimerContext, which implies a stateful DoFn
>>   public void process(@Element String e, @TimerContext TimerContext timers) {
>>     Timer timer1 = timers.get("timer1", EVENT_TIME);
>>     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
>>     timer1.set(...);
>>     timer2.set(...);
>>   }
>>
>>   // an empty name might be allowed iff the declaration contains
>>   // @TimerContext, which declares the use of dynamic timers
>>   @OnTimer public void onTimer(@TimerId String timerFired, @Timestamp
>>       Instant timerTs, @TimerContext TimerContext timers) { ... }
>> }
>>
>> I'm still looking for the analogy with dynamic state, because with this API
>> it might become
>>
>> class MyDoFn extends DoFn<String, String> {
>>   @ProcessElement
>>   public void process(@Element String e, @StateContext StateContext states) {
>>     ValueState state = states.get("myDynamicState", StateSpec...);
>>     state.get(...);
>>     state.set(...);
>>   }
>> }
>>
>> The point is that there seems to be no use for any declaration like
>> @TimerFamily in the case of dynamic state, because there is no domain. It would
>> feel weird to have to declare something for dynamic timers but not for
>> state.
>>
>> Jan
>>
>> On 10/29/19 6:56 AM, Reuven Lax wrote:
>>
>> Just to circle back around: after the discussion on this thread, I propose
>> modifying the proposed API as follows:
>>
>> class MyDoFn extends DoFn<String, String> {
>>   @TimerFamily("timers") TimerSpec timers =
>>       TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(@Element String e, @TimerFamily("timers") TimerMap timers) {
>>     timers.set("timer1", ...);
>>     timers.set("timer2", ...);
>>   }
>>
>>   @OnTimer("timers") public void onTimer(@TimerId String timerFired,
>>       @Timestamp Instant timerTs, @TimerFamily("timers") TimerMap timers) {
>>     ...
>>   }
>> }
>>
>> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
>> are somewhat independent (though not completely, since they are related), so I
>> suggest splitting that into a separate discussion.
>>
>> Reuven
>>
>> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> yes, if this change is intended to be used by end users, then
>>>> DoFnSignatures cannot be used; I agree on that. Regarding the relationship
>>>> with dynamic state: I agree that this is a separate problem, but because it
>>>> is close enough, we should know how we want to deal with it. Because
>>>> state and timers share some functionality (after all, timers need state to
>>>> be fault tolerant), these APIs should IMO share the same logic. Whatever
>>>> solution is chosen to expose dynamic timers, it should extend to dynamic
>>>> state.
>>>>
>>>> I'd like to pause a little on the premise that users want dynamic
>>>> timers (that is, timers whose *name*, and therefore behavior, is
>>>> determined by incoming data). Could this case be modeled so that the timer
>>>> has one more (implicit) state variable holding a collection of
>>>> (timestamp, name) tuples? The timer would then be invoked at the given
>>>> timestamps (the minimum of all currently set ones) with the respective name.
>>>> The question here probably is: can this have a performance impact? That is
>>>> to say, can any runner actually do anything different from this in terms
>>>> of the time complexity of the algorithm?
>>>>
>>>
>>> Yes, you could always multiplex many timers onto one. This is what some
>>> users do today, but it tends to be very inefficient and also complex. The
>>> Beam model requires runners to support dynamic timers per key (e.g., that's
>>> how windowing is implemented: each window has a separate timer), so
>>> there's no reason not to expose this to users.
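Jan's idea of a single physical timer that carries an implicit collection of (timestamp, name) pairs can be sketched in plain Java. The same structure underlies the multiplexing Reuven describes. This is a minimal illustration and not Beam code. It assumes timestamps are plain `long` values and that whoever owns the single physical timer re-arms it at `physicalTimerTimestamp()` after every change. `MultiplexedTimer` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch: many logical timers multiplexed onto one physical timer. */
final class MultiplexedTimer {
  // Logical timer name -> the timestamp it is currently set to.
  private final Map<String, Long> byName = new HashMap<>();
  // Timestamp -> names due at that time; the earliest entry drives the physical timer.
  private final TreeMap<Long, TreeSet<String>> byTime = new TreeMap<>();

  /** Sets a logical timer; setting an existing name again overwrites its timestamp. */
  void set(String name, long timestamp) {
    clear(name);
    byName.put(name, timestamp);
    byTime.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(name);
  }

  /** Removes a logical timer if it is set. */
  void clear(String name) {
    Long old = byName.remove(name);
    if (old != null) {
      TreeSet<String> names = byTime.get(old);
      names.remove(name);
      if (names.isEmpty()) {
        byTime.remove(old);
      }
    }
  }

  /** Where the single physical timer should be armed, or null if nothing is pending. */
  Long physicalTimerTimestamp() {
    return byTime.isEmpty() ? null : byTime.firstKey();
  }

  /** Fires, earliest first, every logical timer due at or before the watermark. */
  List<String> fireUpTo(long watermark) {
    List<String> fired = new ArrayList<>();
    while (!byTime.isEmpty() && byTime.firstKey() <= watermark) {
      Map.Entry<Long, TreeSet<String>> due = byTime.pollFirstEntry();
      for (String name : due.getValue()) {
        byName.remove(name);
        fired.add(name + "@" + due.getKey());
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    MultiplexedTimer timer = new MultiplexedTimer();
    timer.set("timer2", 20L);
    timer.set("timer1", 10L);
    timer.set("timer3", 30L);
    timer.set("timer2", 5L); // resetting moves timer2 earlier
    System.out.println(timer.physicalTimerTimestamp()); // 5
    System.out.println(timer.fireUpTo(15L)); // [timer2@5, timer1@10]
    System.out.println(timer.physicalTimerTimestamp()); // 30
  }
}
```

Keeping both a name index and a time-sorted index consistent on every reset is the kind of bookkeeping that multiplexing pushes into user code. The ordered `TreeMap` also makes logical timers fire in timestamp order.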
>>>
>>>> I'm a little afraid that letting users define data-driven timers
>>>> might be too restrictive for some runners. Yes, runners that don't have
>>>> this option would probably be able to resort to the logic described above,
>>>> but if this work could reasonably be done once for all runners, then we
>>>> wouldn't force runners to implement it, and the API could still look as if
>>>> the timers were dynamic.
>>>>
>>>> Jan
>>>>
>>>> P.S. If dynamic (and therefore any number of) timers can actually be
>>>> implemented using a single timer, that might be an interesting pattern,
>>>> because a single timer per (window, key) has many nice properties; for
>>>> example, it implicitly avoids the situation where timer invocations are not
>>>> ordered ([BEAM-7520]), which seems to be an issue for multiple runners
>>>> (Samza, portable Flink).
>>>>
>>>
>>> BEAM-7520 is simply an implementation bug. I don't think it makes sense
>>> to fix a bug by restricting the model.
>>>
>>>
>>>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>>>
>>>> Kenn:
>>>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>>>
>>>> Jan:
>>>> This is definitely not just for DSLs. I've definitely seen cases where
>>>> the user wants different timers based on input data, so they cannot be
>>>> defined statically. As a thought experiment: one stated goal of state +
>>>> timers was to provide the low-level tools we use to implement windowing.
>>>> However, to implement windowing you need a dynamic set of timers, not just a
>>>> single one. Most users don't need to reimplement windowing (though we
>>>> have had some users who had that need, when they wanted something slightly
>>>> different from what native Beam windowing provided), but the need for
>>>> dynamic timers is not unheard of.
>>>>
>>>> +1 to allowing dynamic state. However, I think this is separate enough
>>>> from timers that it doesn't need to be coupled in this discussion.
>>>> Dynamic state also raises the wrinkle of pipeline compatibility (as you
>>>> mentioned), which I think is less of an issue for dynamic timers.
>>>>
>>>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>>>> problem. The DSL still needs a way to set and process the timers. It also
>>>> does not solve the problem where the timers are based on input data
>>>> elements and so cannot be known at pipeline construction time. However, what
>>>> might be more important is statically defining the timer families, and a
>>>> DSL could do this by specifying a DoFnSignature (and something similar
>>>> could be done with state). Also, as mentioned above, this is useful to
>>>> normal Beam users as well, and we shouldn't force normal users to start
>>>> dealing with DoFnSignatures and DoFnInvokers.
>>>>
>>>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> wouldn't that actually be the same as
>>>>>
>>>>> class MyDoFn extends DoFn<String, String> {
>>>>>   @ProcessElement
>>>>>   public void process(ProcessContext context) {
>>>>>     // "get" would register a new TimerSpec
>>>>>     Timer timer1 = context.getTimer("timer1");
>>>>>     Timer timer2 = context.getTimer("timer2");
>>>>>     timer1.set(...);
>>>>>     timer2.set(...);
>>>>>   }
>>>>> }
>>>>>
>>>>> That is, no need to declare anything? One more concern about that: if
>>>>> we allow registration of timers (or even state) dynamically like that, it
>>>>> might be harder to validate the pipeline upon upgrades.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>>>> > The idea makes sense to me. I really like that Beam gives upfront
>>>>> > specs for timers and state, but it is not flexible enough for
>>>>> > timer-based libraries or for users who want to dynamically generate
>>>>> > timers.
>>>>> >
>>>>> > I'm not sure about the proposed API yet.
>>>>> > Shouldn't we separate the timer specs from setting actual timers?
>>>>> >
>>>>> > Suggestion:
>>>>> >
>>>>> > class MyDoFn extends DoFn<String, String> {
>>>>> >   @TimerMap TimerSpec timers = TimerSpecs.timerMap();
>>>>> >
>>>>> >   @ProcessElement
>>>>> >   public void process(
>>>>> >       @Element String e,
>>>>> >       @TimerMap TimerMap timers) {
>>>>> >     // "get" would register a new TimerSpec
>>>>> >     Timer timer1 = timers.get("timer1");
>>>>> >     Timer timer2 = timers.get("timer2");
>>>>> >     timer1.set(...);
>>>>> >     timer2.set(...);
>>>>> >   }
>>>>> >
>>>>> >   // No args for "@OnTimer" => use generic TimerMap
>>>>> >   @OnTimer
>>>>> >   public void onTimer(
>>>>> >       @TimerId String timerFired,
>>>>> >       @Timestamp Instant timerTs,
>>>>> >       @TimerMap TimerMap timers) {
>>>>> >     // Timer firing
>>>>> >     ...
>>>>> >     // Set this timer (or another)
>>>>> >     Timer timer = timers.get(timerFired);
>>>>> >     timer.set(...);
>>>>> >   }
>>>>> > }
>>>>> >
>>>>> > What do you think?
>>>>> >
>>>>> > -Max
>>>>> >
>>>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>>>> >> Hi Kenn,
>>>>> >>
>>>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>>>> >>> This seems extremely useful.
>>>>> >>>
>>>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>>>> >>> suggest that the parameter annotation be something other
>>>>> >>> than @TimerId, since that annotation is already used for a very
>>>>> >>> similar but different purpose; they are close enough that it is
>>>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>>>> >>> keep @TimerId in the parameter list and change the declaration
>>>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be
>>>>> >>> clearer naming than "map".
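In Max's and Jan's variants, a `get(name)` call registers a timer the first time a name is used. Calling it again returns the same timer, which is how Max's `onTimer` re-sets `timerFired`. Below is a minimal plain-Java sketch of that behavior. It assumes a family is simply a map from name to handle. `LazyTimerFamily`, `NamedTimer` and `registeredNames` are hypothetical names, not Beam classes. The sketch also shows why Jan raises upgrade validation: the set of names only exists after elements have been processed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical family in which get(name) registers a timer on first access. */
final class LazyTimerFamily {
  /** Hypothetical handle for one named timer; a null timestamp means "not set". */
  static final class NamedTimer {
    private final String name;
    private Long timestamp;

    NamedTimer(String name) {
      this.name = name;
    }

    void set(long ts) {
      timestamp = ts;
    }

    String name() {
      return name;
    }

    Long timestamp() {
      return timestamp;
    }
  }

  // Insertion-ordered, so registered names print in first-use order.
  private final Map<String, NamedTimer> timers = new LinkedHashMap<>();

  /** Returns the handle for this name, registering it if it was never seen before. */
  NamedTimer get(String name) {
    return timers.computeIfAbsent(name, NamedTimer::new);
  }

  /** Names registered so far: known only at run time, not at construction time. */
  Set<String> registeredNames() {
    return timers.keySet();
  }

  public static void main(String[] args) {
    LazyTimerFamily timers = new LazyTimerFamily();
    NamedTimer timer1 = timers.get("timer1");
    NamedTimer timer2 = timers.get("timer2");
    timer1.set(100L);
    timer2.set(200L);

    // Looking up "timer1" again yields the same handle, so this resets it.
    timers.get("timer1").set(150L);

    System.out.println(timers.registeredNames()); // [timer1, timer2]
    System.out.println(timer1.timestamp()); // 150
  }
}
```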
>>>>> >>>
>>>>> >>> At the portability level, this API does seem to be pretty close to a
>>>>> >>> no-op in terms of the messages that need to be sent over the Fn API,
>>>>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>>>>> >>> play, all of our desires to catch errors prior to execution are
>>>>> >>> irrelevant anyhow.
>>>>> >>>
>>>>> >>> On the other hand, I think DSLs have a different and bigger problem
>>>>> >>> than this, in that they want to programmatically adjust all the
>>>>> >>> capabilities of a DoFn. The same goes for wrapping one DoFn in
>>>>> >>> another. Certainly some limited DSL use cases are addressed by this,
>>>>> >>> but I wouldn't take that as a primary use case for this feature.
>>>>> >>> Ultimately they are probably better served by being able to
>>>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>>>> >>> That would allow maximum flexibility in utilizing the portability
>>>>> >>> framework.
>>>>> >>
>>>>> >> Yes, exactly, but where DSLs are concerned, we have to make sure
>>>>> >> that they are not bound to portability; we have to be able to
>>>>> >> translate for "legacy" runners as well. That might complicate
>>>>> >> things a bit.
>>>>> >>
>>>>> >> Jan
>>>>> >>
>>>>> >>>
>>>>> >>> Kenn
>>>>> >>>
>>>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <re...@google.com> wrote:
>>>>> >>>
>>>>> >>> BEAM-6857 documents the need for dynamic timer support in the Beam
>>>>> >>> API. I wanted to make a proposal for what this API would look
>>>>> >>> like, and how to express it in the portability protos.
>>>>> >>>
>>>>> >>> Background: Today Beam (especially Beam Java) requires a ParDo to
>>>>> >>> statically declare all timers it accesses at compile time. For
>>>>> >>> example:
>>>>> >>>
>>>>> >>> class MyDoFn extends DoFn<String, String> {
>>>>> >>>   @TimerId("timer1") TimerSpec timer1 =
>>>>> >>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>> >>>   @TimerId("timer2") TimerSpec timer2 =
>>>>> >>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>> >>>
>>>>> >>>   @ProcessElement
>>>>> >>>   public void process(@Element String e, @TimerId("timer1") Timer timer1,
>>>>> >>>       @TimerId("timer2") Timer timer2) {
>>>>> >>>     timer1.set(...);
>>>>> >>>     timer2.set(...);
>>>>> >>>   }
>>>>> >>>
>>>>> >>>   @OnTimer("timer1") public void onTimer1() { ... }
>>>>> >>>   @OnTimer("timer2") public void onTimer2() { ... }
>>>>> >>> }
>>>>> >>>
>>>>> >>> This requires the author of a ParDo to know the full list of
>>>>> >>> timers ahead of time, which has been problematic in many cases.
>>>>> >>> One example where it causes issues is DSLs such as Euphoria or
>>>>> >>> Scio. DSL authors usually write ParDos to interpret the code
>>>>> >>> written in the high-level DSL, and so don't know the list of timers
>>>>> >>> needed ahead of time. The alternatives today are quite ugly: physical
>>>>> >>> code generation, or creating a single timer that multiplexes all of
>>>>> >>> the user's logical timers. There are also cases where a ParDo needs
>>>>> >>> multiple distinct timers, but the set of distinct timers is
>>>>> >>> controlled by the input data and is therefore not knowable in
>>>>> >>> advance. The Beam timer API has been insufficient for these use
>>>>> >>> cases.
>>>>> >>>
>>>>> >>> I propose a new TimerMap construct, which allows a ParDo to
>>>>> >>> dynamically set named timers.
>>>>> >>> Its use in the Java API would look as follows:
>>>>> >>>
>>>>> >>> class MyDoFn extends DoFn<String, String> {
>>>>> >>>   @TimerId("timers") TimerSpec timers =
>>>>> >>>       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>> >>>
>>>>> >>>   @ProcessElement
>>>>> >>>   public void process(@Element String e, @TimerId("timers") TimerMap timers) {
>>>>> >>>     timers.set("timer1", ...);
>>>>> >>>     timers.set("timer2", ...);
>>>>> >>>   }
>>>>> >>>
>>>>> >>>   @OnTimer("timer") public void onTimer(@TimerId String timerFired,
>>>>> >>>       @Timestamp Instant timerTs) { ... }
>>>>> >>> }
>>>>> >>>
>>>>> >>> There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>>>> >>> class itself allows dynamically setting multiple timers based on a
>>>>> >>> String tag argument. Each TimerMap has a single callback which,
>>>>> >>> when called, is given the id of the timer that is currently firing.
>>>>> >>>
>>>>> >>> It is allowed to have multiple TimerMap objects in a ParDo (and
>>>>> >>> required if you want to have both processing-time and event-time
>>>>> >>> timers in the same ParDo). Each TimerMap is its own logical
>>>>> >>> namespace, i.e., if the user sets timers with the same string tag
>>>>> >>> on different TimerMap objects, the timers will not collide.
>>>>> >>>
>>>>> >>> Currently the portability protos were written to mirror the Java
>>>>> >>> API, expecting one TimerSpec per timer accessed by the ParDo. I
>>>>> >>> suggest that we instead make TimerMap the default for portability,
>>>>> >>> and model the current behavior on top of TimerMap. If this proves
>>>>> >>> problematic for some runners, we could instead introduce a new
>>>>> >>> TimerSpec proto to represent TimerMap.
>>>>> >>>
>>>>> >>> Thoughts?
>>>>> >>>
>>>>> >>> Reuven
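The TimerMap semantics in the original proposal can be modeled in a small, Beam-free Java sketch. Each family is its own namespace of string tags with one time domain and a single callback that receives the id of the firing timer. A classic statically declared timer is then just a family that only ever uses one fixed tag, which is one way to read "model the current behavior on top of TimerMap". All names here (`TimerMapSketch`, `TimerFamily`, `StaticTimer`, `advanceTo`) are hypothetical. The caller advances each family's clock explicitly, and the `domain` field is informational only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Beam-free model of the proposed TimerMap semantics. */
final class TimerMapSketch {
  enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

  /** The single callback of a family; it receives the tag of the firing timer. */
  interface OnTimer {
    void onTimer(String timerFired, long timerTs);
  }

  /** One timer family: one time domain, one namespace of string tags, one callback. */
  static final class TimerFamily {
    final TimeDomain domain; // informational; the caller drives advanceTo per domain
    private final Map<String, Long> tagToTime = new TreeMap<>();
    private final OnTimer callback;

    TimerFamily(TimeDomain domain, OnTimer callback) {
      this.domain = domain;
      this.callback = callback;
    }

    /** Sets or resets the timer with this tag; tags are scoped to this family only. */
    void set(String tag, long timestamp) {
      tagToTime.put(tag, timestamp);
    }

    /** Advances this family's clock, firing due timers earliest first (ties by tag). */
    void advanceTo(long time) {
      while (true) {
        String nextTag = null;
        long nextTs = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : tagToTime.entrySet()) {
          if (e.getValue() < nextTs) {
            nextTag = e.getKey();
            nextTs = e.getValue();
          }
        }
        if (nextTag == null || nextTs > time) {
          return;
        }
        tagToTime.remove(nextTag);
        // The callback may set further timers; the next iteration picks them up.
        callback.onTimer(nextTag, nextTs);
      }
    }
  }

  /** A classic statically declared timer, modeled as a family with one fixed tag. */
  static final class StaticTimer {
    private static final String ONLY_TAG = "";
    private final TimerFamily family;

    StaticTimer(TimeDomain domain, Runnable onTimer) {
      this.family = new TimerFamily(domain, (tag, ts) -> onTimer.run());
    }

    void set(long timestamp) {
      family.set(ONLY_TAG, timestamp);
    }

    void advanceTo(long time) {
      family.advanceTo(time);
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    TimerFamily eventTimers =
        new TimerFamily(TimeDomain.EVENT_TIME, (id, ts) -> log.add("event:" + id + "@" + ts));
    TimerFamily procTimers =
        new TimerFamily(TimeDomain.PROCESSING_TIME, (id, ts) -> log.add("proc:" + id + "@" + ts));
    StaticTimer legacy = new StaticTimer(TimeDomain.EVENT_TIME, () -> log.add("legacy"));

    // The same tag in two families names two independent timers.
    eventTimers.set("timer1", 10L);
    eventTimers.set("timer2", 7L);
    procTimers.set("timer1", 5L);
    legacy.set(3L);

    eventTimers.advanceTo(10L);
    procTimers.advanceTo(10L);
    legacy.advanceTo(10L);
    System.out.println(log); // [event:timer2@7, event:timer1@10, proc:timer1@5, legacy]
  }
}
```

Because tags live inside their family, `"timer1"` set on `eventTimers` and on `procTimers` never collide. This mirrors the per-TimerMap namespace described in the proposal.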