Exactly: since the new tests use unbounded PCollections, we have to add the UsesUnboundedPCollections category. Also, the Flink runner was not excluding this category for the batch (bounded) tests. I opened this PR to fix it, PTAL Reuven: https://github.com/apache/beam/pull/10871

On Fri, Feb 14, 2020 at 8:52 PM Reuven Lax wrote:

This is running as part of the validatesRunnerBatch test, but it is executing a streaming test. Maybe that's why it's failing?

On Fri, Feb 14, 2020 at 9:42 AM Reuven Lax wrote:

Ismael,

As part of that fix I added some new tests to make sure these tests run on both batch and streaming runners, as I realized that the test was previously running only on batch runners. I did this by explicitly setting the isBounded attribute on the output of the Create transform. Somehow these new tests are making the Flink runner unhappy.

I'm not sure why explicitly setting the PCollection to be unbounded breaks the Flink runner. We can exclude the Flink runner from these tests for now, but maybe Max has an idea.

Also Ismael, what makes this a batch mode test? When I look at the failing stack trace, the failure is in FlinkStreamingPipelineTranslator.

Reuven

On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía wrote:

Apparently the Dynamic Timers fix for Dataflow broke the Flink ValidatesRunner tests in batch mode, which were passing before. Can you please take a look, Reuven or Rehman? The tests have been failing since the exact commit of the fix: 7719708a04d5d0eff3048dbd58ac1337889f8ba5
For details on the exception:
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/

On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía wrote:

Great to hear you got it working on Dataflow so easily, Reuven. As a new feature it looks great!

I agree with Kenn: it may be worth opening a new thread to discuss the changes still needed to support this in portable runners.

On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles wrote:

I think the (lack of) portability bit may have been buried in this thread. Maybe a new thread about the design for that?

Kenn

On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax wrote:

FYI, this is now fixed for Dataflow. I also added better rejection so that runners that don't support this feature will reject the pipeline.

On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax wrote:

I took a look, and I think this was a simple bug. Testing a fix now.

A larger question is how to support this in the portability layer. Right now portability assumes that each timer id corresponds to a logical input PCollection. That assumption no longer holds now that we support a dynamic set of timers, each with its own id. We could instead model each timer family as a PCollection. However, the FnApiRunner would then need to get the timer id dynamically in order to invoke the timer, whereas today it reads the timer id statically from the PCollection name.

Reuven

On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax wrote:

Thanks for finding this. Hopefully the bug is easy to fix. The tests indeed never ran on any runner except the DirectRunner, which is something I should have noticed in the code review.

Reuven

On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía wrote:

I had a discussion with Rehman last week and we discovered that the TimersMap-related tests were not running for all runners because they were not tagged as part of the ValidatesRunner category.
I opened a PR [1] to enable this, so could someone please help with the review/merge?

Out of curiosity I took a look and discovered that they are only passing for the Direct runner and for the classic Flink runner in batch mode. They are not passing for Dataflow [2][3] or for the portable Flink runner, so it is probably worth reopening the issue to investigate/fix.

[1] https://github.com/apache/beam/pull/10747
[2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
[3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/

On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax wrote:

Yes. For now we exclude the Flink runner, but fixing this should be fairly trivial.

On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels wrote:

The Flink Runner allowed a timer to be set multiple times before we made it comply with the Beam semantics of overwriting past invocations. I wouldn't be surprised if the Spark Runner never addressed this. Flink and Spark themselves allow a timer to be set multiple times. In order to fix this for Beam, the Flink Runner has to maintain a checkpointed map which sits outside of its built-in TimerService.

As far as I can see, multiple timer families are currently not supported in the Flink Runner because the map does not take the family name into account. This can be easily fixed, though.
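The keying change described here can be illustrated with a small, stdlib-only Java sketch. Max notes that the Flink map ignores the family name, and Reuven explains that runners had to add the family id to their map keys. The sketch is hypothetical and is not the Flink or Spark runner code. The names `TimerKey` and `PendingTimers` are invented for illustration, and event time is simplified to `long` milliseconds. Keying pending timers by (family, id) rather than by id alone preserves Beam's rule that a second set with the same id overwrites the first, while still letting two families reuse the same id.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key: the timer family plus the timer id.
final class TimerKey {
  final String familyId;
  final String timerId;

  TimerKey(String familyId, String timerId) {
    this.familyId = familyId;
    this.timerId = timerId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TimerKey)) {
      return false;
    }
    TimerKey other = (TimerKey) o;
    return familyId.equals(other.familyId) && timerId.equals(other.timerId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(familyId, timerId);
  }
}

// Hypothetical runner-side table of pending timers for one user key.
// A real runner would also need to checkpoint this state; this sketch
// keeps it in memory only.
final class PendingTimers {
  private final Map<TimerKey, Long> fireTimes = new HashMap<>();

  // Setting an existing (family, id) pair replaces its firing time,
  // matching the "second set overwrites the first" semantics.
  void set(String familyId, String timerId, long fireTimeMillis) {
    fireTimes.put(new TimerKey(familyId, timerId), fireTimeMillis);
  }

  // Returns the pending firing time, or null if no such timer is set.
  Long get(String familyId, String timerId) {
    return fireTimes.get(new TimerKey(familyId, timerId));
  }

  int size() {
    return fireTimes.size();
  }
}
```

Suppose you set ("timers", "mainTimer") to 100 and then to 200. That leaves a single entry with time 200. Additionally setting ("otherFamily", "mainTimer") adds a second, independent entry. A map keyed only by timer id would have merged those two families into one timer, which is the gap described above.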

-Max

On 24.01.20 21:31, Reuven Lax wrote:

The new timer family is in the portability protos. I think TimerReceiver needs to be updated to set it, though (I think it is a one-line change).

The TimerInternals class that runners implement today already handles dynamic timers, so most of the work was in the Beam SDK to provide an API that lets users access this feature.

The main work needed in the runners was to take the timer family into account. Beam semantics say that if a timer is set twice with the same id, the second timer overwrites the first. Several runners therefore had maps from timer id -> timer. However, since the timer family scopes the timers, we now allow two timers with the same id as long as their timer families are different. Runners had to be updated to include the timer family id in the map keys.

Surprisingly, the new TimerMap tests seem to pass on Spark ValidatesRunner, even though the Spark runner wasn't updated! I wonder if this means that the Spark runner was incorrectly implementing the Beam semantics before, and setTimer was not overwriting timers with the same id?

Reuven

On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía wrote:

This looks great, thanks for the contribution, Rehman!

I have some questions (note I have not looked at the code at all).

- Is this working for both portable and non-portable runners?
- What do other runners need to implement to support this (e.g. Spark)?

It may be worth adding this to the Compatibility Matrix on the website.

Regards,
Ismaël

On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali wrote:

Thank you Reuven for the guidance throughout the development process. I am delighted to contribute my two cents to the Beam project.

Looking forward to more active contributions.

Thanks & Regards,
Rehman Murad Ali
Software Engineer

On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax wrote:

Thanks to a lot of hard work by Rehman, Beam now supports dynamic timers. As a reminder, this was discussed on the dev list some time back.

As background, previously you had to statically declare all timers in your code. So if you wanted two timers, you needed to create two timer variables and two callbacks, one for each timer.
A number of users kept hitting stumbling blocks where they needed a dynamic set of timers (often based on the element), which was not supported in Beam. The workarounds were quite ugly and complicated.

The new support allows declaring a TimerMap, which is a map of timers. Each TimerMap is scoped by a family name, so you can create multiple TimerMaps, each with its own callback. Usage looks as follows:

    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.TimerMap;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Instant;

    // Timers require keyed (KV) input. MyEvent is a placeholder element
    // type that exposes getActionType().
    class MyDoFn extends DoFn<KV<String, MyEvent>, String> {
      @TimerFamily("timers")
      private final TimerSpec timerMap = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          @TimerFamily("timers") TimerMap timers,
          @Element KV<String, MyEvent> e,
          @Timestamp Instant timestamp) {
        // Timer ids are plain strings chosen at runtime, e.g. per element.
        timers.set("mainTimer", timestamp);
        timers.set("actionType" + e.getValue().getActionType(), timestamp);
      }

      // One callback for the whole family; it receives the id of the timer that fired.
      @OnTimerFamily("timers")
      public void onTimer(@TimerId String timerId) {
        System.out.println("Timer fired. id: " + timerId);
      }
    }

This currently works for the Flink and Dataflow runners.

Thank you Rehman for getting this done! Beam users will find it very valuable.

Reuven
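The user-facing TimerMap semantics from Reuven's announcement can be modeled without a Beam dependency. Timer ids are chosen at runtime, re-setting an id overwrites it, and one callback per family receives the id. The following stdlib-only Java sketch is a toy for a single key and a single family, not Beam's implementation. `ToyTimerFamily` and `advanceWatermarkTo` are invented names, and event time is simplified to `long` milliseconds.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model (not Beam code) of one event-time timer family for one key.
final class ToyTimerFamily {
  private final Map<String, Long> pending = new HashMap<>();
  private final Consumer<String> onTimer;

  // The single callback shared by every timer in the family; it receives
  // the timer id, loosely like an @OnTimerFamily method receiving @TimerId.
  ToyTimerFamily(Consumer<String> onTimer) {
    this.onTimer = onTimer;
  }

  // Loosely analogous to TimerMap.set(timerId, timestamp): ids are
  // arbitrary runtime strings, and re-setting an id overwrites its time.
  void set(String timerId, long eventTimeMillis) {
    pending.put(timerId, eventTimeMillis);
  }

  // Fires every timer at or before the watermark, earliest first, with
  // ties broken by id so the firing order is deterministic.
  void advanceWatermarkTo(long watermarkMillis) {
    List<Map.Entry<String, Long>> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : pending.entrySet()) {
      if (e.getValue() <= watermarkMillis) {
        // Copy the entry so later map updates cannot affect it.
        due.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
      }
    }
    due.sort(
        Map.Entry.<String, Long>comparingByValue()
            .thenComparing(Map.Entry.<String, Long>comparingByKey()));
    for (Map.Entry<String, Long> e : due) {
      // Fire only if an earlier callback did not re-set this timer.
      if (pending.remove(e.getKey(), e.getValue())) {
        onTimer.accept(e.getKey());
      }
    }
  }
}
```

As an example, use a callback that appends fired ids to a list. Calling set("mainTimer", 10), set("actionTypeclick", 5), set("mainTimer", 20), and then advanceWatermarkTo(15) fires only "actionTypeclick", because the second set moved "mainTimer" to 20. A later advanceWatermarkTo(25) then fires "mainTimer". A real runner would additionally scope this table per key and per family, as discussed in the thread.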
