Zhanghao Chen created FLINK-38932:
-------------------------------------

             Summary: Incorrect scheduled timestamp in ProcessingTimeCallback 
with scheduleWithFixedDelay
                 Key: FLINK-38932
                 URL: https://issues.apache.org/jira/browse/FLINK-38932
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Task
            Reporter: Zhanghao Chen


h2. {{Problem}}
{{{}ProcessingTimeCallback#{}}}{{{}onProcessingTime{}}}tasks a parameter 
{{time}} which is the scheduled time of the callback. We found that the 
received time keeps lagging behind the actual scheduled time when using 
{{scheduleWithFixedDelay}} to register the timer.
h2. Root Cause Analysis
The time is computed by {{{}SystemProcessingTimeService#{}}}{{ScheduledTask}} 
by cumulating a fixed period evety time when the scheduled task runs. This is 
correct for {{{}scheduleWithFixedPeriod{}}}, but incorrect for 
{{scheduleWithFixedDelay}} as the processing/queuing delay is not accounted 
here.
{code:java}
public void run() {
    if (serviceStatus.get() != STATUS_ALIVE) {
        return;
    }
    try {
        callback.onProcessingTime(nextTimestamp);
    } catch (Exception ex) {
        exceptionHandler.handleException(ex);
    }
    nextTimestamp += period;
} {code}
h2. Fix
{code:java}
public void run() {
    if (serviceStatus.get() != STATUS_ALIVE) {
        return;
    }
    try {
        callback.onProcessingTime(nextTimestamp);
    } catch (Exception ex) {
        exceptionHandler.handleException(ex);
    }
    if (fixedDelay) {
        nextTimestamp = System.currentTimeMillis() + period;
    } else {
        nextTimestamp += period;
    }
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to