Thanks for finding this. Hopefully the bug is easy to fix. The tests indeed never ran on any runner except the DirectRunner, which is something I should have noticed in the code review.
Reuven

On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía wrote:
> I had a discussion with Rehman last week and we discovered that the
> TimerMap-related tests were not running for all runners because they were
> not tagged as part of the ValidatesRunner category. I opened a PR [1] to
> enable this, so could someone please help me with the review/merge?
>
> Out of curiosity I took a look and discovered that they only pass for the
> Direct runner and for the classic Flink runner in batch mode. They do not
> pass for Dataflow [2][3] or for the Portable Flink runner, so it is probably
> worth reopening the issue to investigate and fix it.
>
> [1] https://github.com/apache/beam/pull/10747
> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>
> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax wrote:
>> Yes. For now we exclude the Flink runner, but fixing this should be
>> fairly trivial.
>>
>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels wrote:
>>> The Flink Runner allowed a timer to be set multiple times before we
>>> made it comply with the Beam semantics of overwriting past invocations.
>>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>>> and Spark themselves allow a timer to be set multiple times. To fix
>>> this for Beam, the Flink Runner has to maintain a checkpointed map that
>>> sits outside of its built-in TimerService.
>>>
>>> As far as I can see, multiple timer families are currently not supported
>>> in the Flink Runner because the map does not take the family name into
>>> account. This can be easily fixed, though.
>>>
>>> -Max
>>>
>>> On 24.01.20 21:31, Reuven Lax wrote:
>>> > The new timer family is in the portability protos.
>>> > I think TimerReceiver needs to be updated to set it, though (I think
>>> > it's a one-line change).
>>> >
>>> > The TimerInternals class that runners implement today already handles
>>> > dynamic timers, so most of the work was in the Beam SDK to provide an
>>> > API that allows users to access this feature.
>>> >
>>> > The main work needed in the runners was to take the timer family into
>>> > account. Beam semantics say that if a timer is set twice with the same
>>> > id, the second timer overwrites the first. Several runners therefore
>>> > kept maps from timer id -> timer. However, since the timer family scopes
>>> > the timers, we now allow two timers with the same id as long as their
>>> > timer families are different. Runners had to be updated to include the
>>> > timer family id in the map keys.
>>> >
>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>>> > if this means that the Spark runner was incorrectly implementing the
>>> > Beam semantics before, and setTimer was not overwriting timers with the
>>> > same id?
>>> >
>>> > Reuven
>>> >
>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía wrote:
>>> > > This looks great, thanks for the contribution Rehman!
>>> > >
>>> > > I have some questions (note I have not looked at the code at all).
>>> > >
>>> > > - Is this working for both portable and non-portable runners?
>>> > > - What do other runners need to implement to support this (e.g.
>>> > >   Spark)?
>>> > >
>>> > > Maybe it is worth adding this to the website Compatibility Matrix.
>>> > >
>>> > > Regards,
>>> > > Ismaël
>>> > >
>>> > > On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali wrote:
>>> > > > Thank you Reuven for the guidance throughout the development
>>> > > > process. I am delighted to contribute my two cents to the Beam
>>> > > > project.
>>> > > >
>>> > > > Looking forward to more active contributions.
>>> > > >
>>> > > > Thanks & Regards,
>>> > > >
>>> > > > Rehman Murad Ali
>>> > > > Software Engineer
>>> > > > Mobile: +92 3452076766
>>> > > > Skype: rehman.muradali
>>> > > >
>>> > > > On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax wrote:
>>> > > > > Thanks to a lot of hard work by Rehman, Beam now supports
>>> > > > > dynamic timers. As a reminder, this was discussed on the dev
>>> > > > > list some time back.
>>> > > > >
>>> > > > > As background, previously one had to statically declare all
>>> > > > > timers in the code. So if you wanted two timers, you needed to
>>> > > > > create two timer variables and two callbacks, one for each timer.
>>> > > > > A number of users kept hitting stumbling blocks where they needed
>>> > > > > a dynamic set of timers (often based on the element), which was
>>> > > > > not supported in Beam. The workarounds were quite ugly and
>>> > > > > complicated.
>>> > > > >
>>> > > > > The new support allows declaring a TimerMap, which is a map of
>>> > > > > timers. Each TimerMap is scoped by a family name, so you can
>>> > > > > create multiple TimerMaps, each with its own callback. Usage
>>> > > > > looks as follows:
>>> > > > >
>>> > > > >   class MyDoFn extends DoFn<...> {
>>> > > > >     @TimerFamily("timers")
>>> > > > >     private final TimerSpec timerMap =
>>> > > > >         TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>> > > > >
>>> > > > >     @ProcessElement
>>> > > > >     public void process(@TimerFamily("timers") TimerMap timers,
>>> > > > >                         @Element Type e) {
>>> > > > >       timers.set("mainTimer", timestamp);
>>> > > > >       timers.set("actionType" + e.getActionType(), timestamp);
>>> > > > >     }
>>> > > > >
>>> > > > >     // The callback must name the same family as the TimerSpec.
>>> > > > >     @OnTimerFamily("timers")
>>> > > > >     public void onTimer(@TimerId String timerId) {
>>> > > > >       System.out.println("Timer fired. id: " + timerId);
>>> > > > >     }
>>> > > > >   }
>>> > > > >
>>> > > > > This currently works for the Flink and the Dataflow runners.
>>> > > > >
>>> > > > > Thank you Rehman for getting this done!
>>> > > > > Beam users will find it very valuable.
>>> > > > >
>>> > > > > Reuven
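Max and Reuven describe an overwrite rule for the runner-side timer bookkeeping. The following small, self-contained Java sketch illustrates that rule. It keeps a map of pending timers keyed by (timer family, timer id). Setting the same key again replaces the earlier timestamp, while equal ids in different families stay independent. This is a hypothetical illustration, not code from Beam, Flink or Spark. The class and method names (TimerRegistry, TimerKey, set, get, fire) are invented here, timestamps are plain milliseconds rather than Beam's Instant, and the checkpointing Max mentions is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch, not Beam/Flink/Spark code: pending timers keyed by
// (family, id), so that setting a timer again with the same family and id
// overwrites it, while equal ids in different families are independent.
public class TimerRegistry {

  // Map key = (family, id). Keying on the id alone would let a timer in one
  // family silently overwrite a timer with the same id in another family.
  public static final class TimerKey {
    final String family;
    final String id;

    TimerKey(String family, String id) {
      this.family = family;
      this.id = id;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof TimerKey)) {
        return false;
      }
      TimerKey k = (TimerKey) o;
      return family.equals(k.family) && id.equals(k.id);
    }

    @Override
    public int hashCode() {
      return Objects.hash(family, id);
    }

    @Override
    public String toString() {
      return family + "/" + id;
    }
  }

  // Pending timers: key -> firing timestamp in milliseconds.
  private final Map<TimerKey, Long> pending = new HashMap<>();

  // Sets a timer, replacing any earlier timer with the same family and id.
  public void set(String family, String id, long timestampMillis) {
    pending.put(new TimerKey(family, id), timestampMillis);
  }

  public Long get(String family, String id) {
    return pending.get(new TimerKey(family, id));
  }

  public int size() {
    return pending.size();
  }

  // Removes and returns every timer whose timestamp is <= the watermark.
  public List<TimerKey> fire(long watermarkMillis) {
    List<TimerKey> due = new ArrayList<>();
    pending.entrySet().removeIf(e -> {
      if (e.getValue() <= watermarkMillis) {
        due.add(e.getKey());
        return true;
      }
      return false;
    });
    return due;
  }

  public static void main(String[] args) {
    TimerRegistry timers = new TimerRegistry();
    timers.set("timers", "mainTimer", 100L);
    timers.set("timers", "mainTimer", 200L); // overwrites the 100L registration
    timers.set("other", "mainTimer", 150L);  // same id, different family: kept
    System.out.println(timers.size());                     // prints 2
    System.out.println(timers.get("timers", "mainTimer")); // prints 200
    System.out.println(timers.fire(150L));                 // prints [other/mainTimer]
  }
}
```

If the map were keyed on the id alone, main() would report only one pending timer, because the "other" family's registration would overwrite the "timers" one. Including the family in the key corresponds to the change Reuven describes for the runners.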
