The new timer family is in the portability protos, but I think TimerReceiver
needs to be updated to set it (probably a one-line change).

The TimerInternals class that runners implement today already handles
dynamic timers, so most of the work was in the Beam SDK, providing an API
that lets users access this feature.

The main work needed in the runners was to take the timer family into
account. Beam semantics say that if a timer is set twice with the same id,
the second timer overwrites the first, so several runners kept maps from
timer id -> timer. However, since the timer family scopes the timers, we now
allow two timers with the same id as long as their timer families are
different. Runners had to be updated to include the timer family id in the
map keys.
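A minimal sketch of that keying change, in plain Java with no Beam dependency. TimerKey and PendingTimers are hypothetical names, not actual runner classes, and timestamps are plain epoch millis rather than Joda Instants:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key: a timer is identified by (family, id), not by id alone.
final class TimerKey {
  final String family;
  final String id;

  TimerKey(String family, String id) {
    this.family = family;
    this.id = id;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof TimerKey)) return false;
    TimerKey other = (TimerKey) o;
    return family.equals(other.family) && id.equals(other.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(family, id);
  }
}

// Hypothetical runner-side store of pending timers for one key and window.
final class PendingTimers {
  private final Map<TimerKey, Long> timers = new HashMap<>();

  // Setting a timer whose (family, id) already exists replaces the old timestamp.
  void set(String family, String id, long timestampMillis) {
    timers.put(new TimerKey(family, id), timestampMillis);
  }

  // Returns null if no timer with this (family, id) is pending.
  Long get(String family, String id) {
    return timers.get(new TimerKey(family, id));
  }

  int size() {
    return timers.size();
  }
}
```

With a map keyed by id alone, setting "mainTimer" in a second family would silently replace the first family's timer; with the (family, id) key both stay pending, while a repeated set within one family still overwrites.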

Surprisingly, the new TimerMap tests seem to pass on the Spark ValidatesRunner
suite, even though the Spark runner wasn't updated! I wonder if this means
the Spark runner was implementing the Beam semantics incorrectly before, and
setTimer was not overwriting timers with the same id?
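To make that guess concrete: a store that appends every set call, instead of replacing by id, would trivially keep two same-id timers from different families (so the new TimerMap tests pass), but it would also keep both copies when one timer is set twice. This is a hypothetical plain-Java illustration (AppendOnlyTimers and count are made-up names), not code taken from the Spark runner:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timer store that records every set call instead of keying by id.
// It only illustrates why such a store would pass a "same id, different family"
// test while still violating the overwrite rule.
final class AppendOnlyTimers {
  private static final class Entry {
    final String family;
    final String id;
    final long timestampMillis;

    Entry(String family, String id, long timestampMillis) {
      this.family = family;
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  private final List<Entry> entries = new ArrayList<>();

  // Never replaces an existing entry, so repeated sets accumulate.
  void set(String family, String id, long timestampMillis) {
    entries.add(new Entry(family, id, timestampMillis));
  }

  // Number of pending timers for this (family, id); Beam semantics allow at most one.
  long count(String family, String id) {
    long n = 0;
    for (Entry e : entries) {
      if (e.family.equals(family) && e.id.equals(id)) {
        n++;
      }
    }
    return n;
  }
}
```

A store keyed by id alone would fail the new tests (the second family's timer replaces the first), which is why a runner passing them without changes is suspicious.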

Reuven

On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía wrote:

> This looks great, thanks for the contribution, Rehman!
>
> I have some questions (note I have not looked at the code at all).
>
> - Is this working for both portable and non-portable runners?
> - What do other runners need to implement to support this (e.g. Spark)?
>
> It may be worth adding this to the Compatibility Matrix on the website.
>
> Regards,
> Ismaël
>
>
> On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali wrote:
>
>> Thank you, Reuven, for your guidance throughout the development process. I
>> am delighted to contribute my two cents to the Beam project.
>>
>> Looking forward to more active contributions.
>>
>>
>> *Thanks & Regards*
>>
>>
>>
>> *Rehman Murad Ali*
>> Software Engineer
>> Mobile: +92 3452076766
>> Skype: rehman.muradali
>>
>>
>> On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax wrote:
>>
>>> Thanks to a lot of hard work by Rehman, Beam now supports dynamic
>>> timers. As a reminder, this was discussed on the dev list some time back.
>>>
>>> As background, previously you had to statically declare all timers in
>>> your code. So if you wanted two timers, you needed to create two timer
>>> variables and two callbacks, one for each timer. A number of users kept
>>> running into cases where they needed a dynamic set of timers (often based
>>> on the element), which Beam did not support. The workarounds were quite
>>> ugly and complicated.
>>>
>>> The new support allows declaring a TimerMap, which is a map of timers.
>>> Each TimerMap is scoped by a family name, so you can create multiple
>>> TimerMaps, each with its own callback. Usage looks like this:
>>>
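>>> // Timers require keyed input; Type and OutputT are placeholders.
>>> class MyDoFn extends DoFn<KV<String, Type>, OutputT> {
>>>   @TimerFamily("timers")
>>>   private final TimerSpec timerMap =
>>>       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>
>>>   @ProcessElement
>>>   public void process(
>>>       @TimerFamily("timers") TimerMap timers,
>>>       @Element KV<String, Type> e,
>>>       @Timestamp Instant timestamp) {
>>>     // Each distinct id is a separate timer in the "timers" family.
>>>     timers.set("mainTimer", timestamp);
>>>     timers.set("actionType" + e.getValue().getActionType(), timestamp);
>>>   }
>>>
>>>   // One callback handles every timer in the family; timerId says which fired.
>>>   @OnTimerFamily("timers")
>>>   public void onTimer(@TimerId String timerId) {
>>>     System.out.println("Timer fired. id: " + timerId);
>>>   }
>>> }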
>>>
>>> This currently works on the Flink and Dataflow runners.
>>>
>>> Thank you Rehman for getting this done! Beam users will find it very
>>> valuable.
>>>
>>> Reuven
>>>
>>
