divijvaidya commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229359813
########## server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java: ########## @@ -96,71 +94,94 @@ * This class is not thread-safe. There should not be any add calls while advanceClock is executing. * It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe. */ -@nonthreadsafe -private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { +public class TimingWheel { + private final long tickMs; + private final long startMs; + private final int wheelSize; + private final AtomicInteger taskCounter; + private final DelayQueue<TimerTaskList> queue; + private final long interval; + private final TimerTaskList[] buckets; + private long currentTimeMs; - private[this] val interval = tickMs * wheelSize - private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) } + // overflowWheel can potentially be updated and read by two concurrent threads through add(). + // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM + private volatile TimingWheel overflowWheel = null; - private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs + TimingWheel( + long tickMs, + int wheelSize, + long startMs, + AtomicInteger taskCounter, + DelayQueue<TimerTaskList> queue + ) { + this.tickMs = tickMs; + this.startMs = startMs; + this.wheelSize = wheelSize; + this.taskCounter = taskCounter; + this.queue = queue; + this.buckets = new TimerTaskList[wheelSize]; + this.interval = tickMs * wheelSize; + // rounding down to multiple of tickMs + this.currentTimeMs = startMs - (startMs % tickMs); - // overflowWheel can potentially be updated and read by two concurrent threads through add(). 
- // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM - @volatile private[this] var overflowWheel: TimingWheel = _ + for (int i = 0; i < buckets.length; i++) { + buckets[i] = new TimerTaskList(taskCounter); + } + } - private[this] def addOverflowWheel(): Unit = { - synchronized { - if (overflowWheel == null) { - overflowWheel = new TimingWheel( - tickMs = interval, - wheelSize = wheelSize, - startMs = currentTime, - taskCounter = taskCounter, - queue - ) - } + private synchronized void addOverflowWheel() { + if (overflowWheel == null) { + overflowWheel = new TimingWheel( + interval, + wheelSize, + currentTimeMs, + taskCounter, + queue + ); + } } - } - def add(timerTaskEntry: TimerTaskEntry): Boolean = { - val expiration = timerTaskEntry.expirationMs + public boolean add(TimerTaskEntry timerTaskEntry) { + long expiration = timerTaskEntry.expirationMs; + + if (timerTaskEntry.cancelled()) { + // Cancelled + return false; + } else if (expiration < currentTimeMs + tickMs) { + // Already expired + return false; + } else if (expiration < currentTimeMs + interval) { + // Put in its own bucket + long virtualId = expiration / tickMs; Review Comment: You are right. I was considering whether this needs to be a double for later use, but it seems we would take the floor of the double anyway to compute the bucket, which is equivalent to the integer (long) division used here. Hence, long is good here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org