Hey Jan,

I only just saw your message because you posted right before I replied.
What you describe is precisely what I was experiencing. I also solved it
the same way, i.e. by pushing a newly set timer back to the next bundle.
Note that there is no other way in portability because we can't fire
timers once we have closed the current bundle, and we need to close the
bundle to receive all output, which includes timers. It's definitely not
efficient, but it appears that this behavior is even desired by some
runners, e.g. Dataflow.
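
To make the push-back concrete, here is a minimal sketch of the idea. It
is not Beam runner code: the class DeferredTimerSketch, the OnTimer
callback, processBundle, run and the maxBundles cap are all names I made
up for illustration, and timers are modeled as plain long timestamps.
Watermark holds are not modeled at all.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; none of these names are Beam APIs. */
public class DeferredTimerSketch {

  /** Called for each fired timer; it may set new timers by adding to newTimers. */
  public interface OnTimer {
    void fire(long timestamp, List<Long> newTimers);
  }

  /**
   * Fires every timer of one bundle and returns the timers that were set
   * while firing. Those timers are not fired here: they only become
   * visible once the bundle is closed, so they belong to the next bundle.
   */
  public static List<Long> processBundle(List<Long> timers, OnTimer callback) {
    List<Long> deferred = new ArrayList<>();
    for (long timestamp : timers) {
      callback.fire(timestamp, deferred);
    }
    return deferred;
  }

  /**
   * Runs bundles until no timers remain or maxBundles (an assumed safety
   * cap) is reached. Returns the timestamps fired in each bundle.
   */
  public static List<List<Long>> run(
      List<Long> initialTimers, OnTimer callback, int maxBundles) {
    List<List<Long>> firedPerBundle = new ArrayList<>();
    List<Long> pending = new ArrayList<>(initialTimers);
    while (!pending.isEmpty() && firedPerBundle.size() < maxBundles) {
      firedPerBundle.add(new ArrayList<>(pending));
      pending = processBundle(pending, callback);
    }
    return firedPerBundle;
  }

  public static void main(String[] args) {
    // A timer that re-arms itself 10 units later until it reaches 30.
    OnTimer loop = (timestamp, newTimers) -> {
      if (timestamp < 30) {
        newTimers.add(timestamp + 10);
      }
    };
    // Each re-armed timer lands in its own bundle: [[0], [10], [20], [30]]
    System.out.println(run(List.of(0L), loop, 100));
  }
}
```

Every iteration of the timer loop costs one bundle here, which is the
inefficiency I mean above.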

Thanks,
Max

On 13.04.20 18:58, Jan Lukavský wrote:
> This is probably related to an issue I was having with the Direct runner
> and timer ordering. The problem is that there might be multiple timers
> (for a given key) inside a bundle, and each timer might set another
> timer. To ensure timer ordering, timers must be fired one at a time, and
> when a fired timer sets a timer for a time preceding the current input
> watermark, the new timer and all remaining timers are pushed back to the
> next bundle. That was the simplest yet efficient enough implementation
> for the Direct runner (see [1]); better alternatives might exist for
> other runners (e.g. what was discussed in [2]).
> 
> Jan
> 
> [1]
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java#L253
> 
> [2] https://github.com/apache/beam/pull/9190
> 
> On 4/13/20 6:01 PM, Reuven Lax wrote:
>> I'm not sure I understand the difference - do any "classic" runners
>> add new timers to the bundle? I know that at least the Dataflow runner
>> would end up putting the new timer in a new bundle.
>>
>> One thing we do need to ensure is that modifications to timers are
>> reflected in a bundle. So if a bundle contains a processElement and a
>> processTimer and the processElement modifies the timer, that should be
>> reflected in timer firing.
>>
>> On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik wrote:
>>
>>     In non-portable implementations you would have to wait until the
>>     element/timer had finished processing before you could process any
>>     newly created timers. Timers that are created with the same
>>     key+window+timer family overwrite existing timers that have been
>>     set, so a timer can be overwritten multiple times while an
>>     element/timer is being processed. You wouldn't want to process a
>>     newly created timer with incorrect values, or process a timer you
>>     shouldn't have.
>>
>>     In portable implementations, you can only safely say that element
>>     processing is done when the bundle completes. There could be value
>>     in exposing when an element is done, since this could be useful in
>>     other parts of the system, such as when a large GBK value is done.
>>
>>     On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels wrote:
>>
>>         Hi,
>>
>>         In the "classic" Java mode one can set timers which in turn can
>>         set timers, which makes it possible to create a timer loop, e.g.:
>>
>>         @ProcessElement
>>         public void processElement(
>>             ProcessContext context, @TimerId("timer") Timer timer) {
>>           // Initial timer
>>           timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
>>         }
>>
>>         @OnTimer("timer")
>>         public void onTimer(
>>             OnTimerContext context,
>>             @TimerId("timer") Timer timer) {
>>           // Trigger again and start endless loop
>>           timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
>>         }
>>
>>
>>         In portability, since we are only guaranteed to receive the
>>         timers at the end of a bundle, when we flush the outputs and have
>>         closed the inputs, it looks like this behavior can only be
>>         supported by starting a new bundle and executing the deferred
>>         timers. This also means holding back the watermark to allow for
>>         these loops. Plus, starting a new bundle comes with some cost.
>>
>>         Another possibility would be to allow a direct feedback loop,
>>         i.e. when the bundle closes and the timers fire, we can still
>>         set new timers.
>>
>>         I wonder: do we want to allow a timer loop to execute within a
>>         bundle?
>>
>>         It would be possible to limit the number of iterations to
>>         perform during one bundle, similar to how runners limit the
>>         number of elements or the duration of a bundle.
>>
>>         -Max
>>
