[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stas Levin closed BEAM-2859. ---------------------------- Resolution: Fixed Fix Version/s: 2.2.0 > ProcessingTime based timers are not properly fired in case the watermark > stays put > ---------------------------------------------------------------------------------- > > Key: BEAM-2859 > URL: https://issues.apache.org/jira/browse/BEAM-2859 > Project: Beam > Issue Type: Bug > Components: runner-spark > Affects Versions: 2.0.0, 2.1.0 > Reporter: Stas Levin > Assignee: Stas Levin > Fix For: 2.2.0 > > > {{AfterProcessingTime}} based timers are not fired when the input watermark > does not advance, preventing from buffered element to be emitted. > The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} > determines what triggers are ready to be processed based on the following > condition: > {code:java} > timer.getTimestamp().isBefore(inputWatermark) > {code} > However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position > of the input watermark should *NOT* have effect. > In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers > once they are deemed eligible for processing (but will not necessarily fire). > This may not be the correct behavior for timers in general and for timers in > the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain > scheduled until the corresponding window expires and all state is cleared. > For instance, consider a timer that is found eligible for processing and is > thus deleted, then it just so happens to be that its {{shouldFire()}} returns > {{false}} and it is not fired and needs to be re-run next time around, but > won't, since it's been deleted. The implied moral being that _"eligible for > processing"_ does not imply _"should be deleted"_. > It may be better to avoid removing timers in > {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management > up to {{ReduceFnRunner#clearAllState()}} which has more context to determine > whether it's time for a given timer to be deleted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)