The Flink Runner used to allow a timer to be set multiple times, until we made it comply with the Beam semantics, under which setting a timer again overwrites the earlier invocation. I wouldn't be surprised if the Spark Runner never addressed this. Flink and Spark themselves allow the same timer to be registered for multiple timestamps, so to enforce the Beam semantics the Flink Runner has to maintain a checkpointed map that sits outside of its built-in TimerService.
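To make the overwrite semantics concrete, here is a minimal plain-Java sketch of the "side map" idea. It is not the actual Flink Runner code: `EngineTimerService`, `OverwritingTimerService`, and the `id@timestamp` encoding are hypothetical, and a real runner would keep the side map in checkpointed state (not a `HashMap`) and clear an entry once its timer fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an engine-level timer service: registering the
// same id twice creates two independent timers, both of which would fire.
class EngineTimerService {
    final List<String> registered = new ArrayList<>();

    void register(String timerId, long timestamp) {
        registered.add(timerId + "@" + timestamp);
    }

    void delete(String timerId, long timestamp) {
        registered.remove(timerId + "@" + timestamp);
    }
}

// Remembers the last timestamp per timer id outside the engine service
// (a plain HashMap here; checkpointed state in a real runner), so setting
// a timer again deletes the superseded registration first.
class OverwritingTimerService {
    private final EngineTimerService engine;
    private final Map<String, Long> pending = new HashMap<>();

    OverwritingTimerService(EngineTimerService engine) {
        this.engine = engine;
    }

    void setTimer(String timerId, long timestamp) {
        Long previous = pending.put(timerId, timestamp);
        if (previous != null) {
            engine.delete(timerId, previous); // drop the old registration
        }
        engine.register(timerId, timestamp);
    }
}

class OverwriteDemo {
    public static void main(String[] args) {
        EngineTimerService engine = new EngineTimerService();
        OverwritingTimerService timers = new OverwritingTimerService(engine);
        timers.setTimer("flush", 100L);
        timers.setTimer("flush", 200L);
        System.out.println(engine.registered); // prints [flush@200]
    }
}
```

Without the wrapper, the engine service would end up holding both `flush@100` and `flush@200`, which is the non-Beam behavior described above.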

As far as I can see, multiple timer families are currently not supported in the Flink Runner, because the map does not take the family name into account. This can easily be fixed, though.

-Max

On 24.01.20 21:31, Reuven Lax wrote:
The new timer family is in the portability protos. I think TimerReceiver still needs to be updated to set it, though (probably a one-line change).

The TimerInternals class that runners implement today already handles dynamic timers, so most of the work was in the Beam SDK, providing an API that lets users access this feature.

The main work needed in the runner was to take the timer family into account. Beam semantics say that if a timer is set twice with the same id, the second timer overwrites the first. Several runners therefore had maps from timer id -> timer. However, since the timer family scopes the timers, we now allow two timers with the same id as long as their families differ. Runners had to be updated to include the timer family id in the map keys.
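A minimal sketch of that keying change, assuming a hypothetical `TimerKey` class rather than any runner's real data structure: pending timers are keyed by (family, id), so re-setting the same key overwrites it, while equal ids in different families coexist.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key: a timer is identified by its family AND its
// id, so "mainTimer" in two different families are two distinct timers.
final class TimerKey {
    final String family;
    final String id;

    TimerKey(String family, String id) {
        this.family = family;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TimerKey)) {
            return false;
        }
        TimerKey other = (TimerKey) o;
        return family.equals(other.family) && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(family, id);
    }
}

// Pending timers keyed by (family, id) instead of by id alone.
class FamilyScopedTimers {
    final Map<TimerKey, Long> pending = new HashMap<>();

    void set(String family, String id, long timestamp) {
        pending.put(new TimerKey(family, id), timestamp); // overwrites same key
    }
}

class FamilyScopedDemo {
    public static void main(String[] args) {
        FamilyScopedTimers timers = new FamilyScopedTimers();
        timers.set("timers", "mainTimer", 10L);
        timers.set("timers", "mainTimer", 20L);  // overwrites the 10L entry
        timers.set("cleanup", "mainTimer", 30L); // different family: kept
        System.out.println(timers.pending.size()); // prints 2
    }
}
```

Keying by id alone would have collapsed all three calls into a single entry, which is the family bug mentioned for the Flink Runner.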

Surprisingly, the new TimerMap tests seem to pass on Spark ValidatesRunner, even though the Spark runner wasn't updated! I wonder if this means that the Spark runner was incorrectly implementing the Beam semantics before, and setTimer was not overwriting timers with the same id?

Reuven

On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <ieme...@gmail.com> wrote:

    This looks great, thanks for the contribution Rehman!

    I have some questions (note I have not looked at the code at all).

    - Is this working for both portable and non-portable runners?
    - What do other runners need to implement to support this (e.g. Spark)?

    It may be worth adding this to the Compatibility Matrix on the website.

    Regards,
    Ismaël


    On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
    <rehman.murad...@venturedive.com> wrote:

        Thank you Reuven for the guidance throughout the development
        process. I am delighted to contribute my two cents to the Beam
        project.

        Looking forward to more active contributions.

        *Thanks & Regards*



        *Rehman Murad Ali*
        Software Engineer
        Mobile: +92 3452076766
        Skype: rehman.muradali



        On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <re...@google.com>
        wrote:

            Thanks to a lot of hard work by Rehman, Beam now supports
            dynamic timers. As a reminder, this was discussed on the dev
            list some time back.

            As background, previously you had to statically declare all
            timers in your code. So if you wanted to have two timers,
            you needed to create two timer variables and two callbacks,
            one for each timer. A number of users kept hitting stumbling
            blocks where they needed a dynamic set of timers (often
            based on the element), which was not supported in Beam. The
            workarounds were quite ugly and complicated.

            The new support allows declaring a TimerMap, which is a map
            of timers. Each TimerMap is scoped by a family name, so you
            can create multiple TimerMaps, each with its own callback.
            Usage looks as follows:

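            import org.apache.beam.sdk.state.TimeDomain;
            import org.apache.beam.sdk.state.TimerMap;
            import org.apache.beam.sdk.state.TimerSpec;
            import org.apache.beam.sdk.state.TimerSpecs;
            import org.apache.beam.sdk.transforms.DoFn;
            import org.apache.beam.sdk.values.KV;
            import org.joda.time.Instant;

            // Event is a placeholder element type with a getActionType() method.
            // Timers require keyed input, hence KV<String, Event>.
            class MyDoFn extends DoFn<KV<String, Event>, Void> {
                @TimerFamily("timers")
                private final TimerSpec timerMap =
                    TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

                @ProcessElement
                public void process(@TimerFamily("timers") TimerMap timers,
                                    @Element KV<String, Event> e,
                                    @Timestamp Instant timestamp) {
                    // Same family, different ids: both timers coexist.
                    timers.set("mainTimer", timestamp);
                    timers.set("actionType" + e.getValue().getActionType(), timestamp);
                }

                // One callback for every timer in the "timers" family.
                @OnTimerFamily("timers")
                public void onTimer(@TimerId String timerId) {
                    System.out.println("Timer fired. id: " + timerId);
                }
            }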
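            To illustrate how the family name routes a fired timer to its own callback, here is a runner-agnostic plain-Java sketch. `FamilyDispatcher`, `registerFamily`, and `advanceWatermark` are hypothetical names, not Beam APIs, and the sketch ignores firing order and timers set from inside a callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical runner-side dispatcher: one callback per timer family.
class FamilyDispatcher {
    private final Map<String, Consumer<String>> callbacks = new HashMap<>();
    // family -> (timer id -> timestamp); setting an id again overwrites it.
    private final Map<String, Map<String, Long>> timers = new HashMap<>();

    void registerFamily(String family, Consumer<String> onTimer) {
        callbacks.put(family, onTimer);
        timers.put(family, new HashMap<>());
    }

    void set(String family, String id, long timestamp) {
        timers.get(family).put(id, timestamp);
    }

    // Fires (and removes) every timer with timestamp <= watermark, passing
    // its id to the callback of the family it was set in. Firing order is
    // unspecified in this sketch.
    void advanceWatermark(long watermark) {
        for (Map.Entry<String, Map<String, Long>> family : timers.entrySet()) {
            Consumer<String> onTimer = callbacks.get(family.getKey());
            Iterator<Map.Entry<String, Long>> it =
                family.getValue().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> timer = it.next();
                if (timer.getValue() <= watermark) {
                    it.remove();
                    onTimer.accept(timer.getKey());
                }
            }
        }
    }
}

class DispatchDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        FamilyDispatcher dispatcher = new FamilyDispatcher();
        dispatcher.registerFamily("timers", id -> log.add("timers:" + id));
        dispatcher.registerFamily("cleanup", id -> log.add("cleanup:" + id));
        // The same id in two families: two distinct timers.
        dispatcher.set("timers", "mainTimer", 10L);
        dispatcher.set("cleanup", "mainTimer", 50L);
        dispatcher.advanceWatermark(20L);
        System.out.println(log); // prints [timers:mainTimer]
    }
}
```

            The callback receives only the timer id; which callback runs is decided by the family, mirroring the @OnTimerFamily("timers") method above.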

            This currently works for the Flink and the Dataflow runners.

            Thank you Rehman for getting this done! Beam users will find
            it very valuable.

            Reuven
