Yes. For now we exclude the Flink runner, but fixing this should be fairly
trivial.

On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote:

> The Flink Runner allowed a timer to be set multiple times before we
> made it comply with the Beam semantics of overwriting past invocations.
> I wouldn't be surprised if the Spark Runner never addressed this. Flink
> and Spark themselves allow a timer to be set multiple times. To fix
> this for Beam, the Flink Runner has to maintain a checkpointed map
> that sits outside of its built-in TimerService.
>
> As far as I can see, multiple timer families are currently not supported
> in the Flink Runner because the map does not take the family name into
> account. This can be easily fixed, though.
>
> -Max
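Max's description of the Flink fix can be illustrated with a minimal plain-Java sketch (this is not actual Flink or Beam runner code). `DuplicateAllowingTimerService` is a hypothetical stand-in for an engine timer service that keeps every registration, and `BeamTimerRegistry` is a hypothetical side map that remembers the last timestamp per timer, so that re-setting a timer deletes the old registration first. The map key includes the timer family, which is the fix Max mentions for multiple timer families. A real runner would also have to checkpoint this map; the sketch omits that.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TimerRegistrySketch {

    // Hypothetical engine timer service: it keeps every registration, so the same
    // timer can end up registered at several timestamps at once.
    static class DuplicateAllowingTimerService {
        // Each entry is (family, timerId, timestamp).
        final List<List<Object>> registrations = new ArrayList<>();

        void register(String family, String id, long timestamp) {
            registrations.add(List.of(family, id, timestamp));
        }

        void delete(String family, String id, long timestamp) {
            registrations.remove(List.of(family, id, timestamp));
        }
    }

    // Hypothetical side map kept outside the engine service. A real runner would
    // also checkpoint it; that part is omitted here.
    static class BeamTimerRegistry {
        private final DuplicateAllowingTimerService engine;
        // Keyed by (family, timerId) so equal ids in different families do not collide.
        private final Map<List<String>, Long> active = new HashMap<>();

        BeamTimerRegistry(DuplicateAllowingTimerService engine) {
            this.engine = engine;
        }

        void setTimer(String family, String id, long timestamp) {
            Long previous = active.put(List.of(family, id), timestamp);
            if (previous != null) {
                // Beam semantics: the new timer replaces the old one, so drop the old registration.
                engine.delete(family, id, previous);
            }
            engine.register(family, id, timestamp);
        }
    }

    public static void main(String[] args) {
        DuplicateAllowingTimerService engine = new DuplicateAllowingTimerService();
        BeamTimerRegistry registry = new BeamTimerRegistry(engine);
        registry.setTimer("timers", "mainTimer", 10L);
        registry.setTimer("timers", "mainTimer", 20L); // replaces the timer at 10
        registry.setTimer("other", "mainTimer", 15L);  // same id, different family: kept
        System.out.println(engine.registrations);
    }
}
```

Keying the side map on (family, id) rather than on the id alone is what lets `timers/mainTimer` and `other/mainTimer` coexist, while a second `setTimer` on the same pair still replaces the first.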
>
> On 24.01.20 21:31, Reuven Lax wrote:
> > The new timer family is in the portability protos. I think TimerReceiver
> > still needs to be updated to set it, though (probably a one-line change).
> >
> > The TimerInternals class that runners implement today already handles
> > dynamic timers, so most of the work was in the Beam SDK, to provide an
> > API that lets users access this feature.
> >
> > The main work needed in the runners was to take the timer family into
> > account. Beam semantics say that if a timer is set twice with the same
> > id, the second timer overwrites the first. Several runners
> > therefore had maps from timer id -> timer. However, since the
> > timer family scopes the timers, we now allow two timers with the same id
> > as long as their timer families differ. Runners had to be updated
> > to include the timer family id in the map keys.
> >
> > Surprisingly, the new TimerMap tests seem to pass on Spark
> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
> > if this means that the Spark runner was incorrectly implementing the
> > Beam semantics before, and setTimer was not overwriting timers with the
> > same id?
> >
> > Reuven
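Reuven's guess about Spark can be checked with a toy model. The sketch below is plain Java; all class names are hypothetical, and none of this is Spark or Beam code. It compares three ways a runner might store timers: append-only with no overwriting, keyed by timer id only, and keyed by (family, id). The two scenarios correspond to the two rules in the message above: setting the same timer twice should leave one pending timer, and the same id in two families should leave two.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TimerSemanticsSketch {

    // Hypothetical minimal interface: record a timer, then report how many are pending.
    interface TimerStore {
        void set(String family, String id, long timestamp);
        int pendingCount();
    }

    // Never overwrites: every set() adds another pending timer.
    static class AppendOnlyStore implements TimerStore {
        private final List<Long> pending = new ArrayList<>();
        public void set(String family, String id, long timestamp) { pending.add(timestamp); }
        public int pendingCount() { return pending.size(); }
    }

    // Overwrites by timer id only: timers in different families collide.
    static class IdKeyedStore implements TimerStore {
        private final Map<String, Long> pending = new HashMap<>();
        public void set(String family, String id, long timestamp) { pending.put(id, timestamp); }
        public int pendingCount() { return pending.size(); }
    }

    // Overwrites by (family, id): the behaviour described in the message above.
    static class FamilyKeyedStore implements TimerStore {
        private final Map<List<String>, Long> pending = new HashMap<>();
        public void set(String family, String id, long timestamp) {
            pending.put(List.of(family, id), timestamp);
        }
        public int pendingCount() { return pending.size(); }
    }

    // Scenario 1: the same timer is set twice; Beam expects one pending timer.
    static int setTwice(TimerStore store) {
        store.set("timers", "t", 10L);
        store.set("timers", "t", 20L);
        return store.pendingCount();
    }

    // Scenario 2: the same id in two families; both timers should stay pending.
    static int twoFamilies(TimerStore store) {
        store.set("a", "t", 10L);
        store.set("b", "t", 10L);
        return store.pendingCount();
    }

    public static void main(String[] args) {
        System.out.println("append-only:  setTwice=" + setTwice(new AppendOnlyStore())
            + ", twoFamilies=" + twoFamilies(new AppendOnlyStore()));
        System.out.println("id-keyed:     setTwice=" + setTwice(new IdKeyedStore())
            + ", twoFamilies=" + twoFamilies(new IdKeyedStore()));
        System.out.println("family-keyed: setTwice=" + setTwice(new FamilyKeyedStore())
            + ", twoFamilies=" + twoFamilies(new FamilyKeyedStore()));
    }
}
```

The append-only store passes the two-families check (2 pending timers) but fails the overwrite check (also 2). That is consistent with a runner that never overwrote timers passing the new TimerMap tests unchanged. The id-keyed store shows the family collision Max describes for Flink (1 pending timer instead of 2).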
> >
> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> >     This looks great, thanks for the contribution, Rehman!
> >
> >     I have some questions (note I have not looked at the code at all).
> >
> >     - Is this working for both portable and non-portable runners?
> >     - What do other runners (e.g. Spark) need to implement to support this?
> >
> >     It may be worth adding this to the Compatibility Matrix on the website.
> >
> >     Regards,
> >     Ismaël
> >
> >
> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
> >     <[email protected]
> >     <mailto:[email protected]>> wrote:
> >
> >         Thank you Reuven for the guidance throughout the development
> >         process. I am delighted to contribute my two cents to the Beam
> >         project.
> >
> >         Looking forward to more active contributions.
> >
> >         Thanks & Regards
> >
> >
> >
> >         *Rehman Murad Ali*
> >         Software Engineer
> >         Mobile: +92 3452076766
> >         Skype: rehman.muradali
> >
> >
> >
> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]
> >         <mailto:[email protected]>> wrote:
> >
> >             Thanks to a lot of hard work by Rehman, Beam now supports
> >             dynamic timers. As a reminder, this was discussed on the dev
> >             list some time back.
> >
> >             As background, previously you had to statically declare all
> >             timers in your code. So if you wanted two timers, you needed
> >             to create two timer variables and two callbacks, one for each
> >             timer. A number of users kept hitting stumbling blocks where
> >             they needed a dynamic set of timers (often based on the
> >             element), which Beam did not support. The workarounds were
> >             quite ugly and complicated.
> >
> >             The new support allows declaring a TimerMap, which is a map
> >             of timers. Each TimerMap is scoped by a family name, so you
> >             can create multiple TimerMaps, each with its own callback.
> >             Usage looks as follows:
> >
> >             class MyDoFn extends DoFn<...> {
> >                 @TimerFamily("timers")
> >                 private final TimerSpec timerMap =
> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
> >
> >                 @ProcessElement
> >                 public void process(@TimerFamily("timers") TimerMap timers,
> >                                     @Element Type e) {
> >                     // timestamp: an org.joda.time.Instant computed elsewhere (elided).
> >                     timers.set("mainTimer", timestamp);
> >                     // The id depends on the element, so the set of timers is dynamic.
> >                     timers.set("actionType" + e.getActionType(), timestamp);
> >                 }
> >
> >                 @OnTimerFamily("timers")
> >                 public void onTimer(@TimerId String timerId) {
> >                     System.out.println("Timer fired. id: " + timerId);
> >                 }
> >             }
> >
> >             This currently works for the Flink and the Dataflow runners.
> >
> >             Thank you Rehman for getting this done! Beam users will find
> >             it very valuable.
> >
> >             Reuven
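For readers without a runner at hand, the following toy model shows the behaviour the TimerMap example above relies on. It is plain Java; `ToyTimerMap` is a hypothetical class, not Beam's `TimerMap`. Timer ids can be built from element data at runtime, re-setting an id replaces it, and the single family callback receives the id of whichever timer fired. The firing rule (fire timers at or before the watermark, earliest first, ties broken by id) is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ToyTimerMap {
    // Hypothetical stand-in for one timer family: timer id -> firing timestamp.
    private final Map<String, Long> timers = new HashMap<>();
    // The family's single callback; it receives the id of the timer that fired.
    private final Consumer<String> onTimer;

    ToyTimerMap(Consumer<String> onTimer) {
        this.onTimer = onTimer;
    }

    // Setting an id that is already pending replaces its timestamp.
    void set(String timerId, long timestamp) {
        timers.put(timerId, timestamp);
    }

    // Assumed rule for this sketch: fire every timer at or before the watermark,
    // earliest first, breaking ties by id so the order is deterministic.
    void advanceWatermark(long watermark) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> entry : timers.entrySet()) {
            if (entry.getValue() <= watermark) {
                due.add(entry.getKey());
            }
        }
        Comparator<String> byTimeThenId = (a, b) -> {
            int byTime = Long.compare(timers.get(a), timers.get(b));
            return byTime != 0 ? byTime : a.compareTo(b);
        };
        due.sort(byTimeThenId);
        for (String id : due) {
            timers.remove(id);
            onTimer.accept(id);
        }
    }

    public static void main(String[] args) {
        List<String> fired = new ArrayList<>();
        ToyTimerMap timers = new ToyTimerMap(fired::add);
        // Element-dependent ids, mirroring "actionType" + e.getActionType() above;
        // the action names are made up for illustration.
        for (String action : List.of("click", "view", "click")) {
            timers.set("mainTimer", 100L);
            timers.set("actionType" + action, 50L);
        }
        timers.advanceWatermark(60L);
        timers.advanceWatermark(200L);
        System.out.println(fired);
    }
}
```

Although `set` is called six times, only three distinct timers exist (`mainTimer`, `actionTypeclick`, `actionTypeview`), and each fires exactly once through the same callback.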
> >
>
