Apparently the Dynamic Timers fix for Dataflow broke the ValidatesRunner tests for Flink in batch mode, which were passing before. Reuven or Rehman, can you please take a look? The tests have been failing since the exact commit of the fix: 7719708a04d5d0eff3048dbd58ac1337889f8ba5

For details on the exception: https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/

On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ieme...@gmail.com> wrote:

> Great to know you got it working on Dataflow easily, Reuven. As a new
> feature it looks great!
>
> I agree with Kenn, it may be worth opening a new thread to discuss the
> changes still needed to support this in portable runners.
>
> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> I think the (lack of) portability bit may have been buried in this
>> thread. Maybe a new thread about the design for that?
>>
>> Kenn
>>
>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>
>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>> that runners that don't support this feature will reject the pipeline.
>>>
>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>
>>>> A larger question is how to support this in the portability layer.
>>>> Right now portability assumes that each timer id corresponds to a
>>>> logical input PCollection, but that assumption no longer works as we
>>>> now support a dynamic set of timers, each with its own id. We could
>>>> instead model each timer family as a PCollection, but the FnApiRunner
>>>> would need to dynamically get the timer id in order to invoke it, and
>>>> today it statically reads the timer id from the PCollection name.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>>> something I should've noticed in the code review.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>
>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>> TimersMap related tests were not running for all runners because
>>>>>> they were not tagged as part of the ValidatesRunner category. I
>>>>>> opened a PR [1] to enable this, so could someone please help me with
>>>>>> the review/merge?
>>>>>>
>>>>>> I took a look out of curiosity and discovered that they are only
>>>>>> passing for the Direct runner and for the classic Flink runner in
>>>>>> batch mode. They are not passing for Dataflow [2][3] or for the
>>>>>> Portable Flink runner, so it is probably worth reopening the issue
>>>>>> to investigate/fix.
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>
>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>>>>>> fairly trivial.
>>>>>>>
>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>
>>>>>>>> The Flink Runner allowed a timer to be set multiple times before
>>>>>>>> we made it comply with the Beam semantics of overwriting past
>>>>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>>>>> addressed this. Flink and Spark themselves allow a timer to be
>>>>>>>> set multiple times. In order to fix this for Beam, the Flink
>>>>>>>> Runner has to maintain a checkpointed map which sits outside of
>>>>>>>> its builtin TimerService.
>>>>>>>>
>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>> supported in the Flink Runner due to the map not taking the family
>>>>>>>> name into account. This can be easily fixed though.
>>>>>>>>
>>>>>>>> -Max
>>>>>>>>
>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>> > TimerReceiver needs to be updated to set it though (I think a
>>>>>>>> > 1-line change).
>>>>>>>> >
>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK
>>>>>>>> > to provide an API that allows users to access this feature.
>>>>>>>> >
>>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>>> > timer family. Beam semantics say that if a timer is set twice
>>>>>>>> > with the same id, then the second timer overwrites the first.
>>>>>>>> > Several runners therefore had maps from timer id -> timer.
>>>>>>>> > However since the timer family scopes the timers, we now allow
>>>>>>>> > two timers with the same id as long as the timer families are
>>>>>>>> > different. Runners had to be updated to include the timer family
>>>>>>>> > id in the map keys.
>>>>>>>> >
>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>>> > overwriting timers with the same id?
>>>>>>>> >
>>>>>>>> > Reuven
>>>>>>>> >
>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>> >
>>>>>>>> >     I have some questions (note I have not looked at the code at
>>>>>>>> >     all).
>>>>>>>> >
>>>>>>>> >     - Is this working for both portable and non portable runners?
>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>> >       (e.g. Spark)?
>>>>>>>> >
>>>>>>>> >     Maybe worth adding this to the website Compatibility Matrix.
>>>>>>>> >
>>>>>>>> >     Regards,
>>>>>>>> >     Ismaël
>>>>>>>> >
>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>> >     <rehman.murad...@venturedive.com> wrote:
>>>>>>>> >
>>>>>>>> >         Thank you Reuven for the guidance throughout the
>>>>>>>> >         development process. I am delighted to contribute my two
>>>>>>>> >         cents to the Beam project.
>>>>>>>> >
>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>> >
>>>>>>>> >         Thanks & Regards
>>>>>>>> >
>>>>>>>> >         Rehman Murad Ali
>>>>>>>> >         Software Engineer
>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>> >         Skype: rehman.muradali
>>>>>>>> >
>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>> >
>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>>>> >             supports dynamic timers. As a reminder, this was
>>>>>>>> >             discussed on the dev list some time back.
>>>>>>>> >
>>>>>>>> >             As background, previously one had to statically
>>>>>>>> >             declare all timers in your code. So if you wanted to
>>>>>>>> >             have two timers, you needed to create two timer
>>>>>>>> >             variables and two callbacks - one for each timer. A
>>>>>>>> >             number of users kept hitting stumbling blocks where
>>>>>>>> >             they needed a dynamic set of timers (often based on
>>>>>>>> >             the element), which was not supported in Beam.
>>>>>>>> >             The workarounds were quite ugly and complicated.
>>>>>>>> >
>>>>>>>> >             The new support allows declaring a TimerMap, which is
>>>>>>>> >             a map of timers. Each TimerMap is scoped by a family
>>>>>>>> >             name, so you can create multiple TimerMaps each with
>>>>>>>> >             its own callback. The use looks as follows:
>>>>>>>> >
>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>> >               @TimerFamily("timers")
>>>>>>>> >               private final TimerSpec timerMap =
>>>>>>>> >                   TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>> >
>>>>>>>> >               @ProcessElement
>>>>>>>> >               public void process(@TimerFamily("timers") TimerMap timers,
>>>>>>>> >                                   @Element Type e) {
>>>>>>>> >                 timers.set("mainTimer", timestamp);
>>>>>>>> >                 timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>> >               }
>>>>>>>> >
>>>>>>>> >               @OnTimerFamily("timers")
>>>>>>>> >               public void onTimer(@TimerId String timerId) {
>>>>>>>> >                 System.out.println("Timer fired. id: " + timerId);
>>>>>>>> >               }
>>>>>>>> >             }
>>>>>>>> >
>>>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>>>> >             runners.
>>>>>>>> >
>>>>>>>> >             Thank you Rehman for getting this done! Beam users
>>>>>>>> >             will find it very valuable.
>>>>>>>> >
>>>>>>>> >             Reuven
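
For anyone following the point quoted above about including the timer family id in the map keys, here is a minimal sketch of the idea. It is not the Flink runner's code or any Beam API: the class names TimerTable and Key are made up for illustration, and a plain in-memory HashMap stands in for whatever checkpointed state a real runner would use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration only; not taken from Beam or any runner. */
final class TimerTable {

  /** Composite key: a timer is identified by (family id, timer id), not by timer id alone. */
  static final class Key {
    final String familyId;
    final String timerId;

    Key(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(familyId, timerId);
    }
  }

  // Pending timers; in a real runner this state would have to be checkpointed.
  private final Map<Key, Long> pending = new HashMap<>();

  /** Setting a timer with the same family and id replaces the earlier timestamp. */
  void set(String familyId, String timerId, long timestampMillis) {
    pending.put(new Key(familyId, timerId), timestampMillis);
  }

  /** Returns the pending timestamp, or null if no such timer is set. */
  Long get(String familyId, String timerId) {
    return pending.get(new Key(familyId, timerId));
  }

  /** Number of distinct pending timers. */
  int size() {
    return pending.size();
  }
}
```

With this keying, setting ("familyA", "timer1") twice leaves a single entry holding the later timestamp, while ("familyB", "timer1") is tracked as a separate timer. A map keyed on the timer id alone would let the second family silently overwrite the first.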