Stas Levin created BEAM-2859:
--------------------------------

             Summary: Processing time based timers are not properly fired in case the watermark stays put
                 Key: BEAM-2859
                 URL: https://issues.apache.org/jira/browse/BEAM-2859
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
    Affects Versions: 2.1.0
            Reporter: Stas Levin
            Assignee: Amit Sela


{{AfterProcessingTime}}-based timers are not fired when the input watermark 
does not advance, which prevents buffered elements from being emitted.

The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} 
determines which timers are ready to be processed using the following 
condition: 

{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}

However, if the timer domain is {{TimeDomain.PROCESSING_TIME}}, the position of 
the input watermark should *NOT* have any effect; such a timer should be 
compared against the current processing time instead.
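A minimal sketch of a domain-aware readiness check, written in plain Java rather than against Beam's actual classes. The {{TimeDomain}} enum (reduced to two values), {{PendingTimer}} and {{TimerReadiness}} below are hypothetical stand-ins for Beam's {{TimeDomain}} and {{TimerInternals.TimerData}}, and timestamps are plain epoch milliseconds. Event-time timers keep the watermark comparison quoted above, while processing-time timers ignore the watermark and are compared against a supplied processing-time value.

{code:java}
// Hypothetical stand-in for Beam's TimeDomain (SYNCHRONIZED_PROCESSING_TIME omitted).
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

// Hypothetical stand-in for a timer entry; timestamps are epoch milliseconds.
final class PendingTimer {
  final String id;
  final long timestampMillis;
  final TimeDomain domain;

  PendingTimer(String id, long timestampMillis, TimeDomain domain) {
    this.id = id;
    this.timestampMillis = timestampMillis;
    this.domain = domain;
  }
}

final class TimerReadiness {
  private TimerReadiness() {}

  // Compares each timer against the clock that belongs to its own domain.
  static boolean isReady(PendingTimer timer, long inputWatermarkMillis, long processingTimeMillis) {
    switch (timer.domain) {
      case EVENT_TIME:
        // Same condition as timer.getTimestamp().isBefore(inputWatermark).
        return timer.timestampMillis < inputWatermarkMillis;
      case PROCESSING_TIME:
        // The watermark is ignored; the timer is due once processing time reaches it.
        return timer.timestampMillis <= processingTimeMillis;
      default:
        throw new IllegalArgumentException("Unsupported time domain: " + timer.domain);
    }
  }
}
{code}

With the watermark stuck at 0 and processing time at 200, a processing-time timer set for 100 is reported as ready, while an event-time timer set for 100 keeps waiting for the watermark.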

In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers 
once they are deemed eligible for processing, even though they will not 
necessarily fire. This may not be the correct behavior for timers in general, 
and for timers in the {{TimeDomain.PROCESSING_TIME}} domain in particular, since 
they should remain scheduled until the corresponding window expires and all 
state is cleared.
For instance, consider a timer that is found eligible for processing and is 
therefore deleted. If the corresponding {{shouldFire()}} then returns {{false}}, 
the timer needs to be re-evaluated next time around, but it won't be, since it 
has already been deleted.

It may be better to avoid removing timers in 
{{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management to 
{{ReduceFnRunner#clearAllState()}}, which has more context to determine 
whether it is time for a given timer to be deleted.
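The proposed alternative can be sketched as a timer store whose readiness query is read-only, with deletion left to an explicit call made by whoever owns the window state. {{ScheduledTimer}} and {{NonDestructiveTimerStore}} are hypothetical classes for illustration only; they are not Beam's {{SparkTimerInternals}} API, and the single "current time" parameter stands in for whichever clock applies to the timer's domain.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical timer entry; timestamps are epoch milliseconds.
final class ScheduledTimer {
  final String id;
  final long timestampMillis;

  ScheduledTimer(String id, long timestampMillis) {
    this.id = id;
    this.timestampMillis = timestampMillis;
  }
}

// Hypothetical store (not Beam's SparkTimerInternals): reading never deletes.
final class NonDestructiveTimerStore {
  // Keyed by timer id so that re-setting a timer replaces the earlier entry.
  private final Map<String, ScheduledTimer> timers = new LinkedHashMap<>();

  void setTimer(ScheduledTimer timer) {
    timers.put(timer.id, timer);
  }

  // Returns eligible timers but leaves them scheduled, so a timer whose
  // trigger declines to fire is offered again on the next call.
  List<ScheduledTimer> timersReadyToProcess(long currentTimeMillis) {
    List<ScheduledTimer> ready = new ArrayList<>();
    for (ScheduledTimer timer : timers.values()) {
      if (timer.timestampMillis <= currentTimeMillis) {
        ready.add(timer);
      }
    }
    return ready;
  }

  // Removal is an explicit caller decision, e.g. when the window's state is cleared.
  void deleteTimer(String id) {
    timers.remove(id);
  }

  int size() {
    return timers.size();
  }
}
{code}

A caller whose {{shouldFire()}} returns {{false}} simply ignores the returned timer; because nothing was removed, the same timer is offered again on the next call until the caller deletes it.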



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
