I had a discussion with Rehman last week and we discovered that the
TimerMap-related tests were not running for all runners because they were
not tagged as part of the ValidatesRunner category. I opened a PR [1] to
enable this, so could someone please help me with the review/merge?

I took a look out of curiosity and discovered that they only pass for the
Direct runner and for the classic Flink runner in batch mode. They do not
pass for Dataflow [2][3] or for the portable Flink runner, so it is probably
worth reopening the issue to investigate and fix this.

[1] https://github.com/apache/beam/pull/10747
[2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
[3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
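
For anyone picking up the investigation, here is a minimal sketch of the
semantics these tests exercise, as described further down the thread: setting
a timer twice with the same id overwrites the first one, but only within the
same timer family. This is plain Java and not code from any runner; the class
TimerOverwriteSketch, the TimerKey type and the method names are all made up
for illustration, and a real runner would keep this map in checkpointed state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration only; not taken from any Beam runner.
public class TimerOverwriteSketch {

    // Composite key: a timer is identified by its family AND its id.
    static final class TimerKey {
        private final String familyId;
        private final String timerId;

        TimerKey(String familyId, String timerId) {
            this.familyId = Objects.requireNonNull(familyId);
            this.timerId = Objects.requireNonNull(timerId);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TimerKey)) {
                return false;
            }
            TimerKey other = (TimerKey) o;
            return familyId.equals(other.familyId) && timerId.equals(other.timerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(familyId, timerId);
        }
    }

    // Pending timers, keyed by (family, id). In this sketch the value is
    // simply the firing time in milliseconds.
    private final Map<TimerKey, Long> pending = new HashMap<>();

    // Setting a timer replaces any earlier timer with the same family and id.
    void setTimer(String familyId, String timerId, long fireAtMillis) {
        pending.put(new TimerKey(familyId, timerId), fireAtMillis);
    }

    // Returns the pending firing time, or null if no such timer is set.
    Long fireTime(String familyId, String timerId) {
        return pending.get(new TimerKey(familyId, timerId));
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TimerOverwriteSketch timers = new TimerOverwriteSketch();
        timers.setTimer("familyA", "mainTimer", 1000L);
        timers.setTimer("familyA", "mainTimer", 2000L); // overwrites the first
        timers.setTimer("familyB", "mainTimer", 3000L); // different family, kept
        System.out.println("pending timers: " + timers.pendingCount());
        System.out.println("familyA/mainTimer fires at: "
            + timers.fireTime("familyA", "mainTimer"));
    }
}
```

If the map were keyed by timer id alone, the "familyB" timer above would
silently replace the "familyA" one, which is the kind of bug the family-aware
key is meant to avoid.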


On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:

> Yes. For now we exclude the flink runner, but fixing this should be fairly
> trivial.
>
> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote:
>
>> The Flink Runner used to allow a timer to be set multiple times before we
>> made it comply with the Beam semantics of overwriting past invocations.
>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>> and Spark themselves allow a timer to be set multiple times. In order
>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>> map which sits outside of its built-in TimerService.
>>
>> As far as I can see, multiple timer families are currently not supported
>> in the Flink Runner due to the map not taking the family name into
>> account. This can be easily fixed though.
>>
>> -Max
>>
>> On 24.01.20 21:31, Reuven Lax wrote:
>> > The new timer family is in the portability protos. I think
>> > TimerReceiver needs to be updated to set it though (I think a
>> > 1-line change).
>> >
>> > The TimerInternals class that runners implement today already handles
>> > dynamic timers, so most of the work was in the Beam SDK to provide an
>> > API that allows users to access this feature.
>> >
>> > The main work needed in the runner was to take into account the timer
>> > family. Beam semantics say that if a timer is set twice with the same
>> > id, then the second timer overwrites the first. Several runners
>> > therefore had maps from timer id -> timer. However, since the
>> > timer family scopes the timers, we now allow two timers with the same
>> > id as long as the timer families are different. Runners had to be
>> > updated to include the timer family id in the map keys.
>> >
>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>> > if this means that the Spark runner was incorrectly implementing the
>> > Beam semantics before, and setTimer was not overwriting timers with the
>> > same id?
>> >
>> > Reuven
>> >
>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>> >
>> >     This looks great, thanks for the contribution Rehman!
>> >
>> >     I have some questions (note I have not looked at the code at all).
>> >
>> >     - Is this working for both portable and non-portable runners?
>> >     - What do other runners (e.g. Spark) need to implement to support this?
>> >
>> >     It may be worth adding this to the website's Compatibility Matrix.
>> >
>> >     Regards,
>> >     Ismaël
>> >
>> >
>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>> >     <[email protected]> wrote:
>> >
>> >         Thank you Reuven for the guidance throughout the development
>> >         process. I am delighted to contribute my two cents to the Beam
>> >         project.
>> >
>> >         Looking forward to more active contributions.
>> >
>> >         Thanks & Regards,
>> >
>> >         Rehman Murad Ali
>> >         Software Engineer
>> >         Mobile: +92 3452076766
>> >         Skype: rehman.muradali
>> >
>> >
>> >
>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]>
>> >         wrote:
>> >
>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>> >             dynamic timers. As a reminder, this was discussed on the dev
>> >             list some time back.
>> >
>> >             As background, previously you had to statically declare all
>> >             timers in your code. So if you wanted to have two timers,
>> >             you needed to create two timer variables and two callbacks -
>> >             one for each timer. A number of users kept hitting stumbling
>> >             blocks where they needed a dynamic set of timers (often
>> >             based on the element), which was not supported in Beam. The
>> >             workarounds were quite ugly and complicated.
>> >
>> >             The new support allows declaring a TimerMap, which is a map
>> >             of timers. Each TimerMap is scoped by a family name, so you
>> >             can create multiple TimerMaps each with its own callback.
>> >             The use looks as follows:
>> >
>> >             class MyDoFn extends DoFn<...> {
>> >                 // Declares a family of dynamic timers named "timers".
>> >                 @TimerFamily("timers")
>> >                 private final TimerSpec timerMap =
>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>> >
>> >                 @ProcessElement
>> >                 public void process(
>> >                         @TimerFamily("timers") TimerMap timers,
>> >                         @Element Type e) {
>> >                     timers.set("mainTimer", timestamp);
>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>> >                 }
>> >
>> >                 // One callback for every timer in the "timers" family.
>> >                 @OnTimerFamily("timers")
>> >                 public void onTimer(@TimerId String timerId) {
>> >                     System.out.println("Timer fired. id: " + timerId);
>> >                 }
>> >             }
>> >
>> >             This currently works for the Flink and the Dataflow runners.
>> >
>> >             Thank you Rehman for getting this done! Beam users will find
>> >             it very valuable.
>> >
>> >             Reuven
>> >
>>
>
