Thanks for figuring this out! I didn't know about the UsesUnboundedPCollections category.
On Fri, Feb 14, 2020 at 12:57 PM Ismaël Mejía <ieme...@gmail.com> wrote:

> Exactly. Since the new tests use unbounded PCollections, we have to add the UsesUnboundedPCollections category.
> Also, the Flink runner was not excluding this category for the batch (bounded) tests.
> I opened this PR to fix it. PTAL, Reuven:
> https://github.com/apache/beam/pull/10871
>
> On Fri, Feb 14, 2020 at 8:52 PM Reuven Lax <re...@google.com> wrote:
>
>> This is running as part of the validatesRunnerBatch test, but it is executing a streaming test. Maybe that's why it's failing?
>>
>> On Fri, Feb 14, 2020 at 9:42 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Ismael,
>>>
>>> As part of that fix I added some new tests to make sure these tests run on both batch and streaming runners, as I realized that the test was previously running only on batch runners. I did this by explicitly setting the isBounded attribute on the output of the Create transform. Somehow these new tests are making the Flink runner unhappy.
>>>
>>> I'm not sure why explicitly setting the PCollection to be unbounded breaks the Flink runner. We can try to exclude the Flink runner from these tests for now, but maybe Max has an idea.
>>>
>>> Also, Ismael, what makes this a batch mode test? When I look at the failing stack trace, the failure is in FlinkStreamingPipelineTranslator.
>>>
>>> Reuven
>>>
>>> On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>
>>>> Apparently the dynamic timers fix for Dataflow broke the Flink ValidatesRunner tests in batch mode, which were passing before. Can you please take a look, Reuven or Rehman?
>>>>
>>>> The tests have been failing since the exact commit of the fix:
>>>> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
>>>> For details on the exception:
>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>>>>
>>>> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>
>>>>> Great to know you got it working on Dataflow easily, Reuven. As a new feature it looks great!
>>>>>
>>>>> I agree with Kenn; it may be worth opening a new thread to discuss the changes still needed to support this in portable runners.
>>>>>
>>>>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>>> I think the (lack of) portability bit may have been buried in this thread. Maybe a new thread about the design for that?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> FYI, this is now fixed for Dataflow. I also added better rejection so that runners that don't support this feature will reject the pipeline.
>>>>>>>
>>>>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>>>>
>>>>>>>> A larger question is how to support this in the portability layer. Right now portability assumes that each timer id corresponds to a logical input PCollection, but that assumption no longer holds now that we support a dynamic set of timers, each with its own id. We could instead model each timer family as a PCollection, but the FnApiRunner would need to dynamically get the timer id in order to invoke it, and today it statically reads the timer id from the PCollection name.
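Reuven's alternative above, modelling each timer family as one PCollection and reading the timer id dynamically, can be illustrated with a small plain-Java sketch. Every name in it (`FiredTimer`, `TimerFamilyDispatcher`) is hypothetical: it does not reflect the Fn API protos or the FnApiRunner, and only models routing fired timers by family while taking the id from each fired record rather than from a PCollection name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical record on a per-family timer channel: the dynamic timer id travels with the data. */
final class FiredTimer {
  final String family;
  final String timerId;
  final long timestampMillis;

  FiredTimer(String family, String timerId, long timestampMillis) {
    this.family = family;
    this.timerId = timerId;
    this.timestampMillis = timestampMillis;
  }
}

/**
 * Hypothetical dispatcher: callbacks are registered per timer family (not per timer id),
 * so the ids never have to be known up front. Each fired record carries its own id.
 */
final class TimerFamilyDispatcher {
  private final Map<String, BiConsumer<String, Long>> callbacksByFamily = new HashMap<>();

  void register(String family, BiConsumer<String, Long> onTimerFamily) {
    callbacksByFamily.put(family, onTimerFamily);
  }

  void deliver(FiredTimer fired) {
    BiConsumer<String, Long> callback = callbacksByFamily.get(fired.family);
    if (callback == null) {
      // Unknown family: fail loudly instead of silently dropping the timer.
      throw new IllegalStateException("No callback for timer family " + fired.family);
    }
    // The id is read dynamically from the record, not from a static channel name.
    callback.accept(fired.timerId, fired.timestampMillis);
  }
}
```

The design point being illustrated: the set of families is fixed when the pipeline is built, so only families need a static channel, while the ids inside a family stay open-ended.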
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests indeed never ran on any runner except for the DirectRunner, which is something I should've noticed in the code review.
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I had a discussion with Rehman last week and we discovered that the TimerMap-related tests were not running for all runners because they were not tagged as part of the ValidatesRunner category. I opened a PR [1] to enable this, so could someone please help me with the review/merge?
>>>>>>>>>>
>>>>>>>>>> I took a look out of curiosity and discovered that they are only passing for the Direct runner and for the classic Flink runner in batch mode. They are not passing for Dataflow [2][3] or for the Portable Flink runner, so it is probably worth reopening the issue to investigate/fix.
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>>>>
>>>>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be fairly trivial.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The Flink Runner allowed a timer to be set multiple times before we made it comply with the Beam semantics of overwriting past invocations. I wouldn't be surprised if the Spark Runner never addressed this. Flink and Spark themselves allow a timer to be set multiple times. In order to fix this for Beam, the Flink Runner has to maintain a checkpointed map which sits outside of its built-in TimerService.
>>>>>>>>>>>>
>>>>>>>>>>>> As far as I can see, multiple timer families are currently not supported in the Flink Runner because the map does not take the family name into account. This can be easily fixed, though.
>>>>>>>>>>>>
>>>>>>>>>>>> -Max
>>>>>>>>>>>>
>>>>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>>>>> > The new timer family is in the portability protos. I think TimerReceiver needs to be updated to set it, though (I think a 1-line change).
>>>>>>>>>>>> >
>>>>>>>>>>>> > The TimerInternals class that runners implement today already handles dynamic timers, so most of the work was in the Beam SDK to provide an API that allows users to access this feature.
>>>>>>>>>>>> >
>>>>>>>>>>>> > The main work needed in the runner was to take the timer family into account. Beam semantics say that if a timer is set twice with the same id, then the second timer overwrites the first. Several runners therefore had maps from timer id -> timer.
>>>>>>>>>>>> > However, since the timer family scopes the timers, we now allow two timers with the same id as long as the timer families are different. Runners had to be updated to include the timer family id in the map keys.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark ValidatesRunner, even though the Spark runner wasn't updated! I wonder if this means that the Spark runner was incorrectly implementing the Beam semantics before, and setTimer was not overwriting timers with the same id?
>>>>>>>>>>>> >
>>>>>>>>>>>> > Reuven
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > This looks great, thanks for the contribution, Rehman!
>>>>>>>>>>>> >
>>>>>>>>>>>> > I have some questions (note I have not looked at the code at all).
>>>>>>>>>>>> >
>>>>>>>>>>>> > - Is this working for both portable and non-portable runners?
>>>>>>>>>>>> > - What do other runners need to implement to support this (e.g. Spark)?
>>>>>>>>>>>> >
>>>>>>>>>>>> > It may be worth adding this to the website's Compatibility Matrix.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Regards,
>>>>>>>>>>>> > Ismaël
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali <rehman.murad...@venturedive.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thank you, Reuven, for the guidance throughout the development process. I am delighted to contribute my two cents to the Beam project.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Looking forward to more active contributions.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks & Regards,
>>>>>>>>>>>> > Rehman Murad Ali
>>>>>>>>>>>> > Software Engineer
>>>>>>>>>>>> > Mobile: +92 3452076766
>>>>>>>>>>>> > Skype: rehman.muradali
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks to a lot of hard work by Rehman, Beam now supports dynamic timers. As a reminder, this was discussed on the dev list some time back.
>>>>>>>>>>>> >
>>>>>>>>>>>> > As background, previously you had to statically declare all timers in your code. So if you wanted to have two timers, you needed to create two timer variables and two callbacks, one for each timer. A number of users kept hitting stumbling blocks where they needed a dynamic set of timers (often based on the element), which was not supported in Beam. The workarounds were quite ugly and complicated.
>>>>>>>>>>>> >
>>>>>>>>>>>> > The new support allows declaring a TimerMap, which is a map of timers. Each TimerMap is scoped by a family name, so you can create multiple TimerMaps, each with its own callback.
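Earlier in the thread, Reuven explains that runners keyed pending timers by timer id and had to add the timer family to the key, and Max notes that Flink's own TimerService allows a timer to be set multiple times, so the runner keeps a separate map to enforce Beam's overwrite semantics. A minimal plain-Java sketch of that idea follows, assuming a single key and an in-memory map. `TimerKey`, `DuplicateAllowingTimerService`, and `OverwritingTimerLayer` are hypothetical names, not Beam or Flink classes; a real runner would also need to checkpoint the map, which is not modeled here.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical composite key: a timer is identified by (family, id), not by id alone. */
final class TimerKey {
  final String family;
  final String id;

  TimerKey(String family, String id) {
    this.family = family;
    this.id = id;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TimerKey)) {
      return false;
    }
    TimerKey other = (TimerKey) o;
    return family.equals(other.family) && id.equals(other.id);
  }

  @Override
  public int hashCode() {
    return Objects.hash(family, id);
  }
}

/** Toy engine timer service that keeps every registration, duplicates included. */
final class DuplicateAllowingTimerService {
  private final List<Map.Entry<TimerKey, Long>> registrations = new ArrayList<>();

  void register(TimerKey key, long timestampMillis) {
    registrations.add(new AbstractMap.SimpleEntry<>(key, timestampMillis));
  }

  void delete(TimerKey key, long timestampMillis) {
    registrations.remove(new AbstractMap.SimpleEntry<>(key, timestampMillis));
  }

  boolean contains(TimerKey key, long timestampMillis) {
    return registrations.contains(new AbstractMap.SimpleEntry<>(key, timestampMillis));
  }

  int size() {
    return registrations.size();
  }
}

/** Side map that enforces "the last set() for a (family, id) wins" on top of the engine. */
final class OverwritingTimerLayer {
  private final Map<TimerKey, Long> latest = new HashMap<>();
  private final DuplicateAllowingTimerService engine;

  OverwritingTimerLayer(DuplicateAllowingTimerService engine) {
    this.engine = engine;
  }

  void setTimer(String family, String id, long timestampMillis) {
    TimerKey key = new TimerKey(family, id);
    Long previous = latest.put(key, timestampMillis);
    if (previous != null) {
      // Same family and id: drop the stale registration before adding the new one.
      engine.delete(key, previous);
    }
    engine.register(key, timestampMillis);
  }
}
```

For example, calling `setTimer("timers", "mainTimer", 10L)`, then `setTimer("timers", "mainTimer", 20L)`, then `setTimer("other", "mainTimer", 5L)` leaves two registrations: the one at 20 in `timers` and the one at 5 in `other`. With a key of id alone, the third call would instead have replaced the `timers` entry.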
>>>>>>>>>>>> > The use looks as follows:
>>>>>>>>>>>> >
>>>>>>>>>>>> >   class MyDoFn extends DoFn<...> {
>>>>>>>>>>>> >     @TimerFamily("timers")
>>>>>>>>>>>> >     private final TimerSpec timerMap = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>>>>> >
>>>>>>>>>>>> >     @ProcessElement
>>>>>>>>>>>> >     public void process(
>>>>>>>>>>>> >         @TimerFamily("timers") TimerMap timers,
>>>>>>>>>>>> >         @Element Type e,
>>>>>>>>>>>> >         @Timestamp Instant timestamp) {
>>>>>>>>>>>> >       // Each string is a separate timer id within the "timers" family;
>>>>>>>>>>>> >       // setting an existing id again overwrites that timer.
>>>>>>>>>>>> >       timers.set("mainTimer", timestamp);
>>>>>>>>>>>> >       timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>>>>>> >     }
>>>>>>>>>>>> >
>>>>>>>>>>>> >     // A single callback for the whole family; the fired timer's id is passed in.
>>>>>>>>>>>> >     @OnTimerFamily("timers")
>>>>>>>>>>>> >     public void onTimer(@TimerId String timerId) {
>>>>>>>>>>>> >       System.out.println("Timer fired. id: " + timerId);
>>>>>>>>>>>> >     }
>>>>>>>>>>>> >   }
>>>>>>>>>>>> >
>>>>>>>>>>>> > This currently works for the Flink and the Dataflow runners.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thank you, Rehman, for getting this done! Beam users will find it very valuable.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Reuven
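The semantics the `MyDoFn` example relies on can be exercised without a runner. The following plain-Java sketch models one event-time timer family for a single key: `set` replaces any pending timer with the same id, and advancing the watermark fires the due timers, earliest first, through one callback that receives the timer id. `SimulatedTimerFamily`, `advanceWatermarkTo`, and `SimulatedTimerFamilyDemo` are hypothetical names; this is not how Beam's `TimerMap` is implemented, and breaking timestamp ties by id is an arbitrary choice made only to keep the firing order deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for one event-time timer family, for a single key. */
final class SimulatedTimerFamily {
  private final Map<String, Long> pending = new HashMap<>();
  private final Consumer<String> onTimerFamily;

  SimulatedTimerFamily(Consumer<String> onTimerFamily) {
    this.onTimerFamily = onTimerFamily;
  }

  /** Setting an id that is already pending replaces its timestamp. */
  void set(String timerId, long eventTimeMillis) {
    pending.put(timerId, eventTimeMillis);
  }

  /** Fires every pending timer at or before the watermark, earliest first (ties by id). */
  void advanceWatermarkTo(long watermarkMillis) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> entry : pending.entrySet()) {
      if (entry.getValue() <= watermarkMillis) {
        due.add(entry.getKey());
      }
    }
    due.sort((a, b) -> {
      int byTime = Long.compare(pending.get(a), pending.get(b));
      return byTime != 0 ? byTime : a.compareTo(b);
    });
    // Timers set from inside the callback are not handled specially in this sketch.
    for (String timerId : due) {
      pending.remove(timerId);
      onTimerFamily.accept(timerId);
    }
  }
}

final class SimulatedTimerFamilyDemo {
  public static void main(String[] args) {
    List<String> fired = new ArrayList<>();
    SimulatedTimerFamily timers = new SimulatedTimerFamily(fired::add);
    timers.set("mainTimer", 100L);
    timers.set("actionTypeCLICK", 50L);
    timers.set("mainTimer", 30L); // replaces the pending mainTimer at 100
    timers.advanceWatermarkTo(60L);
    System.out.println(fired); // prints [mainTimer, actionTypeCLICK]
  }
}
```

Because the second `set` for `mainTimer` replaced the first, `mainTimer` fires only once, at 30, ahead of `actionTypeCLICK` at 50.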