I had a discussion with Rehman last week and we discovered that the TimerMap-related tests were not running for all runners because they were not tagged as part of the ValidatesRunner category. I opened a PR [1] to enable this, so could someone please help with the review/merge?
I took a look out of curiosity and discovered that they only pass on the Direct runner and on the classic Flink runner in batch mode. They do not pass on Dataflow [2][3] or on the portable Flink runner, so it is probably worth reopening the issue to investigate and fix this.

[1] https://github.com/apache/beam/pull/10747
[2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
[3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/

On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:

> Yes. For now we exclude the Flink runner, but fixing this should be fairly
> trivial.
>
> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote:
>
>> The Flink Runner was allowing a timer to be set multiple times before we
>> made it comply with the Beam semantics of overwriting past invocations.
>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>> and Spark themselves allow a timer to be set multiple times. In order
>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>> map which sits outside of its builtin TimerService.
>>
>> As far as I can see, multiple timer families are currently not supported
>> in the Flink Runner due to the map not taking the family name into
>> account. This can be easily fixed though.
>>
>> -Max
>>
>> On 24.01.20 21:31, Reuven Lax wrote:
>>> The new timer family is in the portability protos. I think TimerReceiver
>>> needs to be updated to set it though (I think a 1-line change).
>>>
>>> The TimerInternals class that runners implement today already handles
>>> dynamic timers, so most of the work was in the Beam SDK to provide an
>>> API that allows users to access this feature.
>>>
>>> The main work needed in the runner was to take into account the timer
>>> family. Beam semantics say that if a timer is set twice with the same
>>> id, then the second timer overwrites the first. Several runners
>>> therefore had maps from timer id -> timer. However, since the
>>> timer family scopes the timers, we now allow two timers with the same id
>>> as long as the timer families are different. Runners had to be updated
>>> to include the timer family id in the map keys.
>>>
>>> Surprisingly, the new TimerMap tests seem to pass on Spark
>>> ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>>> if this means that the Spark runner was incorrectly implementing the
>>> Beam semantics before, and setTimer was not overwriting timers with the
>>> same id?
>>>
>>> Reuven
>>>
>>> On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>>
>>>> This looks great, thanks for the contribution Rehman!
>>>>
>>>> I have some questions (note I have not looked at the code at all).
>>>>
>>>> - Is this working for both portable and non-portable runners?
>>>> - What do other runners need to implement to support this (e.g. Spark)?
>>>>
>>>> It may be worth adding this to the website Compatibility Matrix.
>>>>
>>>> Regards,
>>>> Ismaël
>>>>
>>>> On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali <[email protected]> wrote:
>>>>
>>>>> Thank you Reuven for the guidance throughout the development
>>>>> process. I am delighted to contribute my two cents to the Beam
>>>>> project.
>>>>>
>>>>> Looking forward to more active contributions.
>>>>>
>>>>> Thanks & Regards
>>>>>
>>>>> Rehman Murad Ali
>>>>> Software Engineer
>>>>>
>>>>> On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>> dynamic timers. As a reminder, this was discussed on the dev
>>>>>> list some time back.
>>>>>>
>>>>>> As background, previously one had to statically declare all
>>>>>> timers in your code. So if you wanted to have two timers,
>>>>>> you needed to create two timer variables and two callbacks -
>>>>>> one for each timer. A number of users kept hitting stumbling
>>>>>> blocks where they needed a dynamic set of timers (often
>>>>>> based on the element), which was not supported in Beam. The
>>>>>> workarounds were quite ugly and complicated.
>>>>>>
>>>>>> The new support allows declaring a TimerMap, which is a map
>>>>>> of timers. Each TimerMap is scoped by a family name, so you
>>>>>> can create multiple TimerMaps each with its own callback.
>>>>>> The use looks as follows:
>>>>>>
>>>>>> class MyDoFn extends DoFn<...> {
>>>>>>   @TimerFamily("timers")
>>>>>>   private final TimerSpec timerMap =
>>>>>>       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void process(@TimerFamily("timers") TimerMap timers,
>>>>>>                       @Element Type e) {
>>>>>>     timers.set("mainTimer", timestamp);
>>>>>>     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>   }
>>>>>>
>>>>>>   @OnTimerFamily("timers")
>>>>>>   public void onTimer(@TimerId String timerId) {
>>>>>>     System.out.println("Timer fired. id: " + timerId);
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> This currently works for the Flink and the Dataflow runners.
>>>>>>
>>>>>> Thank you Rehman for getting this done! Beam users will find
>>>>>> it very valuable.
>>>>>>
>>>>>> Reuven
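
For anyone following the thread, here is a minimal plain-Java sketch of the keying change discussed above: setting a timer again with the same id overwrites the earlier one, but only within the same timer family. This is not code from any runner. TimerKey and FamilyScopedTimers are hypothetical names made up for illustration, firing times are plain longs instead of Beam types, and a real runner would also have to checkpoint this state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical composite key: a timer is identified by (family id, timer id). */
final class TimerKey {
    final String familyId;
    final String timerId;

    TimerKey(String familyId, String timerId) {
        this.familyId = familyId;
        this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TimerKey)) {
            return false;
        }
        TimerKey other = (TimerKey) o;
        return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(familyId, timerId);
    }
}

/** Hypothetical runner-side bookkeeping; an illustration, not Beam's TimerInternals. */
final class FamilyScopedTimers {
    // Pending timers, keyed by (family id, timer id) rather than by timer id alone.
    private final Map<TimerKey, Long> pending = new HashMap<>();

    /** Setting a timer again under the same (family, id) replaces the earlier firing time. */
    void set(String familyId, String timerId, long fireAtMillis) {
        pending.put(new TimerKey(familyId, timerId), fireAtMillis);
    }

    /** Returns the pending firing time, or null if no such timer is set. */
    Long get(String familyId, String timerId) {
        return pending.get(new TimerKey(familyId, timerId));
    }

    /** Number of distinct pending timers across all families. */
    int size() {
        return pending.size();
    }
}
```

With this keying, calling set("timers", "mainTimer", ...) twice leaves a single pending timer holding the later firing time, while set("other", "mainTimer", ...) adds a second, independent timer. If the map were keyed by timer id alone, the two families would clobber each other, which is the situation described for the Flink runner's map above.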
