On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <[email protected]> wrote:
> In non portable implementations you would have to wait till the element/timer
> was finished processing before you could process any newly created timers.
> Timers that are created with the same key+window+timer family overwrite
> existing timers that have been set which can lead to a timer being
> overwritten multiple times while an element/timer is being processed and you
> wouldn't want to process a newly created timer with the incorrect values or
> process a timer you shouldn't have.

Timer overwrites/updates/deletions are a good argument for waiting until
the bundle finishes.

On 13.04.20 18:01, Reuven Lax wrote:
> I'm not sure I understand the difference - do any "classic" runners add new
> timers to the bundle? I know that at least the Dataflow runner would end up
> the new timer in a new bundle.

Yes, the "classic" Flink Runner has a timer processing loop which runs for
as long as there are still timers applicable for firing. New timers can be
added and existing ones can be updated, all within a bundle. I wasn't aware
that Dataflow only includes new timers in the next bundle.

> One thing we do need to ensure is that modifications to timers are reflected
> in a bundle. So if a bundle contains a processElement and a processTimer and
> the processElement modifies the timer, that should be reflected in timer
> firing.

That is the case for the Flink Runner.

-Max

On 13.04.20 18:01, Reuven Lax wrote:
> I'm not sure I understand the difference - do any "classic" runners add
> new timers to the bundle? I know that at least the Dataflow runner would
> end up the new timer in a new bundle.
>
> One thing we do need to ensure is that modifications to timers are
> reflected in a bundle. So if a bundle contains a processElement and a
> processTimer and the processElement modifies the timer, that should be
> reflected in timer firing.
>
> On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <[email protected]> wrote:
>
> > In non portable implementations you would have to wait till the
> > element/timer was finished processing before you could process any
> > newly created timers. Timers that are created with the same
> > key+window+timer family overwrite existing timers that have been set
> > which can lead to a timer being overwritten multiple times while an
> > element/timer is being processed and you wouldn't want to process a
> > newly created timer with the incorrect values or process a timer you
> > shouldn't have.
> >
> > In portable implementations, you can only safely say that element
> > processing is done when the bundle completes. There could be value
> > in exposing when an element is done since this could have usage in
> > other parts of the system such as when a large GBK value is done.
> >
> > On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels <[email protected]> wrote:
> >
> > > Hi,
> > >
> > > In the "classic" Java mode one can set timers which in turn can set
> > > timers, which makes it possible to create a timer loop, e.g.:
> > >
> > >   @ProcessElement
> > >   public void processElement(
> > >       ProcessContext context, @TimerId("timer") Timer timer) {
> > >     // Initial timer
> > >     timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
> > >   }
> > >
> > >   @OnTimer("timer")
> > >   public void onTimer(
> > >       OnTimerContext context,
> > >       @TimerId("timer") Timer timer) {
> > >     // Trigger again and start endless loop
> > >     timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
> > >   }
> > >
> > > In portability, since we are only guaranteed to receive the timers at
> > > the end of a bundle when we flush the outputs and have closed the
> > > inputs, it looks like this behavior can only be supported by starting a
> > > new bundle and executing the deferred timers. This also means holding
> > > back the watermark to allow for these loops. Plus, starting a new bundle
> > > comes with some cost.
> > >
> > > Another possibility would be to allow a direct feedback loop, i.e. when
> > > the bundle closes and the timers fire, we can still set new timers.
> > >
> > > I wonder, do we want to allow a timer loop to execute within a bundle?
> > >
> > > It would be possible to limit the number of iterations to perform during
> > > one bundle, similar to how runners limit the number of elements or the
> > > duration of a bundle.
> > >
> > > -Max
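
To make the last idea in the thread concrete (limiting the number of
timer-loop iterations per bundle), here is a minimal, runner-agnostic sketch
in plain Java. It is not Beam or Flink Runner code: TimerLoopSketch, Timers,
fireTimers and maxFirings are hypothetical names, and a single String id
stands in for key+window+timer family. It only illustrates two points raised
above: setting a timer with the same id overwrites the pending one, and a
firing loop can stop after a fixed number of firings and leave the rest for
the next bundle.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical sketch; none of these names exist in Beam or any runner.
final class TimerLoopSketch {

  /** Pending timers; the String id stands in for key+window+timer family. */
  static final class Timers {
    private final Map<String, Long> pending = new TreeMap<>();

    /** Setting a timer that already exists overwrites its firing time. */
    void set(String id, long fireAt) {
      pending.put(id, fireAt);
    }

    int pendingCount() {
      return pending.size();
    }
  }

  /**
   * Fires eligible timers (fireAt <= watermark) until none are left or
   * maxFirings is reached. Returns the number of firings; whatever is
   * still pending afterwards would wait for the next bundle.
   */
  static int fireTimers(
      Timers timers, long watermark, int maxFirings, BiConsumer<String, Timers> onTimer) {
    int fired = 0;
    while (fired < maxFirings) {
      String next = null;
      for (Map.Entry<String, Long> e : timers.pending.entrySet()) {
        if (e.getValue() <= watermark) {
          next = e.getKey();
          break;
        }
      }
      if (next == null) {
        break; // nothing eligible to fire
      }
      // Remove before invoking the callback so the callback may re-set the same timer.
      timers.pending.remove(next);
      onTimer.accept(next, timers);
      fired++;
    }
    return fired;
  }

  public static void main(String[] args) {
    Timers timers = new Timers();
    timers.set("k1|w1|timer", 0L);
    // Mirrors the endless loop from the thread: the callback re-sets its own timer.
    int fired = fireTimers(timers, 0L, 5, (id, t) -> t.set(id, 0L));
    System.out.println(fired + " firings, " + timers.pendingCount() + " timer deferred");
  }
}
```

With a limit of 5, the self-re-arming timer fires five times and is then left
pending, so a runner following this approach would still need to hold back
the watermark until the deferred timer runs in a later bundle.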
