Repository: kafka Updated Branches: refs/heads/trunk d903babb7 -> dedacd06e
KAFKA-4051: Use nanosecond clock for timers in broker Use System.nanoseconds instead of System.currentTimeMillis in broker timer tasks to cope with changes to wall-clock time. Author: Rajini Sivaram <[email protected]> Reviewers: Gwen Shapira Closes #1768 from rajinisivaram/KAFKA-4051 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dedacd06 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dedacd06 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dedacd06 Branch: refs/heads/trunk Commit: dedacd06e4d1e967261b9bca3e32ba0e44b52ba1 Parents: d903bab Author: Rajini Sivaram <[email protected]> Authored: Mon Aug 22 15:58:32 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Mon Aug 22 15:58:32 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/utils/Time.scala | 4 ++++ core/src/main/scala/kafka/utils/timer/Timer.scala | 5 +++-- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/Time.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Time.scala b/core/src/main/scala/kafka/utils/Time.scala index 194cc1f..f562ef7 100644 --- a/core/src/main/scala/kafka/utils/Time.scala +++ b/core/src/main/scala/kafka/utils/Time.scala @@ -17,6 +17,8 @@ package kafka.utils +import java.util.concurrent.TimeUnit + /** * Some common constants */ @@ -44,6 +46,8 @@ trait Time { def nanoseconds: Long + def hiResClockMs: Long = TimeUnit.NANOSECONDS.toMillis(nanoseconds) + def sleep(ms: Long) } http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/timer/Timer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index 2d78665..67de276 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.utils.threadsafe import org.apache.kafka.common.utils.Utils +import kafka.utils.SystemTime trait Timer { /** @@ -55,7 +56,7 @@ trait Timer { class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20, - startMs: Long = System.currentTimeMillis) extends Timer { + startMs: Long = SystemTime.hiResClockMs) extends Timer { // timeout timer private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() { @@ -81,7 +82,7 @@ class SystemTimer(executorName: String, def add(timerTask: TimerTask): Unit = { readLock.lock() try { - addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis())) + addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + SystemTime.hiResClockMs)) } finally { readLock.unlock() } http://git-wip-us.apache.org/repos/asf/kafka/blob/dedacd06/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index e862f4f..7a77b27 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -117,7 +117,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { } def getDelay(unit: TimeUnit): Long = { - unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) + unit.convert(max(getExpiration - SystemTime.hiResClockMs, 0), TimeUnit.MILLISECONDS) } def compareTo(d: Delayed): Int = {
