Thanks for finding this. Hopefully the bug is easy to fix. The tests
indeed never ran on any runner except for the DirectRunner, which is
something I should've noticed in the code review.

Reuven

On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]> wrote:

> I had a discussion with Rehman last week and we discovered that the
> TimerMap-related tests were not running for all runners because they were
> not tagged as part of the ValidatesRunner category. I opened a PR [1] to
> enable this, so could someone please help with the review/merge?
>
> Out of curiosity I took a look and found that they pass only on the
> Direct runner and on the classic Flink runner in batch mode. They do not
> pass on Dataflow [2][3] or on the portable Flink runner, so it is probably
> worth reopening the issue to investigate and fix.
>
> [1] https://github.com/apache/beam/pull/10747
> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>
>
> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>
>> Yes. For now we exclude the flink runner, but fixing this should be
>> fairly trivial.
>>
>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]>
>> wrote:
>>
>>> The Flink Runner used to allow a timer to be set multiple times before we
>>> made it comply with the Beam semantics of overwriting past invocations.
>>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>>> and Spark themselves allow a timer to be set multiple times. In order
>>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>>> map which sits outside of its builtin TimerService.
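The overwrite behaviour described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink or Beam code: `DuplicatingTimerService` and `OverwritingTimers` are hypothetical names, the service stands in for an engine that accepts duplicate registrations, and the `HashMap` stands in for what would have to be checkpointed state in a real runner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an engine timer service that permits duplicate registrations. */
final class DuplicatingTimerService {
  final List<Long> registered = new ArrayList<>();

  void register(long fireAtMillis) {
    registered.add(fireAtMillis);
  }

  void delete(long fireAtMillis) {
    // Removes a single matching registration, if one exists.
    registered.remove(Long.valueOf(fireAtMillis));
  }
}

/** Hypothetical wrapper that adds overwrite-on-set semantics on top of that service. */
final class OverwritingTimers {
  private final DuplicatingTimerService service;
  // Assumption: a real runner would keep this map in checkpointed state.
  private final Map<String, Long> pendingById = new HashMap<>();

  OverwritingTimers(DuplicatingTimerService service) {
    this.service = service;
  }

  void set(String timerId, long fireAtMillis) {
    Long previous = pendingById.put(timerId, fireAtMillis);
    if (previous != null) {
      service.delete(previous); // drop the registration that is being overwritten
    }
    service.register(fireAtMillis);
  }

  public static void main(String[] args) {
    DuplicatingTimerService service = new DuplicatingTimerService();
    OverwritingTimers timers = new OverwritingTimers(service);
    timers.set("t", 100L);
    timers.set("t", 200L); // same id: replaces the earlier timer
    System.out.println(service.registered); // prints [200]
  }
}
```

Without the wrapper, both registrations would stay in the service and the timer would fire twice.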
>>>
>>> As far as I can see, multiple timer families are currently not supported
>>> in the Flink Runner due to the map not taking the family name into
>>> account. This can be easily fixed though.
>>>
>>> -Max
>>>
>>> On 24.01.20 21:31, Reuven Lax wrote:
>>> > The new timer family is in the portability protos. I think TimerReceiver
>>> > needs to be updated to set it though (I think a 1-line change).
>>> >
>>> > The TimerInternals class that runners implement today already handles
>>> > dynamic timers, so most of the work was in the Beam SDK to provide an
>>> > API that allows users to access this feature.
>>> >
>>> > The main work needed in the runner was to take the timer family into
>>> > account. Beam semantics say that if a timer is set twice with the same
>>> > id, then the second timer overwrites the first. Several runners
>>> > therefore had maps from timer id -> timer. However, since the timer
>>> > family scopes the timers, we now allow two timers with the same id as
>>> > long as the timer families are different. Runners had to be updated
>>> > to include the timer family id in the map keys.
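As a rough illustration of the keying change, here is a minimal plain-Java sketch. It is not taken from any runner: `FamilyScopedTimers` and its `Key` class are hypothetical names, and timers are reduced to a firing time in milliseconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch, not Beam runner code: pending timers keyed by (family id, timer id). */
final class FamilyScopedTimers {
  /** Composite key; equality covers both the timer family id and the timer id. */
  static final class Key {
    final String familyId;
    final String timerId;

    Key(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key other = (Key) o;
      return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(familyId, timerId);
    }
  }

  private final Map<Key, Long> pending = new HashMap<>();

  /** Setting a timer replaces any earlier timer with the same family and id. */
  void set(String familyId, String timerId, long fireAtMillis) {
    pending.put(new Key(familyId, timerId), fireAtMillis);
  }

  /** Returns the pending firing time, or null if no such timer is set. */
  Long get(String familyId, String timerId) {
    return pending.get(new Key(familyId, timerId));
  }

  int size() {
    return pending.size();
  }

  public static void main(String[] args) {
    FamilyScopedTimers timers = new FamilyScopedTimers();
    timers.set("familyA", "t1", 100L);
    timers.set("familyA", "t1", 200L); // same family and id: overwrites
    timers.set("familyB", "t1", 300L); // same id, other family: separate timer
    System.out.println(timers.size());               // prints 2
    System.out.println(timers.get("familyA", "t1")); // prints 200
    System.out.println(timers.get("familyB", "t1")); // prints 300
  }
}
```

With a map keyed by timer id alone, the third call would have overwritten the familyA timer.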
>>> >
>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>>> > if this means that the Spark runner was incorrectly implementing the
>>> > Beam semantics before, and setTimer was not overwriting timers with
>>> > the same id?
>>> >
>>> > Reuven
>>> >
>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>> >
>>> >     This looks great, thanks for the contribution Rehman!
>>> >
>>> >     I have some questions (note I have not looked at the code at all).
>>> >
>>> >     - Is this working for both portable and non portable runners?
>>> >     - What do other runners need to implement to support this (e.g. Spark)?
>>> >
>>> >     It may be worth adding this to the website Compatibility Matrix.
>>> >
>>> >     Regards,
>>> >     Ismaël
>>> >
>>> >
>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>> >     <[email protected]> wrote:
>>> >
>>> >         Thank you Reuven for the guidance throughout the development
>>> >         process. I am delighted to contribute my two cents to the Beam
>>> >         project.
>>> >
>>> >         Looking forward to more active contributions.
>>> >
>>> >         Thanks & Regards
>>> >
>>> >         Rehman Murad Ali
>>> >         Software Engineer
>>> >         Mobile: +92 3452076766
>>> >         Skype: rehman.muradali
>>> >
>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]>
>>> >         wrote:
>>> >
>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>> >             dynamic timers. As a reminder, this was discussed on the
>>> >             dev list some time back.
>>> >
>>> >             As background, previously you had to statically declare all
>>> >             timers in your code. So if you wanted to have two timers,
>>> >             you needed to create two timer variables and two callbacks,
>>> >             one for each timer. A number of users kept hitting stumbling
>>> >             blocks where they needed a dynamic set of timers (often
>>> >             based on the element), which was not supported in Beam. The
>>> >             workarounds were quite ugly and complicated.
>>> >
>>> >             The new support allows declaring a TimerMap, which is a map
>>> >             of timers. Each TimerMap is scoped by a family name, so you
>>> >             can create multiple TimerMaps each with its own callback.
>>> >             Usage looks as follows:
>>> >
>>> >             class MyDoFn extends DoFn<...> {
>>> >                 // Declares a family of dynamically named event-time timers.
>>> >                 @TimerFamily("timers")
>>> >                 private final TimerSpec timerMap =
>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>> >
>>> >                 @ProcessElement
>>> >                 public void process(
>>> >                         @TimerFamily("timers") TimerMap timers,
>>> >                         @Element Type e,
>>> >                         @Timestamp Instant timestamp) {
>>> >                     timers.set("mainTimer", timestamp);
>>> >                     // Timer ids can be computed from the element.
>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>> >                 }
>>> >
>>> >                 // Called for every timer that fires in the "timers" family.
>>> >                 @OnTimerFamily("timers")
>>> >                 public void onTimer(@TimerId String timerId) {
>>> >                     System.out.println("Timer fired. id: " + timerId);
>>> >                 }
>>> >             }
>>> >
>>> >             This currently works for the Flink and the Dataflow runners.
>>> >
>>> >             Thank you Rehman for getting this done! Beam users will find
>>> >             it very valuable.
>>> >
>>> >             Reuven
>>> >
>>>
>>
