FYI, this is now fixed for Dataflow. I also added better rejection so that
runners that don't support this feature will reject the pipeline.

On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:

> I took a look, and I think this was a simple bug. Testing a fix now.
>
> A larger question is how to support this in the portability layer. Right
> now portability assumes that each timer id corresponds to a logical input
> PCollection, but that assumption no longer works as we now support a
> dynamic set of timers, each with their own id. We could instead model each
> timer family as a PCollection, but the FnApiRunner would need to
> dynamically get the timer id in order to invoke it, and today it statically
> reads the timer id from the PCollection name.
>
> Reuven
>
> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>
>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>> indeed never ran on any runner except for the DirectRunner, which is
>> something I should've noticed in the code review.
>>
>> Reuven
>>
>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> I had a discussion with Rehman last week and we discovered that the
>>> TimerMap-related tests were not running for all runners because they
>>> were not tagged as part of the ValidatesRunner category. I opened a
>>> PR [1] to enable this, so could someone please help me with the
>>> review/merge?
>>>
>>> I took a look out of curiosity and discovered that they are only
>>> passing for the Direct runner and for the classic Flink runner in
>>> batch mode. They are not passing for Dataflow [2][3] or for the
>>> portable Flink runner, so it is probably worth reopening the issue
>>> to investigate and fix.
>>>
>>> [1] https://github.com/apache/beam/pull/10747
>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>
>>>
>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> Yes. For now we exclude the flink runner, but fixing this should be
>>>> fairly trivial.
>>>>
>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]>
>>>> wrote:
>>>>
>>>>> The Flink Runner used to allow a timer to be set multiple times, until
>>>>> we made it comply with the Beam semantics of overwriting past
>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>> addressed this. Flink and Spark themselves allow a timer to be set
>>>>> multiple times. To fix this for Beam, the Flink Runner has to maintain
>>>>> a checkpointed map which sits outside of its builtin TimerService.
>>>>>
>>>>> As far as I can see, multiple timer families are currently not
>>>>> supported in the Flink Runner because the map does not take the family
>>>>> name into account. This can be easily fixed though.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>> > The new timer family is in the portability protos. I think
>>>>> > TimerReceiver needs to be updated to set it though (I think a 1-line
>>>>> > change).
>>>>> >
>>>>> > The TimerInternals class that runners implement today already handles
>>>>> > dynamic timers, so most of the work was in the Beam SDK to provide an
>>>>> > API that allows users to access this feature.
>>>>> >
>>>>> > The main work needed in the runner was to take into account the timer
>>>>> > family. Beam semantics say that if a timer is set twice with the same
>>>>> > id, then the second timer overwrites the first. Several runners
>>>>> > therefore had maps from timer id -> timer. However, since the timer
>>>>> > family scopes the timers, we now allow two timers with the same id as
>>>>> > long as the timer families are different. Runners had to be updated
>>>>> > to include the timer family id in the map keys.
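
To make the map-key change described above concrete, here is a minimal sketch in plain Java. It is not the code of any actual runner: the class name FamilyScopedTimers, its nested Key class, and the use of a long fire time are all assumptions made for illustration. It only shows the idea of keying pending timers by (timer family id, timer id) so that a second set with the same pair overwrites the first, while the same timer id in a different family is kept separately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Illustrative only: a hypothetical stand-in for a runner's timer bookkeeping. */
final class FamilyScopedTimers {

    /** Composite map key: timer family id plus timer id (hypothetical helper). */
    static final class Key {
        final String familyId;
        final String timerId;

        Key(String familyId, String timerId) {
            this.familyId = familyId;
            this.timerId = timerId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return familyId.equals(other.familyId) && timerId.equals(other.timerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(familyId, timerId);
        }
    }

    // Pending timers: (family id, timer id) -> fire time in milliseconds.
    private final Map<Key, Long> fireTimes = new HashMap<>();

    /** Sets a timer; a later call with the same (family, id) replaces the earlier one. */
    void set(String familyId, String timerId, long fireAtMillis) {
        fireTimes.put(new Key(familyId, timerId), fireAtMillis);
    }

    /** Returns the pending fire time, or null if no such timer is set. */
    Long get(String familyId, String timerId) {
        return fireTimes.get(new Key(familyId, timerId));
    }

    /** Number of distinct pending timers. */
    int size() {
        return fireTimes.size();
    }
}
```

With a map keyed by timer id alone, setting timer "t" in two different families would leave a single entry, which is the collision the family-aware key avoids.
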
>>>>> >
>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>>>>> > if this means that the Spark runner was incorrectly implementing the
>>>>> > Beam semantics before, and setTimer was not overwriting timers with the
>>>>> > same id?
>>>>> >
>>>>> > Reuven
>>>>> >
>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>>>> >
>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>> >
>>>>> >     I have some questions (note I have not looked at the code at all).
>>>>> >
>>>>> >     - Is this working for both portable and non-portable runners?
>>>>> >     - What do other runners need to implement to support this (e.g. Spark)?
>>>>> >
>>>>> >     It may be worth adding this to the website Compatibility Matrix.
>>>>> >
>>>>> >     Regards,
>>>>> >     Ismaël
>>>>> >
>>>>> >
>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>> >     <[email protected]> wrote:
>>>>> >
>>>>> >         Thank you Reuven for the guidance throughout the development
>>>>> >         process. I am delighted to contribute my two cents to the Beam
>>>>> >         project.
>>>>> >
>>>>> >         Looking forward to more active contributions.
>>>>> >
>>>>> >         Thanks & Regards
>>>>> >
>>>>> >         Rehman Murad Ali
>>>>> >         Software Engineer
>>>>> >         Mobile: +92 3452076766
>>>>> >         Skype: rehman.muradali
>>>>> >
>>>>> >
>>>>> >
>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]>
>>>>> >         wrote:
>>>>> >
>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>> >             dynamic timers. As a reminder, this was discussed on the
>>>>> >             dev list some time back.
>>>>> >
>>>>> >             As background, previously you had to statically declare
>>>>> >             all timers in your code. So if you wanted to have two
>>>>> >             timers, you needed to create two timer variables and two
>>>>> >             callbacks, one for each timer. A number of users kept
>>>>> >             hitting stumbling blocks where they needed a dynamic set
>>>>> >             of timers (often based on the element), which was not
>>>>> >             supported in Beam. The workarounds were quite ugly and
>>>>> >             complicated.
>>>>> >
>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>> >             map of timers. Each TimerMap is scoped by a family name,
>>>>> >             so you can create multiple TimerMaps, each with its own
>>>>> >             callback. The usage looks as follows:
>>>>> >
>>>>> >             class MyDoFn extends DoFn<...> {
>>>>> >                 // Declares a family of timers under the name "timers".
>>>>> >                 @TimerFamily("timers")
>>>>> >                 private final TimerSpec timerMap =
>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>> >
>>>>> >                 @ProcessElement
>>>>> >                 public void process(
>>>>> >                         @TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>> >                     // Timer ids are chosen at runtime, e.g. from the element.
>>>>> >                     timers.set("mainTimer", timestamp);
>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>> >                 }
>>>>> >
>>>>> >                 // One callback for the whole family; the fired id is passed in.
>>>>> >                 @OnTimerFamily("timers")
>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>> >                 }
>>>>> >             }
>>>>> >
>>>>> >             This currently works for the Flink and the Dataflow
>>>>> >             runners.
>>>>> >
>>>>> >             Thank you Rehman for getting this done! Beam users will
>>>>> >             find it very valuable.
>>>>> >
>>>>> >             Reuven
>>>>> >
>>>>>
>>>>
