FYI, this is now fixed for Dataflow. I also added better rejection so that runners that don't support this feature will reject the pipeline.

On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:

> I took a look, and I think this was a simple bug. Testing a fix now.
>
> A larger question is how to support this in the portability layer. Right
> now portability assumes that each timer id corresponds to a logical input
> PCollection, but that assumption no longer works as we now support a
> dynamic set of timers, each with their own id. We could instead model each
> timer family as a PCollection, but the FnApiRunner would need to
> dynamically get the timer id in order to invoke it, and today it statically
> reads the timer id from the PCollection name.
>
> Reuven
>
> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>
>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>> indeed never ran on any runner except for the DirectRunner, which is
>> something I should've noticed in the code review.
>>
>> Reuven
>>
>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> I had a discussion with Rehman last week and we discovered that the
>>> TimersMap related tests were not running for all runners because they
>>> were not tagged as part of the ValidatesRunner category. I opened a
>>> PR [1] to enable this, so could someone please help me with the
>>> review/merge?
>>>
>>> I took a look out of curiosity and discovered that they are only
>>> passing for the Direct runner and for the classic Flink runner in batch
>>> mode. They are not passing for Dataflow [2][3] or for the Portable Flink
>>> runner, so it is probably worth reopening the issue to investigate/fix.
>>>
>>> [1] https://github.com/apache/beam/pull/10747
>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>
>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> Yes.
>>>> For now we exclude the Flink runner, but fixing this should be fairly
>>>> trivial.
>>>>
>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote:
>>>>
>>>>> The Flink Runner allowed a timer to be set multiple times before we
>>>>> made it comply with the Beam semantics of overwriting past
>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>> addressed this. Flink and Spark themselves allow a timer to be set
>>>>> multiple times. In order to fix this for Beam, the Flink Runner has to
>>>>> maintain a checkpointed map which sits outside of its builtin
>>>>> TimerService.
>>>>>
>>>>> As far as I can see, multiple timer families are currently not
>>>>> supported in the Flink Runner because the map does not take the family
>>>>> name into account. This can be easily fixed though.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>> > The new timer family is in the portability protos. I think
>>>>> > TimerReceiver needs to be updated to set it though (I think a 1-line
>>>>> > change).
>>>>> >
>>>>> > The TimerInternals class that runners implement today already
>>>>> > handles dynamic timers, so most of the work was in the Beam SDK to
>>>>> > provide an API that allows users to access this feature.
>>>>> >
>>>>> > The main work needed in the runner was to take into account the
>>>>> > timer family. Beam semantics say that if a timer is set twice with
>>>>> > the same id, then the second timer overwrites the first. Several
>>>>> > runners therefore had maps from timer id -> timer. However, since
>>>>> > the timer family scopes the timers, we now allow two timers with the
>>>>> > same id as long as the timer families are different. Runners had to
>>>>> > be updated to include the timer family id in the map keys.
>>>>> >
>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>> > ValidatesRunner, even though the Spark runner wasn't updated!
>>>>> > I wonder if this means that the Spark runner was incorrectly
>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>> > overwriting timers with the same id?
>>>>> >
>>>>> > Reuven
>>>>> >
>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>>>> >
>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>> >
>>>>> >     I have some questions (note I have not looked at the code at
>>>>> >     all).
>>>>> >
>>>>> >     - Is this working for both portable and non-portable runners?
>>>>> >     - What do other runners need to implement to support this (e.g.
>>>>> >       Spark)?
>>>>> >
>>>>> >     It may be worth adding this to the website Compatibility Matrix.
>>>>> >
>>>>> >     Regards,
>>>>> >     Ismaël
>>>>> >
>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>> >     <[email protected]> wrote:
>>>>> >
>>>>> >         Thank you Reuven for the guidance throughout the development
>>>>> >         process. I am delighted to contribute my two cents to the
>>>>> >         Beam project.
>>>>> >
>>>>> >         Looking forward to more active contributions.
>>>>> >
>>>>> >         Thanks & Regards,
>>>>> >
>>>>> >         Rehman Murad Ali
>>>>> >         Software Engineer
>>>>> >         Mobile: +92 3452076766
>>>>> >         Skype: rehman.muradali
>>>>> >
>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax
>>>>> >         <[email protected]> wrote:
>>>>> >
>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>> >             dynamic timers. As a reminder, this was discussed on the
>>>>> >             dev list some time back.
>>>>> >
>>>>> >             As background, previously one had to statically declare
>>>>> >             all timers in your code. So if you wanted to have two
>>>>> >             timers, you needed to create two timer variables and two
>>>>> >             callbacks - one for each timer.
>>>>> >             A number of users kept hitting stumbling blocks where
>>>>> >             they needed a dynamic set of timers (often based on the
>>>>> >             element), which was not supported in Beam. The
>>>>> >             workarounds were quite ugly and complicated.
>>>>> >
>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>> >             map of timers. Each TimerMap is scoped by a family name,
>>>>> >             so you can create multiple TimerMaps, each with its own
>>>>> >             callback. The use looks as follows:
>>>>> >
>>>>> >             class MyDoFn extends DoFn<...> {
>>>>> >               @TimerFamily("timers")
>>>>> >               private final TimerSpec timerMap =
>>>>> >                   TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>> >
>>>>> >               @ProcessElement
>>>>> >               public void process(
>>>>> >                   @TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>> >                 timers.set("mainTimer", timestamp);
>>>>> >                 timers.set("actionType" + e.getActionType(), timestamp);
>>>>> >               }
>>>>> >
>>>>> >               @OnTimerFamily("timers")
>>>>> >               public void onTimer(@TimerId String timerId) {
>>>>> >                 System.out.println("Timer fired. id: " + timerId);
>>>>> >               }
>>>>> >             }
>>>>> >
>>>>> >             This currently works for the Flink and the Dataflow
>>>>> >             runners.
>>>>> >
>>>>> >             Thank you Rehman for getting this done! Beam users will
>>>>> >             find it very valuable.
>>>>> >
>>>>> >             Reuven
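
For readers following the runner-side discussion in this thread: the overwrite rule (setting a timer twice with the same id replaces the first, but timers with the same id in different families coexist) can be illustrated with a plain map keyed on both the family id and the timer id. This is only a minimal sketch using the Java standard library. The names TimerTable, Key, set and get are hypothetical and are not Beam or runner classes, and the real runners' maps (for example the checkpointed map in the Flink Runner) may well be structured differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration only; none of these names come from Beam.
final class TimerTable {

  /** Composite key: a timer is identified by its family id plus its own id. */
  static final class Key {
    final String familyId;
    final String timerId;

    Key(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(familyId, timerId);
    }
  }

  // Pending timers: (family id, timer id) -> firing time in epoch millis.
  private final Map<Key, Long> pending = new HashMap<>();

  /** Sets a timer; an existing timer with the same family and id is overwritten. */
  void set(String familyId, String timerId, long fireAtMillis) {
    pending.put(new Key(familyId, timerId), fireAtMillis);
  }

  /** Returns the pending firing time, or null if no such timer is set. */
  Long get(String familyId, String timerId) {
    return pending.get(new Key(familyId, timerId));
  }

  /** Number of distinct pending timers. */
  int size() {
    return pending.size();
  }

  public static void main(String[] args) {
    TimerTable table = new TimerTable();
    table.set("timers", "mainTimer", 1000L);
    table.set("timers", "mainTimer", 2000L); // same family and id: overwrites
    table.set("other", "mainTimer", 3000L);  // same id, other family: coexists
    System.out.println(table.get("timers", "mainTimer")); // prints 2000
    System.out.println(table.get("other", "mainTimer"));  // prints 3000
    System.out.println(table.size());                     // prints 2
  }
}
```

If the map were keyed on the timer id alone, the third call above would overwrite the first family's timer, which is the kind of collision described for runners that did not yet include the family id in their map keys.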
