divijvaidya commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229359813


##########
server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java:
##########
@@ -96,71 +94,94 @@
  * This class is not thread-safe. There should not be any add calls while 
advanceClock is executing.
  * It is caller's responsibility to enforce it. Simultaneous add calls are 
thread-safe.
  */
-@nonthreadsafe
-private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, 
taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
+public class TimingWheel {
+    private final long tickMs;
+    private final long startMs;
+    private final int wheelSize;
+    private final AtomicInteger taskCounter;
+    private final DelayQueue<TimerTaskList> queue;
+    private final long interval;
+    private final TimerTaskList[] buckets;
+    private long currentTimeMs;
 
-  private[this] val interval = tickMs * wheelSize
-  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => 
new TimerTaskList(taskCounter) }
+    // overflowWheel can potentially be updated and read by two concurrent 
threads through add().
+    // Therefore, it needs to be volatile due to the issue of Double-Checked 
Locking pattern with JVM
+    private volatile TimingWheel overflowWheel = null;
 
-  private[this] var currentTime = startMs - (startMs % tickMs) // rounding 
down to multiple of tickMs
+    TimingWheel(
+        long tickMs,
+        int wheelSize,
+        long startMs,
+        AtomicInteger taskCounter,
+        DelayQueue<TimerTaskList> queue
+    ) {
+        this.tickMs = tickMs;
+        this.startMs = startMs;
+        this.wheelSize = wheelSize;
+        this.taskCounter = taskCounter;
+        this.queue = queue;
+        this.buckets = new TimerTaskList[wheelSize];
+        this.interval = tickMs * wheelSize;
+        // rounding down to multiple of tickMs
+        this.currentTimeMs = startMs - (startMs % tickMs);
 
-  // overflowWheel can potentially be updated and read by two concurrent 
threads through add().
-  // Therefore, it needs to be volatile due to the issue of Double-Checked 
Locking pattern with JVM
-  @volatile private[this] var overflowWheel: TimingWheel = _
+        for (int i = 0; i < buckets.length; i++) {
+            buckets[i] = new TimerTaskList(taskCounter);
+        }
+    }
 
-  private[this] def addOverflowWheel(): Unit = {
-    synchronized {
-      if (overflowWheel == null) {
-        overflowWheel = new TimingWheel(
-          tickMs = interval,
-          wheelSize = wheelSize,
-          startMs = currentTime,
-          taskCounter = taskCounter,
-          queue
-        )
-      }
+    private synchronized void addOverflowWheel() {
+        if (overflowWheel == null) {
+            overflowWheel = new TimingWheel(
+                interval,
+                wheelSize,
+                currentTimeMs,
+                taskCounter,
+                queue
+            );
+        }
     }
-  }
 
-  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
-    val expiration = timerTaskEntry.expirationMs
+    public boolean add(TimerTaskEntry timerTaskEntry) {
+        long expiration = timerTaskEntry.expirationMs;
+
+        if (timerTaskEntry.cancelled()) {
+            // Cancelled
+            return false;
+        } else if (expiration < currentTimeMs + tickMs) {
+            // Already expired
+            return false;
+        } else if (expiration < currentTimeMs + interval) {
+            // Put in its own bucket
+            long virtualId = expiration / tickMs;

Review Comment:
   you are right, I was thinking from a perspective if this needs to be double 
for later use. But seems like we will take the floor of the double anyways to 
compute the bucket which is equivalent to boxing into long.
   
   Hence, long is good here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to