divijvaidya commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229257653


##########
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskList.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+class TimerTaskList implements Delayed {
+    final AtomicInteger taskCounter;
+    final AtomicLong expiration;
+
+    // TimerTaskList forms a doubly linked cyclic list using a dummy root entry
+    // root.next points to the head
+    // root.prev points to the tail
+    private TimerTaskEntry root;

Review Comment:
   This should be `private final` (corresponding to `val` instead of `var` in Scala).
   
   It is only set in the constructor.
   
   (The same applies to the other members of this class.)



##########
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskList.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+class TimerTaskList implements Delayed {
+    final AtomicInteger taskCounter;
+    final AtomicLong expiration;
+
+    // TimerTaskList forms a doubly linked cyclic list using a dummy root entry
+    // root.next points to the head
+    // root.prev points to the tail
+    private TimerTaskEntry root;
+
+    TimerTaskList(
+        AtomicInteger taskCounter
+    ) {
+        this.taskCounter = taskCounter;
+        this.expiration = new AtomicLong(-1L);
+        this.root = new TimerTaskEntry(null, -1L);
+        this.root.next = root;
+        this.root.prev = root;
+    }
+
+    public boolean setExpiration(long expirationMs) {
+        return expiration.getAndSet(expirationMs) != expirationMs;
+    }
+
+    public long getExpiration() {
+        return expiration.get();
+    }
+
+    public synchronized void foreach(Consumer<TimerTask> f) {
+        TimerTaskEntry entry = root.next;
+        while (entry != root) {
+            TimerTaskEntry nextEntry = entry.next;
+            if (!entry.cancelled()) f.accept(entry.timerTask);
+            entry = nextEntry;
+        }
+    }
+
+    public void add(TimerTaskEntry timerTaskEntry) {
+        boolean done = false;
+        while (!done) {
+            // Remove the timer task entry if it is already in any other list
+            // We do this outside of the sync block below to avoid deadlocking.
+            // We may retry until timerTaskEntry.list becomes null.
+            timerTaskEntry.remove();
+
+            synchronized (this) {
+                synchronized (timerTaskEntry) {
+                    if (timerTaskEntry.list == null) {
+                        // put the timer task entry to the end of the list. 
(root.prev points to the tail entry)
+                        TimerTaskEntry tail = root.prev;
+                        timerTaskEntry.next = root;
+                        timerTaskEntry.prev = tail;
+                        timerTaskEntry.list = this;
+                        tail.next = timerTaskEntry;
+                        root.prev = timerTaskEntry;
+                        taskCounter.incrementAndGet();
+                        done = true;
+                    }
+                }
+            }
+        }
+    }
+
+    public synchronized void remove(TimerTaskEntry timerTaskEntry) {
+        synchronized (timerTaskEntry) {
+            if (timerTaskEntry.list == this) {
+                timerTaskEntry.next.prev = timerTaskEntry.prev;
+                timerTaskEntry.prev.next = timerTaskEntry.next;
+                timerTaskEntry.next = null;
+                timerTaskEntry.prev = null;
+                timerTaskEntry.list = null;
+                taskCounter.decrementAndGet();
+            }
+        }
+    }
+
+    public synchronized void flush(Consumer<TimerTaskEntry> f) {
+        TimerTaskEntry head = root.next;
+        while (head != root) {
+            remove(head);
+            f.accept(head);
+            head = root.next;
+        }
+        expiration.set(-1L);
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+        return unit.convert(Math.max(getExpiration() - 
Time.SYSTEM.hiResClockMs(), 0), TimeUnit.MICROSECONDS);

Review Comment:
   The time unit is MILLISECONDS in the Scala code here. Is the change to 
MICROSECONDS intentional? If not, then why didn't a unit test fail? Could we add 
the missing unit test, please?



##########
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public class TimerTaskEntry {
+    public final TimerTask timerTask;
+    public final long expirationMs;
+    volatile TimerTaskList list;
+    TimerTaskEntry next;
+    TimerTaskEntry prev;
+
+    public TimerTaskEntry(
+        TimerTask timerTask,
+        long expirationMs
+    ) {
+        this.timerTask = timerTask;
+        this.expirationMs = expirationMs;
+
+        // if this timerTask is already held by an existing timer task entry,
+        // setTimerTaskEntry will remove it.
+        if (timerTask != null) {
+            timerTask.setTimerTaskEntry(this);

Review Comment:
   Does the `this` reference exist even when the constructor invocation isn't complete?



##########
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public class TimerTaskEntry {

Review Comment:
   The compareTo method is missing.
   
   A unit test should have caught this. Please add a unit test if there is a 
gap in testing.



##########
server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java:
##########
@@ -96,71 +94,94 @@
  * This class is not thread-safe. There should not be any add calls while 
advanceClock is executing.
  * It is caller's responsibility to enforce it. Simultaneous add calls are 
thread-safe.
  */
-@nonthreadsafe
-private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, 
taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
+public class TimingWheel {
+    private final long tickMs;
+    private final long startMs;
+    private final int wheelSize;
+    private final AtomicInteger taskCounter;
+    private final DelayQueue<TimerTaskList> queue;
+    private final long interval;
+    private final TimerTaskList[] buckets;
+    private long currentTimeMs;
 
-  private[this] val interval = tickMs * wheelSize
-  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => 
new TimerTaskList(taskCounter) }
+    // overflowWheel can potentially be updated and read by two concurrent 
threads through add().
+    // Therefore, it needs to be volatile due to the issue of Double-Checked 
Locking pattern with JVM
+    private volatile TimingWheel overflowWheel = null;
 
-  private[this] var currentTime = startMs - (startMs % tickMs) // rounding 
down to multiple of tickMs
+    TimingWheel(
+        long tickMs,
+        int wheelSize,
+        long startMs,
+        AtomicInteger taskCounter,
+        DelayQueue<TimerTaskList> queue
+    ) {
+        this.tickMs = tickMs;
+        this.startMs = startMs;
+        this.wheelSize = wheelSize;
+        this.taskCounter = taskCounter;
+        this.queue = queue;
+        this.buckets = new TimerTaskList[wheelSize];
+        this.interval = tickMs * wheelSize;
+        // rounding down to multiple of tickMs
+        this.currentTimeMs = startMs - (startMs % tickMs);
 
-  // overflowWheel can potentially be updated and read by two concurrent 
threads through add().
-  // Therefore, it needs to be volatile due to the issue of Double-Checked 
Locking pattern with JVM
-  @volatile private[this] var overflowWheel: TimingWheel = _
+        for (int i = 0; i < buckets.length; i++) {
+            buckets[i] = new TimerTaskList(taskCounter);
+        }
+    }
 
-  private[this] def addOverflowWheel(): Unit = {
-    synchronized {
-      if (overflowWheel == null) {
-        overflowWheel = new TimingWheel(
-          tickMs = interval,
-          wheelSize = wheelSize,
-          startMs = currentTime,
-          taskCounter = taskCounter,
-          queue
-        )
-      }
+    private synchronized void addOverflowWheel() {
+        if (overflowWheel == null) {
+            overflowWheel = new TimingWheel(
+                interval,
+                wheelSize,
+                currentTimeMs,
+                taskCounter,
+                queue
+            );
+        }
     }
-  }
 
-  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
-    val expiration = timerTaskEntry.expirationMs
+    public boolean add(TimerTaskEntry timerTaskEntry) {
+        long expiration = timerTaskEntry.expirationMs;
+
+        if (timerTaskEntry.cancelled()) {
+            // Cancelled
+            return false;
+        } else if (expiration < currentTimeMs + tickMs) {
+            // Already expired
+            return false;
+        } else if (expiration < currentTimeMs + interval) {
+            // Put in its own bucket
+            long virtualId = expiration / tickMs;

Review Comment:
   Should this be `double` instead of `long`?



##########
core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala:
##########
@@ -17,17 +17,18 @@
 package kafka.raft
 
 import java.util.concurrent.CompletableFuture
-import kafka.utils.timer.{Timer, TimerTask}
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.raft.ExpirationService
 import org.apache.kafka.server.util.ShutdownableThread
+import org.apache.kafka.server.util.timer.{Timer, TimerTask}
 
 object TimingWheelExpirationService {
   private val WorkTimeoutMs: Long = 200L
 
-  class TimerTaskCompletableFuture[T](override val delayMs: Long) extends 
CompletableFuture[T] with TimerTask {

Review Comment:
   Why did we remove the inheritance from `CompletableFuture`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to