On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <[email protected]> wrote:
> In non portable implementations you would have to wait till the element/timer 
> was finished processing before you could process any newly created timers. 
> Timers that are created with the same key+window+timer family overwrite 
> existing timers that have been set which can lead to a timer being 
> overwritten multiple times while an element/timer is being processed and you 
> wouldn't want to process a newly created timer with the incorrect values or 
> process a timer you shouldn't have.

Timer overwrites/updates/deletions are a good argument for waiting until
the bundle finishes.

On 13.04.20 18:01, Reuven Lax wrote:
> I'm not sure I understand the difference - do any "classic" runners add new 
> timers to the bundle? I know that at least the Dataflow runner would end up 
> the new timer in a new bundle.

Yes, the "classic" Flink Runner has a timer processing loop which runs
for as long as there are still timers applicable for firing. New timers
can be added and existing ones can be updated all within a bundle.

I wasn't aware that Dataflow only includes new timers in the next bundle.

> One thing we do need to ensure is that modifications to timers are reflected 
> in a bundle. So if a bundle contains a processElement and a processTimer and 
> the processElement modifies the timer, that should be reflected in timer 
> firing.

That's is the case for the Flink Runner.

-Max

On 13.04.20 18:01, Reuven Lax wrote:
> I'm not sure I understand the difference - do any "classic" runners add
> new timers to the bundle? I know that at least the Dataflow runner would
> end up the new timer in a new bundle.
> 
> One thing we do need to ensure is that modifications to timers are
> reflected in a bundle. So if a bundle contains a processElement and a
> processTimer and the processElement modifies the timer, that should be
> reflected in timer firing.
> 
> On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <[email protected]
> <mailto:[email protected]>> wrote:
> 
>     In non portable implementations you would have to wait till the
>     element/timer was finished processing before you could process any
>     newly created timers. Timers that are created with the same
>     key+window+timer family overwrite existing timers that have been set
>     which can lead to a timer being overwritten multiple times while an
>     element/timer is being processed and you wouldn't want to process a
>     newly created timer with the incorrect values or process a timer you
>     shouldn't have.
> 
>     In portable implementations, you can only safely say that element
>     processing is done when the bundle completes. There could be value
>     in exposing when an element is done since this could have usage in
>     other parts of the system such as when a large GBK value is done.
> 
>     On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels <[email protected]
>     <mailto:[email protected]>> wrote:
> 
>         Hi,
> 
>         In the "classic" Java mode one can set timers which in turn can set
>         timers which enables to create a timer loop, e.g.:
> 
>         @ProcessElement
>         public void processElement(
>             ProcessContext context, @TimerId("timer") Timer timer) {
>           // Initial timer
>           timer.withOutputTimestamp(new
>         Instant(0)).set(context.timestamp());
>         }
> 
>         @OnTimer("timer")
>         public void onTimer(
>             OnTimerContext context,
>             @TimerId("timer") Timer timer) {
>           // Trigger again and start endless loop
>           timer.withOutputTimestamp(new
>         Instant(0)).set(context.timestamp());
>         }
> 
> 
>         In portability, since we are only guaranteed to receive the
>         timers at
>         the end of a bundle when we flush the outputs and have closed the
>         inputs, it looks like this behavior can only be supported by
>         starting a
>         new bundle and executing the deferred timers. This also means to
>         hold
>         back the watermark to allow for these loops. Plus, starting a
>         new bundle
>         comes with some cost.
> 
>         Another possibility would be to allow a direct feedback loop,
>         i.e. when
>         the bundles closes and the timers fire, we can still set new timers.
> 
>         I wonder do we want to allow a timer loop to execute within a
>         bundle?
> 
>         It would be possible to limit the number of iterations to
>         perform during
>         one bundle similar to how runners limit the number of elements
>         or the
>         duration of a bundle.
> 
>         -Max
> 

Reply via email to