dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229343728


##########
server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java:
##########
@@ -96,71 +94,94 @@
  * This class is not thread-safe. There should not be any add calls while 
advanceClock is executing.
  * It is caller's responsibility to enforce it. Simultaneous add calls are 
thread-safe.
  */
-@nonthreadsafe
-private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, 
taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
+public class TimingWheel {
+    private final long tickMs;
+    private final long startMs;
+    private final int wheelSize;
+    private final AtomicInteger taskCounter;
+    private final DelayQueue<TimerTaskList> queue;
+    private final long interval;
+    private final TimerTaskList[] buckets;
+    private long currentTimeMs;
 
-  private[this] val interval = tickMs * wheelSize
-  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => 
new TimerTaskList(taskCounter) }
+    // overflowWheel can potentially be updated and read by two concurrent 
threads through add().
+    // Therefore, it needs to be volatile due to the issue of Double-Checked 
Locking pattern with JVM
+    private volatile TimingWheel overflowWheel = null;
 
-  private[this] var currentTime = startMs - (startMs % tickMs) // rounding 
down to multiple of tickMs
+    TimingWheel(
+        long tickMs,
+        int wheelSize,
+        long startMs,
+        AtomicInteger taskCounter,
+        DelayQueue<TimerTaskList> queue
+    ) {
+        this.tickMs = tickMs;
+        this.startMs = startMs;
+        this.wheelSize = wheelSize;
+        this.taskCounter = taskCounter;
+        this.queue = queue;
+        this.buckets = new TimerTaskList[wheelSize];
+        this.interval = tickMs * wheelSize;
+        // rounding down to multiple of tickMs
+        this.currentTimeMs = startMs - (startMs % tickMs);
 
-  // overflowWheel can potentially be updated and read by two concurrent 
threads through add().
-  // Therefore, it needs to be volatile due to the issue of Double-Checked 
Locking pattern with JVM
-  @volatile private[this] var overflowWheel: TimingWheel = _
+        for (int i = 0; i < buckets.length; i++) {
+            buckets[i] = new TimerTaskList(taskCounter);
+        }
+    }
 
-  private[this] def addOverflowWheel(): Unit = {
-    synchronized {
-      if (overflowWheel == null) {
-        overflowWheel = new TimingWheel(
-          tickMs = interval,
-          wheelSize = wheelSize,
-          startMs = currentTime,
-          taskCounter = taskCounter,
-          queue
-        )
-      }
+    private synchronized void addOverflowWheel() {
+        if (overflowWheel == null) {
+            overflowWheel = new TimingWheel(
+                interval,
+                wheelSize,
+                currentTimeMs,
+                taskCounter,
+                queue
+            );
+        }
     }
-  }
 
-  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
-    val expiration = timerTaskEntry.expirationMs
+    public boolean add(TimerTaskEntry timerTaskEntry) {
+        long expiration = timerTaskEntry.expirationMs;
+
+        if (timerTaskEntry.cancelled()) {
+            // Cancelled
+            return false;
+        } else if (expiration < currentTimeMs + tickMs) {
+            // Already expired
+            return false;
+        } else if (expiration < currentTimeMs + interval) {
+            // Put in its own bucket
+            long virtualId = expiration / tickMs;

Review Comment:
   hum... i think that dividing a long by a long gives a long, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to