I think this approach breaks the assumption that bundles are executed as immutable pieces of work. With it, runners would have to update the bundle while executing it. It is another possible option, but it seems to have issues of its own.

On 6/26/19 12:28 PM, Robert Bradshaw wrote:
Another option, which is nice from an API perspective but places a
burden on SDK implementers (and possibly runners), is to maintain the
ordering of timers by requiring timers to be fired in order and, if
any timers are set while firing, to fire them immediately before
processing later timers. In other words, if T1 sets T2 and modifies T3,
these would take effect (locally, the runner may not even know about T2)
before T3 was processed.
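
A minimal sketch of the in-order firing described above, as a plain Python model rather than Beam code. The function name, the callback shape, and the timer ids are all hypothetical; the only idea taken from the message is that timers set or modified while firing take effect before any later timer is processed.

```python
import heapq


def fire_timers_in_order(initial_timers, watermark, on_timer):
    """Fire eligible timers in timestamp order, applying changes immediately.

    initial_timers: dict of timer id -> timestamp (hypothetical model)
    on_timer: callback(timer_id, ts) -> dict of timer id -> new timestamp
    """
    current = dict(initial_timers)  # latest timestamp known for each timer id
    heap = [(ts, tid) for tid, ts in current.items()]
    heapq.heapify(heap)
    fired = []
    while heap and heap[0][0] <= watermark:
        ts, tid = heapq.heappop(heap)
        if current.get(tid) != ts:
            continue  # stale entry: the timer was modified or already fired
        del current[tid]
        fired.append((tid, ts))
        # Timers set or modified by the callback are visible right away.
        for new_tid, new_ts in on_timer(tid, ts).items():
            current[new_tid] = new_ts
            heapq.heappush(heap, (new_ts, new_tid))
    return fired


def t1_sets_t2_and_moves_t3(timer_id, ts):
    # T1 sets T2 and modifies T3, as in the example in the message.
    return {"T2": 2, "T3": 4} if timer_id == "T1" else {}


in_order = fire_timers_in_order({"T1": 1, "T3": 3}, 5, t1_sets_t2_and_moves_t3)
print(in_order)
```

In this model T2 fires before T3, and T3 fires at its modified timestamp. The sketch does not handle a callback that sets a timer earlier than the one currently firing.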

On Wed, Jun 26, 2019 at 11:13 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

I have mentioned an issue I have come across [1] on several other
threads, but it probably didn't attract the attention that it deserves.

I will try to restate the problem here for clarity:

   - on runners that use the concept of bundles (the original issue mentions
DirectRunner, but it probably applies to other runners that use
bundles as well), the workflow is as follows:

    a) process elements in bundle

    b) advance watermark

    c) process timers

    d) continue to next bundle

   - the issue with this is that when we are initially at time T0, set
two timers for T1 and T3, and then advance the watermark to T3 (or beyond),
the timers will fire (correctly) in order T1, T3, but if the timer at T1
sets another timer for T2, then that timer will be fired in the next bundle
(and therefore after T3)

   - this causes issues mostly with race conditions in window GC timers
and user timers (and users don't have any way to solve that!)

   - note that the same applies when one timer tries to reset a timer that
is already in the current bundle
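
The ordering problem described in the list above can be reproduced with a small stand-alone model. This is only a sketch: it is not DirectRunner code, the function and callback names are hypothetical, and it assumes that the set of eligible timers is snapshotted once per bundle.

```python
def fire_timers_per_bundle(initial_timers, watermark, on_timer):
    """Fire timers bundle by bundle; return the order in which they fired.

    initial_timers: timestamps of timers that are already set
    watermark: input watermark after it has been advanced
    on_timer: callback(ts) -> list of new timer timestamps to set
    """
    pending = sorted(initial_timers)
    fired = []
    while True:
        # Assumption: eligible timers are collected once per bundle.
        bundle = [t for t in pending if t <= watermark]
        if not bundle:
            break
        pending = [t for t in pending if t > watermark]
        for ts in bundle:
            fired.append(ts)
            # Timers set while firing only become visible in the next bundle.
            pending.extend(on_timer(ts))
        pending.sort()
    return fired


def set_t2_from_t1(ts):
    # The timer at T1 sets another timer for T2.
    return [2] if ts == 1 else []


order = fire_timers_per_bundle([1, 3], watermark=3, on_timer=set_t2_from_t1)
print(order)  # [1, 3, 2]
```

With timers at T1 and T3 and the watermark advanced to T3, the timer for T2 fires last, which is the out-of-order behaviour described above.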

I have investigated a way of solving this by running timers only for a
single timestamp (instant) in each bundle, but as Reuven pointed out,
that could regress performance (mostly by delaying the firing of timers
that could already have fired). Options I see:

   1) either set the OnTimerContext#timestamp() to the current input
watermark (not the time for which the user actually set the timer), or

   2) add OnTimerContext#getCurrentInputWatermark() and disallow setting
(or resetting) timers for a time between OnProcessContext#timestamp and
OnProcessContext#getCurrentInputWatermark(), by throwing an exception

   3) any other option?

Option 1) seems to be broken by design, as it can result in corrupt data
(emitted with a wrong timestamp, which is even somewhat arbitrary); I'm
including it just for completeness. Option 2) is a breaking change that
can result in Pipeline failures (although the failures will happen on
Pipelines that are probably already broken).
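
To make Option 2) concrete, here is a rough sketch of the check it implies, written as plain Python rather than against the Beam API. The exception type and function name are hypothetical, and treating both ends of the range as inclusive is an assumption, since the proposal does not say how the boundaries should be handled.

```python
class TimerBehindWatermarkError(ValueError):
    """Hypothetical error for a timer set into the range already passed."""


def check_timer_target(target, firing_timestamp, input_watermark):
    """Reject a timer set between the firing timestamp and the watermark.

    Assumption: both boundaries are inclusive. Targets earlier than the
    firing timestamp are not covered by the proposal and pass through.
    """
    if firing_timestamp <= target <= input_watermark:
        raise TimerBehindWatermarkError(
            f"timer for {target} lies between firing timestamp "
            f"{firing_timestamp} and input watermark {input_watermark}"
        )
    return target


# A timer firing at T1 while the watermark is at T3 may set a timer for T4 ...
accepted = check_timer_target(4, firing_timestamp=1, input_watermark=3)

# ... but setting one for T2 would be rejected.
try:
    check_timer_target(2, firing_timestamp=1, input_watermark=3)
    rejected = False
except TimerBehindWatermarkError:
    rejected = True
```

Under this rule, the scenario where the timer at T1 sets a timer for T2 fails loudly instead of firing out of order.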

Although I have come up with a workaround in the work where I originally
came across this issue, I think that this is generally serious and
should be dealt with, mostly because when using user-facing APIs there
are no workarounds possible today.

Thanks for discussion!

Jan

[1] https://issues.apache.org/jira/browse/BEAM-7520
