Apparently the fix for dynamic timers on Dataflow broke the ValidatesRunner
tests for Flink in batch mode that were passing before.
Can you please take a look, Reuven or Rehman?
The tests have been failing since the exact commit of the fix:
7719708a04d5d0eff3048dbd58ac1337889f8ba5
For details on the exception, see:
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/


On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ieme...@gmail.com> wrote:

> Great to know you got it working on Dataflow easily, Reuven. As a new
> feature, it looks great!
>
> I agree with Kenn; it may be worth opening a new thread to discuss the
> changes still needed to support this in portable runners.
>
> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> I think the (lack of) portability bit may have been buried in this
>> thread. Maybe a new thread about the design for that?
>>
>> Kenn
>>
>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>
>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>> that runners that don't support this feature will reject the pipeline.
>>>
>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>
>>>> A larger question is how to support this in the portability layer.
>>>> Right now portability assumes that each timer id corresponds to a logical
>>>> input PCollection, but that assumption no longer works now that we support
>>>> a dynamic set of timers, each with its own id. We could instead model each
>>>> timer family as a PCollection, but the FnApiRunner would then need to get
>>>> the timer id dynamically in order to invoke it, whereas today it
>>>> statically reads the timer id from the PCollection name.
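To make the "one input per timer family" idea concrete, here is a minimal Java sketch of what dynamic id lookup could look like on the SDK side. `FiredTimer` and `TimerFamilyDispatcher` are hypothetical names invented for illustration; they are not Beam or Fn API types. The only point shown is that the callback is selected by family, while the timer id comes from the payload of each firing rather than from a PCollection name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical payload of a timer arriving on a per-family input. Since the set of
// ids is dynamic, the id travels inside the data instead of being derived from the
// PCollection name.
record FiredTimer(String family, String timerId, long timestamp) {}

// Hypothetical SDK-side dispatcher: one callback per timer family, with the id
// supplied dynamically on each firing.
class TimerFamilyDispatcher {
  private final Map<String, BiConsumer<String, Long>> callbacks = new HashMap<>();

  void registerFamily(String family, BiConsumer<String, Long> onTimer) {
    callbacks.put(family, onTimer);
  }

  void dispatch(FiredTimer timer) {
    BiConsumer<String, Long> callback = callbacks.get(timer.family());
    if (callback == null) {
      throw new IllegalStateException("No callback for timer family " + timer.family());
    }
    // The id is read from the payload at run time, not fixed when the input was wired up.
    callback.accept(timer.timerId(), timer.timestamp());
  }
}
```

Registering a single "timers" callback is enough to handle any number of ids (for example "mainTimer" and "actionTypeA"), which is the property a static PCollection-name lookup cannot provide.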
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>> indeed never ran on any runner except the DirectRunner, which is
>>>>> something I should've noticed in the code review.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>
>>>>>> I had a discussion with Rehman last week, and we discovered that the
>>>>>> TimerMap-related tests were not running for all runners because they
>>>>>> were not tagged as part of the ValidatesRunner category. I opened a
>>>>>> PR [1] to enable this, so could someone please help me with the
>>>>>> review/merge?
>>>>>>
>>>>>> Out of curiosity, I took a look and discovered that they only pass for
>>>>>> the Direct runner and for the classic Flink runner in batch mode. They
>>>>>> do not pass for Dataflow [2][3] or for the Portable Flink runner, so it
>>>>>> is probably worth reopening the issue to investigate/fix.
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>>>>>> fairly trivial.
>>>>>>>
>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>
>>>>>>>> The Flink Runner allowed a timer to be set multiple times before we
>>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>>>>> addressed this. Flink and Spark themselves allow a timer to be set
>>>>>>>> multiple times. In order to fix this for Beam, the Flink Runner has to
>>>>>>>> maintain a checkpointed map that sits outside of its built-in
>>>>>>>> TimerService.
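A minimal Java sketch of that approach, assuming a hypothetical engine timer service that, like the one described above, accepts duplicate registrations. A side map remembers the latest timestamp per timer id, and firings that no longer match it are treated as overwritten and skipped. The class names are illustrative only and do not reflect the Flink runner's actual code; a real runner would also have to checkpoint the map together with its operator state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for an engine timer service (Flink-like) that accepts
// duplicate registrations: every register() call schedules another firing.
class DuplicateAllowingTimerService {
  record Registration(String timerId, long timestamp) {}

  final PriorityQueue<Registration> queue =
      new PriorityQueue<>(Comparator.comparingLong(Registration::timestamp));

  void register(String timerId, long timestamp) {
    queue.add(new Registration(timerId, timestamp));
  }
}

// Beam's "last set wins" semantics layered on top: a side map records the latest
// timestamp per timer id, and registrations that no longer match it are skipped.
// A real runner would also need to checkpoint this map with its operator state.
class OverwritingTimers {
  private final DuplicateAllowingTimerService service = new DuplicateAllowingTimerService();
  private final Map<String, Long> latest = new HashMap<>();

  void setTimer(String timerId, long timestamp) {
    latest.put(timerId, timestamp); // overwrite any earlier setting of this id
    service.register(timerId, timestamp);
  }

  // Fires everything due at or before the watermark; returns "id@timestamp" for
  // each timer that actually fired.
  List<String> advanceTo(long watermark) {
    List<String> fired = new ArrayList<>();
    while (!service.queue.isEmpty() && service.queue.peek().timestamp() <= watermark) {
      DuplicateAllowingTimerService.Registration r = service.queue.poll();
      Long current = latest.get(r.timerId());
      if (current != null && current == r.timestamp()) {
        latest.remove(r.timerId()); // a timer fires once, then is cleared
        fired.add(r.timerId() + "@" + r.timestamp());
      }
      // Otherwise the registration was overwritten (or already fired): skip it.
    }
    return fired;
  }
}
```

For example, setting timer "a" at 10 and then at 20 leaves two registrations in the engine, but only the one at 20 fires.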
>>>>>>>>
>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>> supported in the Flink Runner because the map does not take the family
>>>>>>>> name into account. This can be easily fixed, though.
>>>>>>>>
>>>>>>>> -Max
>>>>>>>>
>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>> > TimerReceiver needs to be updated to set it, though (I think it is a
>>>>>>>> > one-line change).
>>>>>>>> >
>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK to
>>>>>>>> > provide an API that allows users to access this feature.
>>>>>>>> >
>>>>>>>> > The main work needed in the runners was to take the timer family
>>>>>>>> > into account. Beam semantics say that if a timer is set twice with
>>>>>>>> > the same id, the second timer overwrites the first. Several runners
>>>>>>>> > therefore had maps from timer id -> timer. However, since the timer
>>>>>>>> > family scopes the timers, we now allow two timers with the same id as
>>>>>>>> > long as their timer families are different. Runners had to be updated
>>>>>>>> > to include the timer family id in the map keys.
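The change to the map keys can be sketched as follows, assuming a hypothetical runner-side table; `TimerKey` and `FamilyScopedTimerTable` are illustrative names, not Beam classes. Keying by the (family, id) pair keeps "last set wins" within a family while letting two families reuse the same id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical composite key: a timer is identified by (family, id), not by id alone.
// Records provide value-based equals/hashCode, so they work as map keys.
record TimerKey(String family, String timerId) {}

// Sketch of a runner-side "last set wins" table scoped by timer family.
class FamilyScopedTimerTable {
  private final Map<TimerKey, Long> timers = new HashMap<>();

  // Same family and id: overwrite. Same id in a different family: a separate timer.
  void set(String family, String timerId, long timestamp) {
    timers.put(new TimerKey(family, timerId), timestamp);
  }

  // Returns the current timestamp for the timer, or null if it is not set.
  Long get(String family, String timerId) {
    return timers.get(new TimerKey(family, timerId));
  }

  int size() {
    return timers.size();
  }
}
```

Keying by timer id alone would collapse "mainTimer" in family "timers" and "mainTimer" in family "other" into a single entry, which is the problem the runners had to fix.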
>>>>>>>> >
>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>>> > overwriting timers with the same id?
>>>>>>>> >
>>>>>>>> > Reuven
>>>>>>>> >
>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>> >
>>>>>>>> >     I have some questions (note: I have not looked at the code at all).
>>>>>>>> >
>>>>>>>> >     - Does this work for both portable and non-portable runners?
>>>>>>>> >     - What do other runners (e.g. Spark) need to implement to support this?
>>>>>>>> >
>>>>>>>> >     It may be worth adding this to the Compatibility Matrix on the website.
>>>>>>>> >
>>>>>>>> >     Regards,
>>>>>>>> >     Ismaël
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>> >     <rehman.murad...@venturedive.com> wrote:
>>>>>>>> >
>>>>>>>> >         Thank you, Reuven, for the guidance throughout the
>>>>>>>> >         development process. I am delighted to contribute my two
>>>>>>>> >         cents to the Beam project.
>>>>>>>> >
>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>> >
>>>>>>>> >         *Thanks & Regards*
>>>>>>>> >
>>>>>>>> >         *Rehman Murad Ali*
>>>>>>>> >         Software Engineer
>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>> >         Skype: rehman.muradali
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>> >
>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>>>>> >             dynamic timers. As a reminder, this was discussed on the
>>>>>>>> >             dev list some time back.
>>>>>>>> >
>>>>>>>> >             As background, previously you had to statically declare
>>>>>>>> >             all timers in your code. So if you wanted to have two
>>>>>>>> >             timers, you needed to create two timer variables and two
>>>>>>>> >             callbacks, one for each timer. A number of users kept
>>>>>>>> >             hitting stumbling blocks where they needed a dynamic set
>>>>>>>> >             of timers (often based on the element), which was not
>>>>>>>> >             supported in Beam. The workarounds were quite ugly and
>>>>>>>> >             complicated.
>>>>>>>> >
>>>>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>>>>> >             map of timers. Each TimerMap is scoped by a family name,
>>>>>>>> >             so you can create multiple TimerMaps, each with its own
>>>>>>>> >             callback. Usage looks as follows:
>>>>>>>> >
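>>>>>>>> >             import org.apache.beam.sdk.state.TimeDomain;
>>>>>>>> >             import org.apache.beam.sdk.state.TimerMap;
>>>>>>>> >             import org.apache.beam.sdk.state.TimerSpec;
>>>>>>>> >             import org.apache.beam.sdk.state.TimerSpecs;
>>>>>>>> >             import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>> >             import org.apache.beam.sdk.values.KV;
>>>>>>>> >             import org.joda.time.Instant;
>>>>>>>> >
>>>>>>>> >             // Timers are per key, so the input must be keyed; here the
>>>>>>>> >             // element value is assumed to be the action type.
>>>>>>>> >             class MyDoFn extends DoFn<KV<String, String>, String> {
>>>>>>>> >                 // One timer family; timers inside it are created by id at runtime.
>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>> >
>>>>>>>> >                 @ProcessElement
>>>>>>>> >                 public void process(
>>>>>>>> >                     @TimerFamily("timers") TimerMap timers,
>>>>>>>> >                     @Element KV<String, String> e,
>>>>>>>> >                     @Timestamp Instant timestamp) {
>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>> >                     // Ids are computed per element; re-setting an id overwrites it.
>>>>>>>> >                     timers.set("actionType" + e.getValue(), timestamp);
>>>>>>>> >                 }
>>>>>>>> >
>>>>>>>> >                 // Invoked for every timer in the "timers" family.
>>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>>> >                 }
>>>>>>>> >             }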
>>>>>>>> >
>>>>>>>> >             This currently works for the Flink and Dataflow runners.
>>>>>>>> >
>>>>>>>> >             Thank you, Rehman, for getting this done! Beam users will
>>>>>>>> >             find it very valuable.
>>>>>>>> >
>>>>>>>> >             Reuven
>>>>>>>> >
>>>>>>>>
>>>>>>>
