I took a look, and I think this was a simple bug. Testing a fix now.

A larger question is how to support this in the portability layer. Right now portability assumes that each timer id corresponds to a logical input PCollection, but that assumption no longer holds now that we support a dynamic set of timers, each with its own id. We could instead model each timer family as a PCollection, but the FnApiRunner would need to get the timer id dynamically in order to invoke it, and today it statically reads the timer id from the PCollection name.
Reuven

On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:

> Thanks for finding this. Hopefully the bug is easy to fix. The tests
> indeed never ran on any runner except for the DirectRunner, which is
> something I should've noticed in the code review.
>
> Reuven
>
> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]> wrote:
>
>> I had a discussion with Rehman last week and we discovered that the
>> TimersMap related tests were not running for all runners because they
>> were not tagged as part of the ValidatesRunner category. I opened a
>> PR [1] to enable this, so could someone please help me with the
>> review/merge?
>>
>> I took a look out of curiosity and discovered that they are only
>> passing for the Direct runner and for the classic Flink runner in
>> batch mode. They are not passing for Dataflow [2][3] or for the
>> Portable Flink runner, so it is probably worth reopening the issue to
>> investigate/fix.
>>
>> [1] https://github.com/apache/beam/pull/10747
>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>
>>
>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>
>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>> fairly trivial.
>>>
>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote:
>>>
>>>> The Flink Runner allowed a timer to be set multiple times before we
>>>> made it comply with the Beam semantics of overwriting past invocations.
>>>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>>>> and Spark themselves allow a timer to be set multiple times. In order
>>>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>>>> map which sits outside of its builtin TimerService.
>>>>
>>>> As far as I can see, multiple timer families are currently not
>>>> supported in the Flink Runner because the map does not take the
>>>> family name into account. This can be easily fixed though.
>>>>
>>>> -Max
>>>>
>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>> > The new timer family is in the portability protos. I think
>>>> > TimerReceiver needs to be updated to set it though (I think a
>>>> > 1-line change).
>>>> >
>>>> > The TimerInternals class that runners implement today already handles
>>>> > dynamic timers, so most of the work was in the Beam SDK to provide an
>>>> > API that allows users to access this feature.
>>>> >
>>>> > The main work needed in the runner was to take into account the timer
>>>> > family. Beam semantics say that if a timer is set twice with the same
>>>> > id, then the second timer overwrites the first. Several runners
>>>> > therefore had maps from timer id -> timer. However, since the
>>>> > timer family scopes the timers, we now allow two timers with the same
>>>> > id as long as the timer families are different. Runners had to be
>>>> > updated to include the timer family id in the map keys.
>>>> >
>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>>>> > if this means that the Spark runner was incorrectly implementing the
>>>> > Beam semantics before, and setTimer was not overwriting timers with
>>>> > the same id?
>>>> >
>>>> > Reuven
>>>> >
>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>>> >
>>>> > This looks great, thanks for the contribution Rehman!
>>>> >
>>>> > I have some questions (note I have not looked at the code at all).
>>>> >
>>>> > - Is this working for both portable and non-portable runners?
>>>> > - What do other runners need to implement to support this (e.g. Spark)?
>>>> >
>>>> > Maybe worth adding this to the website Compatibility Matrix.
>>>> >
>>>> > Regards,
>>>> > Ismaël
>>>> >
>>>> >
>>>> > On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali <[email protected]> wrote:
>>>> >
>>>> > Thank you Reuven for the guidance throughout the development
>>>> > process. I am delighted to contribute my two cents to the Beam
>>>> > project.
>>>> >
>>>> > Looking forward to more active contributions.
>>>> >
>>>> > Thanks & Regards
>>>> >
>>>> > Rehman Murad Ali
>>>> > Software Engineer
>>>> > Mobile: +92 3452076766
>>>> > Skype: rehman.muradali
>>>> >
>>>> >
>>>> > On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]> wrote:
>>>> >
>>>> > Thanks to a lot of hard work by Rehman, Beam now supports
>>>> > dynamic timers. As a reminder, this was discussed on the dev
>>>> > list some time back.
>>>> >
>>>> > As background, previously you had to statically declare all
>>>> > timers in your code. So if you wanted to have two timers,
>>>> > you needed to create two timer variables and two callbacks,
>>>> > one for each timer. A number of users kept hitting stumbling
>>>> > blocks where they needed a dynamic set of timers (often
>>>> > based on the element), which was not supported in Beam. The
>>>> > workarounds were quite ugly and complicated.
>>>> >
>>>> > The new support allows declaring a TimerMap, which is a map
>>>> > of timers. Each TimerMap is scoped by a family name, so you
>>>> > can create multiple TimerMaps, each with its own callback.
>>>> > The use looks as follows:
>>>> >
>>>> > class MyDoFn extends DoFn<...> {
>>>> >   // Declares a family of dynamically named event-time timers.
>>>> >   @TimerFamily("timers")
>>>> >   private final TimerSpec timerMap =
>>>> >       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>> >
>>>> >   @ProcessElement
>>>> >   public void process(
>>>> >       @TimerFamily("timers") TimerMap timers,
>>>> >       @Element Type e,
>>>> >       @Timestamp Instant timestamp) {
>>>> >     // Timer ids are chosen at runtime, here partly from the element.
>>>> >     timers.set("mainTimer", timestamp);
>>>> >     timers.set("actionType" + e.getActionType(), timestamp);
>>>> >   }
>>>> >
>>>> >   // One callback serves every timer in the "timers" family.
>>>> >   @OnTimerFamily("timers")
>>>> >   public void onTimer(@TimerId String timerId) {
>>>> >     System.out.println("Timer fired. id: " + timerId);
>>>> >   }
>>>> > }
>>>> >
>>>> > This currently works for the Flink and the Dataflow runners.
>>>> >
>>>> > Thank you Rehman for getting this done! Beam users will find
>>>> > it very valuable.
>>>> >
>>>> > Reuven
>>>> >
>>>>
>>>
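
The runner-side change discussed in this thread (keying the timer map on the timer family as well as the timer id, so that setting a timer twice still overwrites within a family but not across families) can be sketched in plain Java. This is only an illustration under stated assumptions, not code from Beam or from any runner: `TimerStore`, `TimerKey`, `setTimer`, `getTimer`, and `size` are hypothetical names, and timestamps are plain `long` milliseconds rather than Beam types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: not a Beam or runner class.
final class TimerStore {

    // Composite key: a timer is identified by its family AND its id.
    static final class TimerKey {
        final String familyId;
        final String timerId;

        TimerKey(String familyId, String timerId) {
            this.familyId = familyId;
            this.timerId = timerId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TimerKey)) {
                return false;
            }
            TimerKey other = (TimerKey) o;
            return familyId.equals(other.familyId) && timerId.equals(other.timerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(familyId, timerId);
        }
    }

    // Pending timers: (family, id) -> firing timestamp in milliseconds.
    private final Map<TimerKey, Long> timers = new HashMap<>();

    // Sets a timer. A second call with the same family and id overwrites
    // the first; the previous timestamp (or null) is returned.
    Long setTimer(String familyId, String timerId, long timestampMillis) {
        return timers.put(new TimerKey(familyId, timerId), timestampMillis);
    }

    // Returns the pending timestamp for (family, id), or null if none is set.
    Long getTimer(String familyId, String timerId) {
        return timers.get(new TimerKey(familyId, timerId));
    }

    // Number of distinct pending timers.
    int size() {
        return timers.size();
    }
}
```

With a map keyed on the timer id alone, the same id used in two families would collide and one timer would silently replace the other; the composite key keeps them separate while preserving the overwrite behaviour within a single family.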
