Yes. For now we exclude the flink runner, but fixing this should be fairly trivial.
On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote: > The Flink Runner was allowing to set a timer multiple times before we > made it comply with the Beam semantics of overwriting past invocations. > I wouldn't be surprised if the Spark Runner never addressed this. Flink > and Spark itself allow for a timer to be set to multiple times. In order > to fix this for Beam, the Flink Runner has to maintain a checkpointed > map which sits outside of its builtin TimerService. > > As far as I can see, multiple timer families are currently not supported > in the Flink Runner due to the map not taking the family name into > account. This can be easily fixed though. > > -Max > > On 24.01.20 21:31, Reuven Lax wrote: > > The new timer family is in the portability protos. I think TimerReceiver > > needs to be updated to set it though (I think a 1-line change). > > > > The TimerInternals class that runners implement today already handles > > dynamic timers, so most of the work was in the Beam SDK to provide an > > API that allows users to access this feature. > > > > The main work needed in the runner was to take in account the timer > > family. Beam semantics say that if a timer is set twice with the same > > id, then the second timer overwrites the first. Several runners > > therefore had maps from timer id -> timer. However since the > > timer family scopes the timers, we now allow two timers with the same id > > as long as the timer families are different. Runners had to be updated > > to include the timer family id in the map keys. > > > > Surprisingly, the new TimerMap tests seem to pass on Spark > > ValidatesRunner, even though the Spark runner wasn't updated! I wonder > > if this means that the Spark runner was incorrectly implementing the > > Beam semantics before, and setTimer was not overwriting timers with the > > same id? > > > > Reuven > > > > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected] > > <mailto:[email protected]>> wrote: > > > > This looks great, thanks for the contribution Rehman! > > > > I have some questions (note I have not looked at the code at all). > > > > - Is this working for both portable and non portable runners? > > - What do other runners need to implement to support this (e.g. > Spark)? > > > > Maybe worth to add this to the website Compatibility Matrix. > > > > Regards, > > Ismaël > > > > > > On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali > > <[email protected] > > <mailto:[email protected]>> wrote: > > > > Thank you Reuven for the guidance throughout the development > > process. I am delighted to contribute my two cents to the Beam > > project. > > > > Looking forward to more active contributions. > > > > * > > * > > > > *Thanks & Regards____* > > > > > > > > *Rehman Murad Ali* > > Software Engineer > > Mobile: +92 3452076766 <+92%20345%202076766> > <tel:+92%20345%202076766> > > Skype: rehman.muradali > > > > > > > > On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected] > > <mailto:[email protected]>> wrote: > > > > Thanks to a lot of hard work by Rehman, Beam now supports > > dynamic timers. As a reminder, this was discussed on the dev > > list some time back. > > > > As background, previously one had to statically declare all > > timers in your code. So if you wanted to have two timers, > > you needed to create two timer variables and two callbacks - > > one for each timer. A number of users kept hitting stumbling > > blocks where they needed a dynamic set of timers (often > > based on the element), which was not supported in Beam. The > > workarounds were quite ugly and complicated. > > > > The new support allows declaring a TimerMap, which is a map > > of timers. Each TimerMap is scoped by a family name, so you > > can create multiple TimerMaps each with its own callback. > > The use looks as follows: > > > > class MyDoFn extends DoFn<...> { > > @TimerFamily("timers") > > private final TimerSpec timerMap = > > TimerSpecs.timerMap(TimeDomain.EVENT_TIME); > > > > @ProcessElement > > public void process(@TimerFamily("timers") TimerMap > > timers, @Element Type e) { > > timers.set("mainTimer", timestamp); > > timers.set("actionType" + e.getActionType(), > timestamp); > > } > > > > @OnTimerFamily . > > public void onTimer(@TimerId String timerId) { > > System.out.println("Timer fired. id: " + timerId); > > } > > } > > > > This currently works for the Flink and the Dataflow runners. > > > > Thank you Rehman for getting this done! Beam users will find > > it very valuable. > > > > Reuven > > >
