[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229446218


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+private volatile TimerTaskEntry timerTaskEntry;
+// timestamp in millisecond
+public final long delayMs;
+
+public TimerTask(long delayMs) {
+this.delayMs = delayMs;
+}
+
+public void cancel() {
+synchronized (this) {
+if (timerTaskEntry != null) timerTaskEntry.remove();
+timerTaskEntry = null;
+}
+}
+
+void setTimerTaskEntry(TimerTaskEntry entry) {
+synchronized (this) {
+// if this timerTask is already held by an existing timer task 
entry,
+// we will remove such an entry first.
+if (timerTaskEntry != null && !timerTaskEntry.equals(entry)) {

Review Comment:
   Fixed in last commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229407264


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public class TimerTaskEntry {

Review Comment:
   As explained in the other comment, this is not required any more.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229369591


##
core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala:
##
@@ -17,12 +17,15 @@
 package kafka.utils.timer
 
 import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.timer.{Timer, TimerTask, TimerTaskEntry}
 
 import scala.collection.mutable
 
 class MockTimer(val time: MockTime = new MockTime) extends Timer {
 
-  private val taskQueue = 
mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse)
+  private val taskQueue = mutable.PriorityQueue.empty[TimerTaskEntry](new 
Ordering[TimerTaskEntry] {
+override def compare(x: TimerTaskEntry, y: TimerTaskEntry): Int = 
java.lang.Long.compare(x.expirationMs, y.expirationMs)

Review Comment:
   Right but I don’t think that the scala priority queue accepts this. This is 
why I do it inline here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229352143


##
server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class SystemTimer implements Timer {

Review Comment:
   Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229351745


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+private volatile TimerTaskEntry timerTaskEntry;
+// timestamp in millisecond
+public final long delayMs;
+
+public TimerTask(long delayMs) {
+this.delayMs = delayMs;
+}
+
+public void cancel() {
+synchronized (this) {
+if (timerTaskEntry != null) timerTaskEntry.remove();
+timerTaskEntry = null;
+}
+}
+
+void setTimerTaskEntry(TimerTaskEntry entry) {
+synchronized (this) {
+// if this timerTask is already held by an existing timer task 
entry,
+// we will remove such an entry first.
+if (timerTaskEntry != null && !timerTaskEntry.equals(entry)) {

Review Comment:
   @divijvaidya I was wondering if using `equals` is correct here. It was like 
this in Scala but as we compare references in all other places, it is a bit 
weird. Practically, it does not make any difference because `equals` is not 
overridden anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229343728


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java:
##
@@ -96,71 +94,94 @@
  * This class is not thread-safe. There should not be any add calls while 
advanceClock is executing.
  * It is caller's responsibility to enforce it. Simultaneous add calls are 
thread-safe.
  */
-@nonthreadsafe
-private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, 
taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
+public class TimingWheel {
+private final long tickMs;
+private final long startMs;
+private final int wheelSize;
+private final AtomicInteger taskCounter;
+private final DelayQueue queue;
+private final long interval;
+private final TimerTaskList[] buckets;
+private long currentTimeMs;
 
-  private[this] val interval = tickMs * wheelSize
-  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => 
new TimerTaskList(taskCounter) }
+// overflowWheel can potentially be updated and read by two concurrent 
threads through add().
+// Therefore, it needs to be volatile due to the issue of Double-Checked 
Locking pattern with JVM
+private volatile TimingWheel overflowWheel = null;
 
-  private[this] var currentTime = startMs - (startMs % tickMs) // rounding 
down to multiple of tickMs
+TimingWheel(
+long tickMs,
+int wheelSize,
+long startMs,
+AtomicInteger taskCounter,
+DelayQueue queue
+) {
+this.tickMs = tickMs;
+this.startMs = startMs;
+this.wheelSize = wheelSize;
+this.taskCounter = taskCounter;
+this.queue = queue;
+this.buckets = new TimerTaskList[wheelSize];
+this.interval = tickMs * wheelSize;
+// rounding down to multiple of tickMs
+this.currentTimeMs = startMs - (startMs % tickMs);
 
-  // overflowWheel can potentially be updated and read by two concurrent 
threads through add().
-  // Therefore, it needs to be volatile due to the issue of Double-Checked 
Locking pattern with JVM
-  @volatile private[this] var overflowWheel: TimingWheel = _
+for (int i = 0; i < buckets.length; i++) {
+buckets[i] = new TimerTaskList(taskCounter);
+}
+}
 
-  private[this] def addOverflowWheel(): Unit = {
-synchronized {
-  if (overflowWheel == null) {
-overflowWheel = new TimingWheel(
-  tickMs = interval,
-  wheelSize = wheelSize,
-  startMs = currentTime,
-  taskCounter = taskCounter,
-  queue
-)
-  }
+private synchronized void addOverflowWheel() {
+if (overflowWheel == null) {
+overflowWheel = new TimingWheel(
+interval,
+wheelSize,
+currentTimeMs,
+taskCounter,
+queue
+);
+}
 }
-  }
 
-  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
-val expiration = timerTaskEntry.expirationMs
+public boolean add(TimerTaskEntry timerTaskEntry) {
+long expiration = timerTaskEntry.expirationMs;
+
+if (timerTaskEntry.cancelled()) {
+// Cancelled
+return false;
+} else if (expiration < currentTimeMs + tickMs) {
+// Already expired
+return false;
+} else if (expiration < currentTimeMs + interval) {
+// Put in its own bucket
+long virtualId = expiration / tickMs;

Review Comment:
   hum... i think that dividing a long by a long gives a long, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229338181


##
core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala:
##
@@ -17,12 +17,15 @@
 package kafka.utils.timer
 
 import org.apache.kafka.server.util.MockTime
+import org.apache.kafka.server.util.timer.{Timer, TimerTask, TimerTaskEntry}
 
 import scala.collection.mutable
 
 class MockTimer(val time: MockTime = new MockTime) extends Timer {
 
-  private val taskQueue = 
mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse)
+  private val taskQueue = mutable.PriorityQueue.empty[TimerTaskEntry](new 
Ordering[TimerTaskEntry] {
+override def compare(x: TimerTaskEntry, y: TimerTaskEntry): Int = 
java.lang.Long.compare(x.expirationMs, y.expirationMs)

Review Comment:
   I cannot use `Ordering` in Java. This is why I change it this way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229336976


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public class TimerTaskEntry {

Review Comment:
   You meant `compare`, right? We can't implement in Java because it comes from 
Scala trait. It was only used by `MockTimer` and I have refactored it to not 
require it any more.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229335079


##
core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala:
##
@@ -17,17 +17,18 @@
 package kafka.raft
 
 import java.util.concurrent.CompletableFuture
-import kafka.utils.timer.{Timer, TimerTask}
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.raft.ExpirationService
 import org.apache.kafka.server.util.ShutdownableThread
+import org.apache.kafka.server.util.timer.{Timer, TimerTask}
 
 object TimingWheelExpirationService {
   private val WorkTimeoutMs: Long = 200L
 
-  class TimerTaskCompletableFuture[T](override val delayMs: Long) extends 
CompletableFuture[T] with TimerTask {

Review Comment:
   `TimerTask` is an abstract class in Java vs a trait in Scala and we can't 
extend two classes. Therefore, I have refactored this piece to only extend 
`TimerTask`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229328322


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public class TimerTaskEntry {
+public final TimerTask timerTask;
+public final long expirationMs;
+volatile TimerTaskList list;
+TimerTaskEntry next;
+TimerTaskEntry prev;
+
+public TimerTaskEntry(
+TimerTask timerTask,
+long expirationMs
+) {
+this.timerTask = timerTask;
+this.expirationMs = expirationMs;
+
+// if this timerTask is already held by an existing timer task entry,
+// setTimerTaskEntry will remove it.
+if (timerTask != null) {
+timerTask.setTimerTaskEntry(this);

Review Comment:
   I think that it does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-14 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229326517


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskList.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+class TimerTaskList implements Delayed {
+final AtomicInteger taskCounter;
+final AtomicLong expiration;
+
+// TimerTaskList forms a doubly linked cyclic list using a dummy root entry
+// root.next points to the head
+// root.prev points to the tail
+private TimerTaskEntry root;
+
+TimerTaskList(
+AtomicInteger taskCounter
+) {
+this.taskCounter = taskCounter;
+this.expiration = new AtomicLong(-1L);
+this.root = new TimerTaskEntry(null, -1L);
+this.root.next = root;
+this.root.prev = root;
+}
+
+public boolean setExpiration(long expirationMs) {
+return expiration.getAndSet(expirationMs) != expirationMs;
+}
+
+public long getExpiration() {
+return expiration.get();
+}
+
+public synchronized void foreach(Consumer f) {
+TimerTaskEntry entry = root.next;
+while (entry != root) {
+TimerTaskEntry nextEntry = entry.next;
+if (!entry.cancelled()) f.accept(entry.timerTask);
+entry = nextEntry;
+}
+}
+
+public void add(TimerTaskEntry timerTaskEntry) {
+boolean done = false;
+while (!done) {
+// Remove the timer task entry if it is already in any other list
+// We do this outside of the sync block below to avoid deadlocking.
+// We may retry until timerTaskEntry.list becomes null.
+timerTaskEntry.remove();
+
+synchronized (this) {
+synchronized (timerTaskEntry) {
+if (timerTaskEntry.list == null) {
+// put the timer task entry to the end of the list. 
(root.prev points to the tail entry)
+TimerTaskEntry tail = root.prev;
+timerTaskEntry.next = root;
+timerTaskEntry.prev = tail;
+timerTaskEntry.list = this;
+tail.next = timerTaskEntry;
+root.prev = timerTaskEntry;
+taskCounter.incrementAndGet();
+done = true;
+}
+}
+}
+}
+}
+
+public synchronized void remove(TimerTaskEntry timerTaskEntry) {
+synchronized (timerTaskEntry) {
+if (timerTaskEntry.list == this) {
+timerTaskEntry.next.prev = timerTaskEntry.prev;
+timerTaskEntry.prev.next = timerTaskEntry.next;
+timerTaskEntry.next = null;
+timerTaskEntry.prev = null;
+timerTaskEntry.list = null;
+taskCounter.decrementAndGet();
+}
+}
+}
+
+public synchronized void flush(Consumer f) {
+TimerTaskEntry head = root.next;
+while (head != root) {
+remove(head);
+f.accept(head);
+head = root.next;
+}
+expiration.set(-1L);
+}
+
+@Override
+public long getDelay(TimeUnit unit) {
+return unit.convert(Math.max(getExpiration() - 
Time.SYSTEM.hiResClockMs(), 0), TimeUnit.MICROSECONDS);

Review Comment:
   It is not... Let me fix this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-12 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1226587095


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+private volatile TimerTaskEntry timerTaskEntry;

Review Comment:
   Yeah, that should work. I think that it is preferable to do this separately 
though. My goal was to have a 1:1 mapping between the scala and the java code 
in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-09 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1223930257


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+private volatile TimerTaskEntry timerTaskEntry;

Review Comment:
   There is a side effect as well when we mutate `timerTaskEntry`. We call 
`timerTaskEntry.remove()`. Is it safe to use an `AtomicReference` in this case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-09 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1223928823


##
server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+private volatile TimerTaskEntry timerTaskEntry;
+// timestamp in millisecond
+public final long delayMs;
+
+public TimerTask(long delayMs) {
+this.delayMs = delayMs;
+}
+
+public void cancel() {
+synchronized (this) {

Review Comment:
   I tried this but spotbugs does not like it because of the access 
`timerTaskEntry` without synchronisation later on. This is why I kept it like 
this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-09 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1223928104


##
server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class SystemTimer implements Timer {
+// timeout timer
+private final ExecutorService taskExecutor;
+private final DelayQueue delayQueue;
+private final AtomicInteger taskCounter;
+private final TimingWheel timingWheel;
+
+// Locks used to protect data structures while ticking
+private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock();
+private final ReentrantReadWriteLock.ReadLock readLock = 
readWriteLock.readLock();
+private final ReentrantReadWriteLock.WriteLock writeLock = 
readWriteLock.writeLock();
+
+public SystemTimer(String executorName) {
+this(executorName, 1, 20, Time.SYSTEM.hiResClockMs());
+}
+
+public SystemTimer(
+String executorName,
+long tickMs,
+int wheelSize,
+long startMs
+) {
+this.taskExecutor = Executors.newFixedThreadPool(1,
+runnable -> KafkaThread.nonDaemon("executor-" + executorName, 
runnable));
+this.delayQueue = new DelayQueue<>();
+this.taskCounter = new AtomicInteger(0);
+this.timingWheel = new TimingWheel(
+tickMs,
+wheelSize,
+startMs,
+taskCounter,
+delayQueue
+);
+}
+
+public void add(TimerTask timerTask) {
+readLock.lock();

Review Comment:
   Personally, I am not a big fan of those helper methods for locks in our Java 
code. I think that we could consider this separately. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common

2023-06-09 Thread via GitHub


dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1223925835


##
server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java:
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class SystemTimer implements Timer {

Review Comment:
   I am not sure to understand the benefits of doing this here. My 
understanding is that `AutoClosable` just adds a `close` method but you still 
need to class it explicitly or implicitly by using `try-with-resources`. If we 
want to do this, we would need to bring this change to the `Timer` interface 
instead of doing it here. I think that this is something that we could consider 
separately. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org