Zhanghao Chen created FLINK-38932:
-------------------------------------
Summary: Incorrect scheduled timestamp in ProcessingTimeCallback
with scheduleWithFixedDelay
Key: FLINK-38932
URL: https://issues.apache.org/jira/browse/FLINK-38932
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Reporter: Zhanghao Chen
h2. Problem
{{ProcessingTimeCallback#onProcessingTime}} takes a parameter
{{time}}, which is the scheduled time of the callback. We found that the
received time keeps lagging behind the actual scheduled time when
{{scheduleWithFixedDelay}} is used to register the timer.
h2. Root Cause Analysis
The time is computed by {{SystemProcessingTimeService#ScheduledTask}},
which adds a fixed period to the timestamp every time the scheduled task runs.
This is correct for {{scheduleAtFixedRate}}, but incorrect for
{{scheduleWithFixedDelay}}, because the processing and queuing delay is not
accounted for.
{code:java}
public void run() {
    if (serviceStatus.get() != STATUS_ALIVE) {
        return;
    }
    try {
        callback.onProcessingTime(nextTimestamp);
    } catch (Exception ex) {
        exceptionHandler.handleException(ex);
    }
    nextTimestamp += period;
}
{code}
h2. Fix
{code:java}
public void run() {
    if (serviceStatus.get() != STATUS_ALIVE) {
        return;
    }
    try {
        callback.onProcessingTime(nextTimestamp);
    } catch (Exception ex) {
        exceptionHandler.handleException(ex);
    }
    if (fixedDelay) {
        nextTimestamp = System.currentTimeMillis() + period;
    } else {
        nextTimestamp += period;
    }
}
{code}
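To illustrate the drift, here is a minimal standalone sketch that simulates a fixed-delay schedule with a manual clock instead of a real executor. The class {{FixedDelayTimestampSketch}}, the method {{simulateLag}}, and the numbers used (a 100 ms period and a 30 ms callback) are hypothetical and not part of Flink. The sketch only assumes that the next run starts one period after the previous run finishes.

{code:java}
import java.util.Arrays;

// Hypothetical simulation, not Flink code: models a fixed-delay schedule
// with a manual clock so the result is deterministic.
class FixedDelayTimestampSketch {

    /**
     * Returns, for each run, how far the timestamp passed to the callback
     * lags behind the simulated time at which the run actually starts.
     */
    static long[] simulateLag(
            boolean accountForDelay, long period, long callbackDuration, int runs) {
        long now = period;        // simulated clock; first run fires after one period
        long nextTimestamp = now; // value that would be passed to onProcessingTime
        long[] lag = new long[runs];
        for (int i = 0; i < runs; i++) {
            lag[i] = now - nextTimestamp;
            now += callbackDuration; // the callback takes some time to execute
            if (accountForDelay) {
                // proposed behavior: derive the next timestamp from the current time
                nextTimestamp = now + period;
            } else {
                // current behavior: accumulate a fixed period
                nextTimestamp += period;
            }
            now += period; // fixed delay: next run starts one period after this one ends
        }
        return lag;
    }

    public static void main(String[] args) {
        // prints [0, 30, 60, 90]
        System.out.println(Arrays.toString(simulateLag(false, 100, 30, 4)));
        // prints [0, 0, 0, 0]
        System.out.println(Arrays.toString(simulateLag(true, 100, 30, 4)));
    }
}
{code}

Under these assumptions the lag grows by the callback duration on every run with the accumulating update, and stays at zero when the next timestamp is derived from the current time.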