In non portable implementations you would have to wait till the element/timer was finished processing before you could process any newly created timers. Timers that are created with the same key+window+timer family overwrite existing timers that have been set which can lead to a timer being overwritten multiple times while an element/timer is being processed and you wouldn't want to process a newly created timer with the incorrect values or process a timer you shouldn't have.
In portable implementations, you can only safely say that element processing is done when the bundle completes. There could be value in exposing when an element is done since this could have usage in other parts of the system such as when a large GBK value is done. On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels <[email protected]> wrote: > Hi, > > In the "classic" Java mode one can set timers which in turn can set > timers which enables to create a timer loop, e.g.: > > @ProcessElement > public void processElement( > ProcessContext context, @TimerId("timer") Timer timer) { > // Initial timer > timer.withOutputTimestamp(new Instant(0)).set(context.timestamp()); > } > > @OnTimer("timer") > public void onTimer( > OnTimerContext context, > @TimerId("timer") Timer timer) { > // Trigger again and start endless loop > timer.withOutputTimestamp(new Instant(0)).set(context.timestamp()); > } > > > In portability, since we are only guaranteed to receive the timers at > the end of a bundle when we flush the outputs and have closed the > inputs, it looks like this behavior can only be supported by starting a > new bundle and executing the deferred timers. This also means to hold > back the watermark to allow for these loops. Plus, starting a new bundle > comes with some cost. > > Another possibility would be to allow a direct feedback loop, i.e. when > the bundles closes and the timers fire, we can still set new timers. > > I wonder do we want to allow a timer loop to execute within a bundle? > > It would be possible to limit the number of iterations to perform during > one bundle similar to how runners limit the number of elements or the > duration of a bundle. > > -Max >
