The Flink Runner used to allow a timer to be set multiple times, until we
made it comply with the Beam semantics of overwriting earlier invocations.
I wouldn't be surprised if the Spark Runner never addressed this. Flink
and Spark themselves allow a timer to be set multiple times. To fix this
for Beam, the Flink Runner has to maintain a checkpointed map that sits
outside of its built-in TimerService.
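
To illustrate the idea, here is a minimal sketch of overwrite semantics layered on a timer service that keeps every registration. It is not the Flink Runner's actual code: MultiSetTimerService, OverwritingTimers and their methods are hypothetical names, and a plain HashMap stands in for the checkpointed map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an engine timer service that keeps every registration. */
class MultiSetTimerService {
  final List<Long> registered = new ArrayList<>();

  void register(long timestamp) {
    registered.add(timestamp);
  }

  void delete(long timestamp) {
    // Removes a single matching registration, if one exists.
    registered.remove(Long.valueOf(timestamp));
  }
}

/** Sketch of Beam-style overwrite semantics on top of the service above. */
class OverwritingTimers {
  private final MultiSetTimerService service = new MultiSetTimerService();

  // Assumption: a real runner would keep this map in checkpointed state.
  private final Map<String, Long> pendingByTimerId = new HashMap<>();

  void setTimer(String timerId, long timestamp) {
    Long previous = pendingByTimerId.put(timerId, timestamp);
    if (previous != null) {
      // Same id was set before: drop the old registration so only the new one fires.
      service.delete(previous);
    }
    service.register(timestamp);
  }

  List<Long> registeredTimestamps() {
    return new ArrayList<>(service.registered);
  }
}
```

Setting the same id twice leaves a single registration in the underlying service, while different ids keep their own registrations.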
As far as I can see, multiple timer families are currently not supported
in the Flink Runner due to the map not taking the family name into
account. This can be easily fixed though.
-Max
On 24.01.20 21:31, Reuven Lax wrote:
The new timer family is in the portability protos. I think TimerReceiver
needs to be updated to set it though (I think a 1-line change).
The TimerInternals class that runners implement today already handles
dynamic timers, so most of the work was in the Beam SDK to provide an
API that allows users to access this feature.
The main work needed in the runner was to take into account the timer
family. Beam semantics say that if a timer is set twice with the same
id, then the second timer overwrites the first. Several runners
therefore had maps from timer id -> timer. However since the
timer family scopes the timers, we now allow two timers with the same id
as long as the timer families are different. Runners had to be updated
to include the timer family id in the map keys.
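
A rough sketch of that key change, assuming a simple in-memory map. FamilyScopedTimers and TimerKey are hypothetical names for illustration, not classes from any runner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Sketch: pending timers keyed by (timer family id, timer id) instead of timer id alone. */
class FamilyScopedTimers {

  /** Hypothetical composite key. */
  static final class TimerKey {
    final String familyId;
    final String timerId;

    TimerKey(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof TimerKey)) {
        return false;
      }
      TimerKey that = (TimerKey) other;
      return familyId.equals(that.familyId) && timerId.equals(that.timerId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(familyId, timerId);
    }
  }

  private final Map<TimerKey, Long> pending = new HashMap<>();

  void setTimer(String familyId, String timerId, long timestamp) {
    // Same family and same id: the later call overwrites the earlier one.
    pending.put(new TimerKey(familyId, timerId), timestamp);
  }

  Long pendingTimestamp(String familyId, String timerId) {
    return pending.get(new TimerKey(familyId, timerId));
  }

  int size() {
    return pending.size();
  }
}
```

With this key, the same timer id in two different families yields two independent timers, while setting it twice within one family still overwrites.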
Surprisingly, the new TimerMap tests seem to pass on Spark
ValidatesRunner, even though the Spark runner wasn't updated! I wonder
if this means that the Spark runner was incorrectly implementing the
Beam semantics before, and setTimer was not overwriting timers with the
same id?
Reuven
On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <ieme...@gmail.com> wrote:
This looks great, thanks for the contribution Rehman!
I have some questions (note I have not looked at the code at all).
- Is this working for both portable and non portable runners?
- What do other runners need to implement to support this (e.g. Spark)?
It may be worth adding this to the website's Compatibility Matrix.
Regards,
Ismaël
On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
<rehman.murad...@venturedive.com> wrote:
Thank you Reuven for the guidance throughout the development
process. I am delighted to contribute my two cents to the Beam
project.
Looking forward to more active contributions.
Thanks & Regards,
Rehman Murad Ali
Software Engineer
Mobile: +92 3452076766
Skype: rehman.muradali
On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <re...@google.com> wrote:
Thanks to a lot of hard work by Rehman, Beam now supports
dynamic timers. As a reminder, this was discussed on the dev
list some time back.
As background, previously you had to statically declare all
timers in your code. So if you wanted two timers, you needed
to create two timer variables and two callbacks, one for each
timer. A number of users kept hitting stumbling
blocks where they needed a dynamic set of timers (often
based on the element), which was not supported in Beam. The
workarounds were quite ugly and complicated.
The new support allows declaring a TimerMap, which is a map
of timers. Each TimerMap is scoped by a family name, so you
can create multiple TimerMaps each with its own callback.
Usage looks as follows:

class MyDoFn extends DoFn<...> {
  @TimerFamily("timers")
  private final TimerSpec timerMap =
      TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @TimerFamily("timers") TimerMap timers,
      @Element Type e,
      @Timestamp Instant timestamp) {
    // Each distinct id creates (or overwrites) one timer in the "timers" family.
    timers.set("mainTimer", timestamp);
    timers.set("actionType" + e.getActionType(), timestamp);
  }

  // Shared callback for every timer in the "timers" family.
  @OnTimerFamily("timers")
  public void onTimer(@TimerId String timerId) {
    System.out.println("Timer fired. id: " + timerId);
  }
}

This currently works for the Flink and the Dataflow runners.
Thank you Rehman for getting this done! Beam users will find
it very valuable.
Reuven