I believe that timers correspond to watermark holds,
which hold up the output watermark, not the input watermark.
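A minimal Python sketch of that distinction follows. It assumes, for illustration only, that each pending event-time timer places an output watermark hold at its own timestamp. The helper name output_watermark is hypothetical and is not part of any Beam API. Because the input watermark is not held, the runner may still advance it past several pending timers at once.

```python
def output_watermark(input_watermark, pending_timer_timestamps):
    """Hypothetical helper: the output watermark cannot pass the input
    watermark, nor any hold placed by a still-pending timer."""
    holds = list(pending_timer_timestamps)
    return min([input_watermark] + holds)


# Timers at T1 and T3 are pending while the input watermark is already at T3:
# the input watermark is not held, but the output watermark stays at T1.
print(output_watermark(3, [1, 3]))  # 1
```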
On Thu, Jun 27, 2019 at 11:21 PM Lukasz Cwik wrote:
I'm confused as to why it is valid to advance the
watermark to T3 in the original scenario.
T1 and T2 should be treated as inputs to the
function and hold the input watermark. Hence T1
should fire, and if it doesn't produce any new timers
before T2, then T2 should fire, since the watermark
will now advance to T2. The only time you would have
multiple watermark timers fire as part of the same
bundle is if they were distinct timers both set to
the same time.
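The behaviour described here can be simulated in a few lines of Python. In this model, pending timers hold the input watermark, so each bundle fires only the timers set for a single instant. This is a hedged toy model, not runner code. Timers are modelled as bare integer timestamps, and on_timer is a hypothetical callback that returns the timestamps of any timers it sets. Only event-time timers are considered.

```python
def fire_one_instant_per_bundle(initial_timers, target_watermark, on_timer):
    """Return the timers fired, grouped by bundle.

    Pending timers hold the input watermark, so each step advances the
    watermark only to the earliest pending timestamp and fires exactly the
    timers set for that instant (several distinct timers may share it).
    """
    pending = list(initial_timers)
    bundles = []
    while pending and min(pending) <= target_watermark:
        instant = min(pending)  # the watermark advances only this far
        bundle = [t for t in pending if t == instant]
        pending = [t for t in pending if t != instant]
        for t in bundle:
            pending.extend(on_timer(t))  # new timers join the pending set
        bundles.append(bundle)
    return bundles


# Original scenario: timers at T1 and T3; firing T1 sets a timer at T2.
print(fire_one_instant_per_bundle([1, 3], 3, lambda t: [2] if t == 1 else []))
# [[1], [2], [3]]
```

In this model, two distinct timers set for the same instant (for example [1, 1]) are the only way to get more than one timer in a bundle.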
I have some examples [1] documented in the modelling,
scheduling, and executing timers doc.
[1] https://docs.google.com/document/d/1GRL88rKLHbMR0zJnBHYwM4xtj66VYlB112EWVUFcGB0/edit#heading=h.fzptl5h0vi9k
On Wed, Jun 26, 2019 at 6:40 AM Reuven Lax wrote:
"Earlier than the input watermark" only applies to
event-time timers, but the above problem holds
for processing-time timers as well.
On Wed, Jun 26, 2019, 1:50 PM Robert Bradshaw wrote:
Yeah, it wouldn't be optimal performance-wise, but I
think it's good to keep the bar for a correct SDK low.
It might still be better than sending one timer per
bundle, and you only pay the performance cost if
timers are set earlier than the input watermark (and
there was a timer firing in this range). (How often
this happens probably varies a lot in practice.)
On Wed, Jun 26, 2019 at 2:33 PM Reuven Lax wrote:
>
> This would have a lot of performance problems (especially since
> there is user code that caches within a bundle and invalidates the
> cache at the end of every bundle). However, this would be a valid
> "lazy" implementation.
>
> On Wed, Jun 26, 2019 at 2:29 PM Robert Bradshaw wrote:
>>
>> Note also that a "lazy" SDK implementation would be to simply return
>> all the timers (as if they were new timers) to the runner once a timer
>> set (before or at the last requested timer in the bundle) is
>> encountered.
>> E.g. suppose we had timers T1, T3, T5 in the bundle. On firing T1, we
>> set T2 and delete T3. The SDK could then claim that timers were
>> (again) set at T3 and T5, then set one at T2 and delete the one at T3,
>> and then be done with the bundle (not actually process T3 and T5). (One
>> way to think about this is that timers are actually bundle splits into
>> a bundle of "done" and "future" work.) A more intelligent SDK could,
>> of course, process the whole bundle by tracking modifications to the
>> to-be-fired timers itself rather than requiring a trip through the
>> runner.
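A rough Python sketch of the "lazy" strategy follows, as a toy model rather than SDK code. Timers are bare integer timestamps, and the hypothetical on_timer callback returns a pair (timers_set, timers_deleted). The sketch stops as soon as a handler sets a timer at or before the last timer of the bundle, or deletes one that has not fired yet. At that point it hands every unfired timer back to the runner as if newly set.

```python
def run_lazy(bundle, on_timer):
    """Fire `bundle` in order, stopping at the first conflicting change.

    Returns (fired, handed_back): the timers actually fired and the timers
    the SDK reports to the runner as set, for the runner to schedule later.
    """
    bundle = sorted(bundle)
    last = bundle[-1] if bundle else None
    fired, handed_back = [], set()
    for i, t in enumerate(bundle):
        fired.append(t)
        timers_set, timers_deleted = on_timer(t)
        handed_back -= set(timers_deleted)
        handed_back |= set(timers_set)
        unfired = set(bundle[i + 1:])
        if any(s <= last for s in timers_set) or unfired & set(timers_deleted):
            # Give up on the rest of the bundle: re-report the unfired
            # timers, minus any this handler deleted.
            handed_back |= unfired - set(timers_deleted)
            break
    return fired, sorted(handed_back)


def on_timer(t):
    # The example above: firing T1 sets T2 and deletes T3.
    return ([2], [3]) if t == 1 else ([], [])


print(run_lazy([1, 3, 5], on_timer))  # ([1], [2, 5])
```

The runner then sees T2 and T5 as set (T3 deleted) and schedules them in later bundles, which is correct but costs an extra round trip.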
>>
>> On Wed, Jun 26, 2019 at 1:51 PM Reuven Lax wrote:
>> >
>> > I like this option the best. It might be trickier to implement, but
>> > it seems like it would be the most consistent solution.
>> >
>> > Another problem it would solve is the following: let's say a bundle
>> > arrives containing timers T1 and T2, and while processing T1 the user
>> > code deletes T2 (or resets it to a time in the far future). I'm
>> > actually not sure what we do today, but I'm a bit afraid that we will
>> > go ahead and fire T2 since it's already in the bundle, which is
>> > clearly incorrect. The SDK needs to keep track of this and skip T2 in
>> > order to solve this, which is the same sort of work needed to
>> > implement Robert's suggestion.
>> >
>> > Reuven
>> >
>> > On Wed, Jun 26, 2019 at 12:28 PM Robert Bradshaw wrote:
>> >>
>> >> Another option, which is nice from an API perspective but places a
>> >> burden on SDK implementers (and possibly runners), is to maintain the
>> >> ordering of timers by requiring timers to be fired in order and, if
>> >> any timers are set, firing them immediately before processing later
>> >> timers. In other words, if T1 sets T2 and modifies T3, these would
>> >> take effect (locally; the runner may not even know about T2) before
>> >> T3 was processed.
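A minimal Python sketch of this option follows. It also covers the deleted-timer case Reuven raises. Timers are again modelled as bare integer timestamps, and a hypothetical on_timer(t) callback returns (timers_set, timers_deleted). Timers are kept in a local priority queue (the standard heapq module). A timer set by an earlier timer therefore fires before later ones, and a deleted timer is skipped even though it was part of the bundle.

```python
import heapq


def run_in_order(initial_timers, watermark, on_timer):
    """Fire every timer <= watermark strictly in timestamp order.

    Changes made by a firing timer take effect locally before any later
    timer fires. Returns (fired, still_set), where still_set are the timers
    left for the runner to fire in a later bundle.
    """
    heap = list(initial_timers)
    heapq.heapify(heap)
    live = set(initial_timers)  # timers currently set
    fired = []
    while heap and heap[0] <= watermark:
        t = heapq.heappop(heap)
        if t not in live:
            continue  # deleted, or a stale duplicate entry: skip it
        live.discard(t)
        fired.append(t)
        timers_set, timers_deleted = on_timer(t)
        for d in timers_deleted:
            live.discard(d)
        for s in timers_set:
            if s not in live:
                live.add(s)
                heapq.heappush(heap, s)
    return fired, sorted(live)


def on_timer(t):
    # T1 sets T2 and deletes T3.
    return ([2], [3]) if t == 1 else ([], [])


print(run_in_order([1, 3, 5], 5, on_timer))  # ([1, 2, 5], [])
```

Tracking the live set locally is the "same sort of work" mentioned above: the SDK must know which in-bundle timers were modified before it fires them.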
>> >>
>> >> On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I have mentioned an issue I have come across [1] on several other
>> >> > threads, but it probably didn't attract the attention it deserves.
>> >> >
>> >> > I will try to restate the problem here for clarity:
>> >> >
>> >> > - on runners that use the concept of bundles (the original issue
>> >> > mentions DirectRunner, but it will probably apply to other runners
>> >> > that use bundles as well), the workflow is as follows:
>> >> >
>> >> > a) process elements in bundle
>> >> >
>> >> > b) advance watermark
>> >> >
>> >> > c) process timers
>> >> >
>> >> > d) continue to next bundle
>> >> >
>> >> > - the issue with this is that when we are initially at time T0, set
>> >> > two timers for T1 and T3, and then advance the watermark to T3 (or
>> >> > beyond), the timers will fire (correctly) in order T1, T3; but if the
>> >> > timer at T1 sets another timer for T2, then this timer will be fired
>> >> > in the next bundle (and therefore after T3)
>> >> >
>> >> > - this causes issues, mostly race conditions between window GC
>> >> > timers and user timers (and users don't have any way to solve that!)
>> >> >
>> >> > - note that the same applies when one timer tries to reset a timer
>> >> > that is already in the current bundle
>> >> >
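The ordering problem can be reproduced with a toy Python model of steps a) to d). This is a sketch, not DirectRunner code. Timers are bare integer timestamps, the watermark is advanced straight to a target value, and on_timer is a hypothetical callback that returns the timestamps of the timers it sets.

```python
def run_bundled(initial_timers, watermark, on_timer):
    """Fire timers bundle by bundle, as in steps a) to d).

    All timers <= watermark that are known when a bundle starts fire in that
    bundle; timers set while it runs are only seen by the next bundle.
    """
    pending = list(initial_timers)
    fired = []
    while True:
        bundle = sorted(t for t in pending if t <= watermark)
        if not bundle:
            return fired
        pending = [t for t in pending if t > watermark]
        for t in bundle:
            fired.append(t)
            pending.extend(on_timer(t))


# At T0, timers are set for T1 and T3 and the watermark advances to T3;
# firing T1 sets a new timer for T2.
print(run_bundled([1, 3], 3, lambda t: [2] if t == 1 else []))  # [1, 3, 2]
```

T2 fires after T3 because it was created while the bundle holding T1 and T3 was already running.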
>> >> > I have investigated a way of solving this by running timers for only
>> >> > a single timestamp (instant) in each bundle, but as Reuven pointed
>> >> > out, that could regress performance (mostly by delaying the firing of
>> >> > timers that could have fired). Options I see:
>> >> >
>> >> > 1) either set OnTimerContext#timestamp() to the current input
>> >> > watermark (not the time the user actually set the timer for), or
>> >> >
>> >> > 2) add OnTimerContext#getCurrentInputWatermark() and disallow setting
>> >> > (or resetting) timers for a time between OnProcessContext#timestamp
>> >> > and OnProcessContext#getCurrentInputWatermark(), by throwing an
>> >> > exception
>> >> >
>> >> > 3) any other option?
>> >> >
>> >> > Option 1) seems to be broken by design, as it can result in corrupt
>> >> > data (emitted with a wrong, even somewhat arbitrary, timestamp); I'm
>> >> > including it just for completeness. Option 2) is a breaking change
>> >> > that can result in Pipeline failures (although the failures will
>> >> > happen on Pipelines that are probably already broken).
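Option 2) amounts to a range check whenever a timer is set. The following is a minimal Python sketch. It assumes the forbidden range is the half-open interval [current timestamp, input watermark). The names check_timer and TimerInPastError are hypothetical and are not part of any Beam API.

```python
class TimerInPastError(ValueError):
    """Hypothetical error for timers that fall in the already-passed range."""


def check_timer(target, current_timestamp, input_watermark):
    """Reject a timer set between the current timestamp and the input
    watermark, since the watermark has already passed that time."""
    if current_timestamp <= target < input_watermark:
        raise TimerInPastError(
            f"timer at {target} is between timestamp {current_timestamp} "
            f"and input watermark {input_watermark}")
    return target


# Firing T1 with the input watermark already at T3: a timer for T2 is rejected.
try:
    check_timer(2, current_timestamp=1, input_watermark=3)
except TimerInPastError as e:
    print(e)
```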
>> >> >
>> >> > Although I have come up with a workaround in the work where I
>> >> > originally came across this issue, I think that this is generally
>> >> > serious and should be dealt with, mostly because when using the
>> >> > user-facing APIs, there are no workarounds possible today.
>> >> >
>> >> > Thanks for discussion!
>> >> >
>> >> > Jan
>> >> >
>> >> > [1] https://issues.apache.org/jira/browse/BEAM-7520
>> >> >