Based upon the current description, from the portability perspective we
could:
Update the timer spec map comment[1] to be:
// (Optional) A mapping of local timer families to timer specifications.
map<string, TimerSpec> timer_specs = 5;
And update the timer coder to have the timer id[2]:
// Encodes a timer containing a timestamp and a user specified payload.
// The encoding is represented as: timestamp timer_id payload
//   timestamp - a big endian 8 byte integer representing millis-since-epoch.
//     The encoded representation is shifted so that the byte representation of
//     negative values are lexicographically ordered before the byte representation
//     of positive values. This is typically done by subtracting -9223372036854775808
//     from the value and encoding it as a signed big endian integer. Example values:
//
//     -9223372036854775808: 00 00 00 00 00 00 00 00
//                     -255: 7F FF FF FF FF FF FF 01
//                       -1: 7F FF FF FF FF FF FF FF
//                        0: 80 00 00 00 00 00 00 00
//                        1: 80 00 00 00 00 00 00 01
//                      256: 80 00 00 00 00 00 01 00
//      9223372036854775807: FF FF FF FF FF FF FF FF
//   timer_id - UTF8 string encoded using beam:coder:string_utf8:v1 format
//   payload - user defined data, uses the component coder
// Components: Coder for the payload.
TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];
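
As a rough sketch of just the timestamp part of that encoding (this is not
the actual Beam coder; the class and method names are made up for
illustration, and the timer_id and payload parts are left out), the shift
and the big endian write could look like this in Java:

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only; not part of Beam.
public class TimerTimestampSketch {

  // Shifts millis-since-epoch so that Long.MIN_VALUE maps to all-zero bytes.
  // The subtraction overflows and wraps around, which is intended here.
  public static byte[] encodeTimestamp(long millis) {
    long shifted = millis - Long.MIN_VALUE;
    // ByteBuffer writes in big endian order by default.
    return ByteBuffer.allocate(Long.BYTES).putLong(shifted).array();
  }

  // Reverses the shift applied by encodeTimestamp.
  public static long decodeTimestamp(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong() + Long.MIN_VALUE;
  }

  // Formats bytes as space-separated upper-case hex pairs.
  public static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      if (sb.length() > 0) {
        sb.append(' ');
      }
      sb.append(String.format("%02X", b));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    long[] samples = {Long.MIN_VALUE, -255L, -1L, 0L, 1L, 256L, Long.MAX_VALUE};
    for (long value : samples) {
      System.out.println(value + ": " + toHex(encodeTimestamp(value)));
    }
  }
}
```

Because the shifted value is written most significant byte first, comparing
the encoded bytes lexicographically (as unsigned bytes) gives the same order
as comparing the original timestamps.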
The rest is about plumbing this all through the SDKs and Runners.
1:
https://github.com/apache/beam/blob/79ba5458f9fd1a44c5c5778162e178dbee62bd64/model/pipeline/src/main/proto/beam_runner_api.proto#L372
2:
https://github.com/apache/beam/blob/79ba5458f9fd1a44c5c5778162e178dbee62bd64/model/pipeline/src/main/proto/beam_runner_api.proto#L595
On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský <[email protected]> wrote:
> Hi Reuven,
>
> I didn't propose to restrict the model. The model can (and should) have
> multiple timers per key, even dynamic ones. The question was whether this
> can be done efficiently using a single timer (after all, the runner will
> probably have a single "timer service", so no matter what we expose at the
> API level, this will end up being multiplexed in the runner). It might also
> have the additional benefit of preventing bugs. But I'm not proposing this
> change for existing timers; it was more a question of whether we really
> must force runners to implement dynamic timers, or whether we can do it in
> the translation layer generally for all runners at once.
>
> Regarding the API - which is again a question independent of how it will be
> implemented - what do we need the @TimerFamily TimerSpec declaration for? I
> see two reasons:
>
> a) it holds the time domain
>
> b) it declares the DoFn as being stateful
>
> Property a) looks like it can be specified when setting the timer. b)
> could be inferred from @ProcessElement (or other method). What about
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   // declares @TimerContext which implies stateful DoFn
>   public void process(@Element String e, @TimerContext TimerContext timers) {
>     Timer timer1 = timers.get("timer1", EVENT_TIME);
>     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
>     timer1.set(...);
>     timer2.set(...);
>   }
>
>   // empty name might be allowed iff the declaration contains
>   // @TimerContext, so that declares using dynamic timers
>   @OnTimer public void onTimer(@TimerId String timerFired, @Timestamp
>       Instant timerTs, @TimerContext TimerContext timers) { ... }
> }
>
> I'm still seeking the analogy with dynamic state, because in this API,
> that might become
>
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   public void process(@Element String e, @StateContext StateContext states) {
>     ValueState state = states.get("myDynamicState", StateSpec...);
>     state.get(...);
>     state.set(...);
>   }
> }
>
> The point is that there seems to be no use for any declaration like
> @TimerFamily in case of dynamic state, because there is no domain. It would
> feel weird to have to declare something for dynamic timers and not have to
> do it for state.
>
> Jan
>
> On 10/29/19 6:56 AM, Reuven Lax wrote:
>
> Just to circle back around, after the discussion on this thread I propose
> modifying the proposed API as follows:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerFamily("timers") TimerSpec timers =
>       TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(@Element String e, @TimerFamily("timers") TimerMap timers) {
>     timers.set("timer1", ...);
>     timers.set("timer2", ...);
>   }
>
>   @OnTimer("timers") public void onTimer(@TimerId String timerFired,
>       @Timestamp Instant timerTs, @TimerFamily("timers") TimerMap timers) { ... }
> }
>
> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
> are a bit independent (though not completely so, as it does relate), so I
> suggest splitting that into a separate discussion.
>
> Reuven
>
> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <[email protected]> wrote:
>
>>
>>
>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi Reuven,
>>>
>>> yes, if this change is intended to be used by end users, then
>>> DoFnSignatures cannot be used, I agree on that. Regarding the relationship
>>> with dynamic state - I agree that this is a separate problem, but because it
>>> is close enough, we should know how we want to deal with it. Because
>>> state and timers share some functionality (after all, timers need state to
>>> be fault tolerant), these APIs should IMO share the same logic. Whatever
>>> solution is chosen to expose dynamic timers, it should extend to dynamic state.
>>>
>>> I'd like to dwell a little on the premise that users want dynamic
>>> timers (that is, timers whose *name* - and therefore behavior - is
>>> determined by incoming data). Could this case be modeled so that the timer
>>> has one more (implicit) state variable that holds a collection of
>>> (timestamp, name) tuples? The timer would then be invoked at the minimum of
>>> all currently set timestamps, with the respective name. The question here
>>> probably is - can this have a performance impact? That is to say - can any
>>> runner actually do anything different from this in terms of the time
>>> complexity of the algorithm?
>>>
>>
>> Yes - you could always multiplex many timers onto one. This is what some
>> users do today, but it tends to be very inefficient and also complex. The
>> Beam model requires runners to support dynamic timers per key (e.g. that's
>> how windowing is implemented - each window has a separate timer), so
>> there's no reason not to expose this to users.
>>
>>> I'm a little afraid that letting users define data-driven timers
>>> might be too restrictive for some runners. Yes, runners that don't have
>>> this option would probably be able to resort to the logic described above,
>>> but if this work could reasonably be done once for all runners, then we
>>> wouldn't force each runner to implement it. And the API could still look
>>> as if the timers were actually dynamic.
>>>
>>> Jan
>>>
>>> P.S. If dynamic timers (and therefore any number of them) can actually be
>>> implemented using a single timer, that might be an interesting pattern,
>>> because a single timer per (window, key) has many nice properties: for
>>> example, it implicitly avoids the situation where timer invocation is not
>>> ordered ([BEAM-7520]), which seems to be an issue for multiple runners
>>> (samza, portable flink).
>>>
>> BEAM-7520 is simply an implementation bug. I don't think it makes sense
>> to fix a bug by restricting the model.
>>
>>
>>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>>
>>> Kenn:
>>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>>
>>> Jan:
>>> This is definitely not just for DSLs. I've definitely seen cases where
>>> the user wants different timers based on input data, so they cannot be
>>> defined statically. As a thought experiment: one stated goal of state +
>>> timers was to provide the low-level tools we use to implement windowing.
>>> However to implement windowing you need a dynamic set of timers, not just a
>>> single one. Now most users don't need to reimplement windowing (though we
>>> have had some users who had that need, when they wanted something slightly
>>> different than what native Beam windowing provided), however the need for
>>> dynamic timers is not unheard of.
>>>
>>> +1 to allowing dynamic state. However I think this is separate enough
>>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>>> which I think is a bit less of an issue for dynamic timers.
>>>
>>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>>> problem. The DSL still needs a way to set and process the timers. It also
>>> does not solve the problem where the timers are based on input data
>>> elements, so cannot be known at pipeline construction time. However what
>>> might be more important is statically defining the timer families, and a
>>> DSL could do this by specifying a DoFnSignature (and something similar
>>> could be done with state). Also as mentioned above, this is useful to
>>> normal Beam users as well, and we shouldn't force normal users to start
>>> dealing with DoFnSignatures and DoFnInvokers.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> wouldn't that be actually the same as
>>>>
>>>> class MyDoFn extends DoFn<String, String> {
>>>>
>>>>   @ProcessElement
>>>>   public void process(ProcessContext context) {
>>>>     // "get" would register a new TimerSpec
>>>>     Timer timer1 = context.getTimer("timer1");
>>>>     Timer timer2 = context.getTimer("timer2");
>>>>     timer1.set(...);
>>>>     timer2.set(...);
>>>>   }
>>>> }
>>>>
>>>> That is - no need to declare anything? One more concern about that - if
>>>> we allow registration of timers (or even state) dynamically like that,
>>>> it might be harder to validate the pipeline upon upgrades.
>>>>
>>>> Jan
>>>>
>>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>>> > The idea makes sense to me. I really like that Beam gives upfront
>>>> > specs for timer and state, but it is not flexible enough for
>>>> > timer-based libraries or for users which want to dynamically generate
>>>> > timers.
>>>> >
>>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>>> > timer specs from setting actual timers?
>>>> >
>>>> > Suggestion:
>>>> >
>>>> > class MyDoFn extends DoFn<String, String> {
>>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>>> >
>>>> >   @ProcessElement
>>>> >   public void process(
>>>> >       @Element String e,
>>>> >       @TimerMap TimerMap timers) {
>>>> >     // "get" would register a new TimerSpec
>>>> >     Timer timer1 = timers.get("timer1");
>>>> >     Timer timer2 = timers.get("timer2");
>>>> >     timer1.set(...);
>>>> >     timer2.set(...);
>>>> >   }
>>>> >
>>>> >   // No args for "@OnTimer" => use generic TimerMap
>>>> >   @OnTimer
>>>> >   public void onTimer(
>>>> >       @TimerId String timerFired,
>>>> >       @Timestamp Instant timerTs,
>>>> >       @TimerMap TimerMap timers) {
>>>> >     // Timer firing
>>>> >     ...
>>>> >     // Set this timer (or another)
>>>> >     Timer timer = timers.get(timerFired);
>>>> >     timer.set(...);
>>>> >   }
>>>> > }
>>>> >
>>>> > What do you think?
>>>> >
>>>> > -Max
>>>> >
>>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>>> >> Hi Kenn,
>>>> >>
>>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>>> >>> This seems extremely useful.
>>>> >>>
>>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>>> >>> suggest that the parameter annotation be something other
>>>> >>> than @TimerId since that annotation is already used for a very
>>>> >>> similar but different purpose; they are close enough that it is
>>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>>> >>> keep @TimerId in the parameter list and change the declaration
>>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>>> >>> clear naming than "map".
>>>> >>>
>>>> >>> At the portability level, this API does seem to be pretty close to a
>>>> >>> noop in terms of the messages that need to be sent over the Fn API,
>>>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>>>> >>> play, all of our desires to catch errors prior to execution are
>>>> >>> irrelevant anyhow.
>>>> >>>
>>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>>> >>> than this, in that they want to programmatically adjust all the
>>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>>> >>> another. Certainly some limited DSL use cases are addressed by this,
>>>> >>> but I wouldn't take that as a primary use case for this feature.
>>>> >>> Ultimately they are probably better served by being able to
>>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>>> >>> That would allow maximum flexibility in utilizing the portability
>>>> >>> framework.
>>>> >>
>>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>>> >> that DSLs are not bound to portability - we have to be able to
>>>> >> translate even in case of "legacy" runners as well. That might
>>>> >> complicate things a bit maybe.
>>>> >>
>>>> >> Jan
>>>> >>
>>>> >>>
>>>> >>> Kenn
>>>> >>>
>>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <[email protected]
>>>> >>> <mailto:[email protected]>> wrote:
>>>> >>>
>>>> >>> BEAM-6857 documents the need for dynamic timer support in the Beam
>>>> >>> API. I wanted to make a proposal for what this API would look
>>>> >>> like, and how to express it in the portability protos.
>>>> >>>
>>>> >>> Background: Today Beam (especially Beam Java) requires a ParDo to
>>>> >>> statically declare all timers it accesses at compile time. For
>>>> >>> example:
>>>> >>>
>>>> >>> class MyDoFn extends DoFn<String, String> {
>>>> >>>   @TimerId("timer1") TimerSpec timer1 =
>>>> >>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>> >>>   @TimerId("timer2") TimerSpec timer2 =
>>>> >>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>> >>>
>>>> >>>   @ProcessElement
>>>> >>>   public void process(@Element String e, @TimerId("timer1") Timer timer1,
>>>> >>>       @TimerId("timer2") Timer timer2) {
>>>> >>>     timer1.set(...);
>>>> >>>     timer2.set(...);
>>>> >>>   }
>>>> >>>
>>>> >>>   @OnTimer("timer1") public void onTimer1() { ... }
>>>> >>>   @OnTimer("timer2") public void onTimer2() { ... }
>>>> >>> }
>>>> >>>
>>>> >>> This requires the author of a ParDo to know the full list of
>>>> >>> timers ahead of time, which has been problematic in many cases.
>>>> >>> One example where it causes issues is for DSLs such as Euphoria or
>>>> >>> Scio. DSL authors usually write ParDos to interpret the code
>>>> >>> written in the high-level DSL, and so don't know ahead of time the
>>>> >>> list of timers needed; alternatives today are quite ugly: physical
>>>> >>> code generation or creating a single timer that multiplexes all of
>>>> >>> the user's logical timers. There are also cases where a ParDo needs
>>>> >>> multiple distinct timers, but the set of distinct timers is
>>>> >>> controlled by the input data, and therefore not knowable in
>>>> >>> advance. The Beam timer API has been insufficient for these use
>>>> >>> cases.
>>>> >>>
>>>> >>> I propose a new TimerMap construct, which allows a ParDo to
>>>> >>> dynamically set named timers. Its use in the Java API would look
>>>> >>> as follows:
>>>> >>>
>>>> >>> class MyDoFn extends DoFn<String, String> {
>>>> >>>   @TimerId("timers") TimerSpec timers =
>>>> >>>       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>> >>>
>>>> >>>   @ProcessElement
>>>> >>>   public void process(@Element String e, @TimerId("timers") TimerMap timers) {
>>>> >>>     timers.set("timer1", ...);
>>>> >>>     timers.set("timer2", ...);
>>>> >>>   }
>>>> >>>
>>>> >>>   @OnTimer("timer") public void onTimer(@TimerId String timerFired,
>>>> >>>       @Timestamp Instant timerTs) { ... }
>>>> >>> }
>>>> >>>
>>>> >>> There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>>> >>> class itself allows dynamically setting multiple timers based on a
>>>> >>> String tag argument. Each TimerMap has a single callback which,
>>>> >>> when called, is given the id of the timer that is currently firing.
>>>> >>>
>>>> >>> It is allowed to have multiple TimerMap objects in a ParDo (and
>>>> >>> required if you want to have both processing-time and event-time
>>>> >>> timers in the same ParDo). Each TimerMap is its own logical
>>>> >>> namespace, i.e. if the user sets timers with the same string tag
>>>> >>> on different TimerMap objects, the timers will not collide.
>>>> >>>
>>>> >>> Currently the portability protos were written to mirror the Java
>>>> >>> API, expecting one TimerSpec per timer accessed by the ParDo. I
>>>> >>> suggest that we instead make TimerMap the default for portability,
>>>> >>> and model the current behavior on top of timer map. If this proves
>>>> >>> problematic for some runners, we could instead introduce a new
>>>> >>> TimerSpec proto to represent TimerMap.
>>>> >>>
>>>> >>> Thoughts?
>>>> >>>
>>>> >>> Reuven
>>>> >>>
>>>>
>>>