I think the (lack of) portability bit may have been buried in this thread. Maybe a new thread about the design for that?
Kenn

On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <[email protected]> wrote:

> FYI, this is now fixed for Dataflow. I also added better rejection so that
> runners that don't support this feature will reject the pipeline.
>
> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:
>
>> I took a look, and I think this was a simple bug. Testing a fix now.
>>
>> A larger question is how to support this in the portability layer. Right
>> now portability assumes that each timer id corresponds to a logical input
>> PCollection, but that assumption no longer works as we now support a
>> dynamic set of timers, each with its own id. We could instead model each
>> timer family as a PCollection, but the FnApiRunner would need to
>> dynamically get the timer id in order to invoke it, and today it statically
>> reads the timer id from the PCollection name.
>>
>> Reuven
>>
>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>>
>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>> indeed never ran on any runner except for the DirectRunner, which is
>>> something I should've noticed in the code review.
>>>
>>> Reuven
>>>
>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]> wrote:
>>>
>>>> I had a discussion with Rehman last week and we discovered that the
>>>> TimersMap related tests were not running for all runners because they
>>>> were not tagged as part of the ValidatesRunner category. I opened a PR [1]
>>>> to enable this, so could someone please help me with the review/merge?
>>>>
>>>> I took a look out of curiosity and discovered that they only pass for
>>>> the Direct runner and for the classic Flink runner in batch mode. They
>>>> do not pass for Dataflow [2][3] or for the portable Flink runner, so it
>>>> is probably worth reopening the issue to investigate/fix.
>>>>
>>>> [1] https://github.com/apache/beam/pull/10747
>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>
>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>>>> fairly trivial.
>>>>>
>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote:
>>>>>
>>>>>> The Flink Runner allowed a timer to be set multiple times before we
>>>>>> made it comply with the Beam semantics of overwriting past invocations.
>>>>>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>>>>>> and Spark themselves allow a timer to be set multiple times. In order
>>>>>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>>>>>> map which sits outside of its builtin TimerService.
>>>>>>
>>>>>> As far as I can see, multiple timer families are currently not supported
>>>>>> in the Flink Runner due to the map not taking the family name into
>>>>>> account. This can be easily fixed though.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>> > The new timer family is in the portability protos. I think TimerReceiver
>>>>>> > needs to be updated to set it though (I think a 1-line change).
>>>>>> >
>>>>>> > The TimerInternals class that runners implement today already handles
>>>>>> > dynamic timers, so most of the work was in the Beam SDK to provide an
>>>>>> > API that allows users to access this feature.
>>>>>> >
>>>>>> > The main work needed in the runner was to take into account the timer
>>>>>> > family. Beam semantics say that if a timer is set twice with the same
>>>>>> > id, then the second timer overwrites the first.
>>>>>> > Several runners therefore had maps from timer id -> timer. However, since
>>>>>> > the timer family scopes the timers, we now allow two timers with the same
>>>>>> > id as long as the timer families are different. Runners had to be updated
>>>>>> > to include the timer family id in the map keys.
>>>>>> >
>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>>>>>> > if this means that the Spark runner was incorrectly implementing the
>>>>>> > Beam semantics before, and setTimer was not overwriting timers with the
>>>>>> > same id?
>>>>>> >
>>>>>> > Reuven
>>>>>> >
>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>>>>> >
>>>>>> > This looks great, thanks for the contribution Rehman!
>>>>>> >
>>>>>> > I have some questions (note I have not looked at the code at all).
>>>>>> >
>>>>>> > - Is this working for both portable and non-portable runners?
>>>>>> > - What do other runners need to implement to support this (e.g. Spark)?
>>>>>> >
>>>>>> > It may be worth adding this to the website's Compatibility Matrix.
>>>>>> >
>>>>>> > Regards,
>>>>>> > Ismaël
>>>>>> >
>>>>>> > On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali <[email protected]> wrote:
>>>>>> >
>>>>>> > Thank you Reuven for the guidance throughout the development
>>>>>> > process. I am delighted to contribute my two cents to the Beam
>>>>>> > project.
>>>>>> >
>>>>>> > Looking forward to more active contributions.
>>>>>> >
>>>>>> > Thanks & Regards,
>>>>>> > Rehman Murad Ali
>>>>>> > Software Engineer
>>>>>> >
>>>>>> > On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]> wrote:
>>>>>> >
>>>>>> > Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>> > dynamic timers. As a reminder, this was discussed on the dev
>>>>>> > list some time back.
>>>>>> >
>>>>>> > As background, previously you had to statically declare all
>>>>>> > timers in your code. So if you wanted to have two timers,
>>>>>> > you needed to create two timer variables and two callbacks,
>>>>>> > one for each timer. A number of users kept hitting stumbling
>>>>>> > blocks where they needed a dynamic set of timers (often
>>>>>> > based on the element), which was not supported in Beam. The
>>>>>> > workarounds were quite ugly and complicated.
>>>>>> >
>>>>>> > The new support allows declaring a TimerMap, which is a map
>>>>>> > of timers. Each TimerMap is scoped by a family name, so you
>>>>>> > can create multiple TimerMaps, each with its own callback.
>>>>>> > Usage looks as follows:
>>>>>> >
>>>>>> > class MyDoFn extends DoFn<...> {
>>>>>> >   @TimerFamily("timers")
>>>>>> >   private final TimerSpec timerMap =
>>>>>> >       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>> >
>>>>>> >   @ProcessElement
>>>>>> >   public void process(@TimerFamily("timers") TimerMap timers,
>>>>>> >       @Element Type e) {
>>>>>> >     timers.set("mainTimer", timestamp);
>>>>>> >     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>> >   }
>>>>>> >
>>>>>> >   @OnTimerFamily("timers")
>>>>>> >   public void onTimer(@TimerId String timerId) {
>>>>>> >     System.out.println("Timer fired. id: " + timerId);
>>>>>> >   }
>>>>>> > }
>>>>>> >
>>>>>> > This currently works for the Flink and the Dataflow runners.
>>>>>> >
>>>>>> > Thank you Rehman for getting this done! Beam users will find
>>>>>> > it very valuable.
>>>>>> >
>>>>>> > Reuven
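
For anyone skimming the quoted messages for the runner-side change: the point about including the timer family id in the map keys can be sketched roughly as below. This is only an illustration under my own assumptions. `FamilyScopedTimers` and `TimerKey` are hypothetical names, not classes from Beam or from any runner, and a real runner would keep this state checkpointed rather than in a plain `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration only; not Beam's TimerInternals or any runner's code. */
public class FamilyScopedTimers {

  /** Composite map key: a timer is identified by its family AND its id. */
  static final class TimerKey {
    final String familyId;
    final String timerId;

    TimerKey(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof TimerKey)) return false;
      TimerKey other = (TimerKey) o;
      return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(familyId, timerId);
    }
  }

  // Pending timers: (family, id) -> firing timestamp in milliseconds.
  private final Map<TimerKey, Long> pending = new HashMap<>();

  /** Setting a timer again with the same family and id overwrites the earlier one. */
  public void set(String familyId, String timerId, long fireAtMillis) {
    pending.put(new TimerKey(familyId, timerId), fireAtMillis);
  }

  /** Returns the firing time, or null if no such timer is pending. */
  public Long get(String familyId, String timerId) {
    return pending.get(new TimerKey(familyId, timerId));
  }

  /** Number of distinct pending timers. */
  public int size() {
    return pending.size();
  }

  public static void main(String[] args) {
    FamilyScopedTimers timers = new FamilyScopedTimers();
    timers.set("familyA", "mainTimer", 100L);
    timers.set("familyA", "mainTimer", 200L); // same family + id: overwrites
    timers.set("familyB", "mainTimer", 300L); // same id, other family: coexists
    System.out.println(timers.size());                      // prints 2
    System.out.println(timers.get("familyA", "mainTimer")); // prints 200
    System.out.println(timers.get("familyB", "mainTimer")); // prints 300
  }
}
```

If the key were the timer id alone, the third `set` call would silently replace the first family's timer, which is the kind of collision the family-scoped key avoids.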
