[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229446218 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +public abstract class TimerTask implements Runnable { +private volatile TimerTaskEntry timerTaskEntry; +// timestamp in millisecond +public final long delayMs; + +public TimerTask(long delayMs) { +this.delayMs = delayMs; +} + +public void cancel() { +synchronized (this) { +if (timerTaskEntry != null) timerTaskEntry.remove(); +timerTaskEntry = null; +} +} + +void setTimerTaskEntry(TimerTaskEntry entry) { +synchronized (this) { +// if this timerTask is already held by an existing timer task entry, +// we will remove such an entry first. +if (timerTaskEntry != null && !timerTaskEntry.equals(entry)) { Review Comment: Fixed in last commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229407264 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +public class TimerTaskEntry { Review Comment: As explained in the other comment, this is not required any more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229369591 ## core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala: ## @@ -17,12 +17,15 @@ package kafka.utils.timer import org.apache.kafka.server.util.MockTime +import org.apache.kafka.server.util.timer.{Timer, TimerTask, TimerTaskEntry} import scala.collection.mutable class MockTimer(val time: MockTime = new MockTime) extends Timer { - private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse) + private val taskQueue = mutable.PriorityQueue.empty[TimerTaskEntry](new Ordering[TimerTaskEntry] { +override def compare(x: TimerTaskEntry, y: TimerTaskEntry): Int = java.lang.Long.compare(x.expirationMs, y.expirationMs) Review Comment: Right but I don’t think that the scala priority queue accepts this. This is why I do it inline here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
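For readers following along in Java, the same ordering can be sketched with a `Comparator`. This is only an illustration: `Entry` is a hypothetical stand-in for `TimerTaskEntry` that keeps just the `expirationMs` field, and `earliest` is a made-up helper name.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

final class ExpirationOrderSketch {
    // Hypothetical stand-in for TimerTaskEntry: only the field used for ordering.
    static final class Entry {
        final long expirationMs;

        Entry(long expirationMs) {
            this.expirationMs = expirationMs;
        }
    }

    // Same comparison as the inline Scala Ordering: compare expirationMs with Long.compare.
    static final Comparator<Entry> BY_EXPIRATION =
        (x, y) -> Long.compare(x.expirationMs, y.expirationMs);

    // Returns the smallest expiration among the given values.
    static long earliest(long... expirations) {
        PriorityQueue<Entry> queue = new PriorityQueue<>(BY_EXPIRATION);
        for (long e : expirations) {
            queue.add(new Entry(e));
        }
        // java.util.PriorityQueue hands out the least element first.
        return queue.peek().expirationMs;
    }
}
```

Note that `java.util.PriorityQueue` returns the least element first, whereas `scala.collection.mutable.PriorityQueue` dequeues the greatest, so a comparator cannot be moved between the two without checking its direction.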
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229352143 ## server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Time; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class SystemTimer implements Timer { Review Comment: Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229351745 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +public abstract class TimerTask implements Runnable { +private volatile TimerTaskEntry timerTaskEntry; +// timestamp in millisecond +public final long delayMs; + +public TimerTask(long delayMs) { +this.delayMs = delayMs; +} + +public void cancel() { +synchronized (this) { +if (timerTaskEntry != null) timerTaskEntry.remove(); +timerTaskEntry = null; +} +} + +void setTimerTaskEntry(TimerTaskEntry entry) { +synchronized (this) { +// if this timerTask is already held by an existing timer task entry, +// we will remove such an entry first. +if (timerTaskEntry != null && !timerTaskEntry.equals(entry)) { Review Comment: @divijvaidya I was wondering if using `equals` is correct here. It was like this in Scala but as we compare references in all other places, it is a bit weird. Practically, it does not make any difference because `equals` is not overridden anyway. -- This is an automated message from the Apache Git Service. 
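The point about `equals` can be checked with a small sketch. `Entry` below is a hypothetical class that, like `TimerTaskEntry` in the diff, does not override `equals`, so it inherits `Object.equals`, which compares references.

```java
final class ReferenceEqualitySketch {
    // Hypothetical class without an equals override, as with TimerTaskEntry in the diff.
    static final class Entry {
        final long expirationMs;

        Entry(long expirationMs) {
            this.expirationMs = expirationMs;
        }
    }

    // Uses the inherited Object.equals, which is reference equality.
    static boolean sameByEquals(Object a, Object b) {
        return a.equals(b);
    }

    // Explicit reference comparison.
    static boolean sameByReference(Object a, Object b) {
        return a == b;
    }
}
```

Two distinct entries with the same `expirationMs` are unequal under both checks, and an entry compared with itself is equal under both, so switching to `!=` would not change behaviour as long as `equals` stays un-overridden.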
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229343728 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimingWheel.java: ## @@ -96,71 +94,94 @@ * This class is not thread-safe. There should not be any add calls while advanceClock is executing. * It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe. */ -@nonthreadsafe -private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { +public class TimingWheel { +private final long tickMs; +private final long startMs; +private final int wheelSize; +private final AtomicInteger taskCounter; +private final DelayQueue queue; +private final long interval; +private final TimerTaskList[] buckets; +private long currentTimeMs; - private[this] val interval = tickMs * wheelSize - private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) } +// overflowWheel can potentially be updated and read by two concurrent threads through add(). +// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM +private volatile TimingWheel overflowWheel = null; - private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs +TimingWheel( +long tickMs, +int wheelSize, +long startMs, +AtomicInteger taskCounter, +DelayQueue queue +) { +this.tickMs = tickMs; +this.startMs = startMs; +this.wheelSize = wheelSize; +this.taskCounter = taskCounter; +this.queue = queue; +this.buckets = new TimerTaskList[wheelSize]; +this.interval = tickMs * wheelSize; +// rounding down to multiple of tickMs +this.currentTimeMs = startMs - (startMs % tickMs); - // overflowWheel can potentially be updated and read by two concurrent threads through add(). 
- // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM - @volatile private[this] var overflowWheel: TimingWheel = _ +for (int i = 0; i < buckets.length; i++) { +buckets[i] = new TimerTaskList(taskCounter); +} +} - private[this] def addOverflowWheel(): Unit = { -synchronized { - if (overflowWheel == null) { -overflowWheel = new TimingWheel( - tickMs = interval, - wheelSize = wheelSize, - startMs = currentTime, - taskCounter = taskCounter, - queue -) - } +private synchronized void addOverflowWheel() { +if (overflowWheel == null) { +overflowWheel = new TimingWheel( +interval, +wheelSize, +currentTimeMs, +taskCounter, +queue +); +} } - } - def add(timerTaskEntry: TimerTaskEntry): Boolean = { -val expiration = timerTaskEntry.expirationMs +public boolean add(TimerTaskEntry timerTaskEntry) { +long expiration = timerTaskEntry.expirationMs; + +if (timerTaskEntry.cancelled()) { +// Cancelled +return false; +} else if (expiration < currentTimeMs + tickMs) { +// Already expired +return false; +} else if (expiration < currentTimeMs + interval) { +// Put in its own bucket +long virtualId = expiration / tickMs; Review Comment: hum... i think that dividing a long by a long gives a long, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
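A minimal sketch of the arithmetic in question: in Java, dividing a `long` by a `long` yields a `long`, truncated toward zero, so no cast is needed for `virtualId`. The modulo step that maps the id to a bucket is not part of the quoted hunk and is an assumption here, as is the name `bucketIndex`.

```java
final class BucketIndexSketch {
    // long / long is integer division and the result type is long.
    static long virtualId(long expirationMs, long tickMs) {
        return expirationMs / tickMs;
    }

    // Assumed follow-up step (not shown in the quoted diff): map the id to a bucket slot.
    static int bucketIndex(long expirationMs, long tickMs, int wheelSize) {
        return (int) (virtualId(expirationMs, tickMs) % wheelSize);
    }
}
```

For example, with `tickMs = 100`, an expiration of 199 ms and one of 100 ms both get virtual id 1.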
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229338181 ## core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala: ## @@ -17,12 +17,15 @@ package kafka.utils.timer import org.apache.kafka.server.util.MockTime +import org.apache.kafka.server.util.timer.{Timer, TimerTask, TimerTaskEntry} import scala.collection.mutable class MockTimer(val time: MockTime = new MockTime) extends Timer { - private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse) + private val taskQueue = mutable.PriorityQueue.empty[TimerTaskEntry](new Ordering[TimerTaskEntry] { +override def compare(x: TimerTaskEntry, y: TimerTaskEntry): Int = java.lang.Long.compare(x.expirationMs, y.expirationMs) Review Comment: I cannot use `Ordering` in Java. This is why I change it this way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1229336976

## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java: ##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public class TimerTaskEntry {

Review Comment:
You meant `compare`, right? We can't implement it in Java because it comes from a Scala trait. It was only used by `MockTimer`, and I have refactored `MockTimer` so that it is no longer required.
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229335079 ## core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala: ## @@ -17,17 +17,18 @@ package kafka.raft import java.util.concurrent.CompletableFuture -import kafka.utils.timer.{Timer, TimerTask} import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.raft.ExpirationService import org.apache.kafka.server.util.ShutdownableThread +import org.apache.kafka.server.util.timer.{Timer, TimerTask} object TimingWheelExpirationService { private val WorkTimeoutMs: Long = 200L - class TimerTaskCompletableFuture[T](override val delayMs: Long) extends CompletableFuture[T] with TimerTask { Review Comment: `TimerTask` is an abstract class in Java vs a trait in Scala and we can't extend two classes. Therefore, I have refactored this piece to only extend `TimerTask`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
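The constraint described here (Java allows only one superclass) is usually worked around with composition. The sketch below is not the code from the PR: `Task` is a hypothetical stand-in for the abstract `TimerTask`, `TimeoutTask` is a made-up name, and `java.util.concurrent.TimeoutException` is used in place of Kafka's own `TimeoutException` so that the example is self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

final class FutureTimerTaskSketch {
    // Hypothetical stand-in for the abstract TimerTask class from the diff.
    abstract static class Task implements Runnable {
        final long delayMs;

        Task(long delayMs) {
            this.delayMs = delayMs;
        }
    }

    // Extends only Task; the future is held as a field instead of being a second superclass.
    static final class TimeoutTask<T> extends Task {
        final CompletableFuture<T> future = new CompletableFuture<>();

        TimeoutTask(long delayMs) {
            super(delayMs);
        }

        // When the timer fires, fail the future if nobody completed it first.
        @Override
        public void run() {
            future.completeExceptionally(
                new TimeoutException("Not completed within " + delayMs + " ms"));
        }
    }
}
```

Callers then hand `task.future` to whoever waits on the result and hand `task` itself to the timer.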
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229328322 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskEntry.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +public class TimerTaskEntry { +public final TimerTask timerTask; +public final long expirationMs; +volatile TimerTaskList list; +TimerTaskEntry next; +TimerTaskEntry prev; + +public TimerTaskEntry( +TimerTask timerTask, +long expirationMs +) { +this.timerTask = timerTask; +this.expirationMs = expirationMs; + +// if this timerTask is already held by an existing timer task entry, +// setTimerTaskEntry will remove it. +if (timerTask != null) { +timerTask.setTimerTaskEntry(this); Review Comment: I think that it does. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1229326517 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTaskList.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.util.timer; + +import org.apache.kafka.common.utils.Time; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +class TimerTaskList implements Delayed { +final AtomicInteger taskCounter; +final AtomicLong expiration; + +// TimerTaskList forms a doubly linked cyclic list using a dummy root entry +// root.next points to the head +// root.prev points to the tail +private TimerTaskEntry root; + +TimerTaskList( +AtomicInteger taskCounter +) { +this.taskCounter = taskCounter; +this.expiration = new AtomicLong(-1L); +this.root = new TimerTaskEntry(null, -1L); +this.root.next = root; +this.root.prev = root; +} + +public boolean setExpiration(long expirationMs) { +return expiration.getAndSet(expirationMs) != expirationMs; +} + +public long getExpiration() { +return expiration.get(); +} + +public synchronized void foreach(Consumer f) { +TimerTaskEntry entry = root.next; +while (entry != root) { +TimerTaskEntry nextEntry = entry.next; +if (!entry.cancelled()) f.accept(entry.timerTask); +entry = nextEntry; +} +} + +public void add(TimerTaskEntry timerTaskEntry) { +boolean done = false; +while (!done) { +// Remove the timer task entry if it is already in any other list +// We do this outside of the sync block below to avoid deadlocking. +// We may retry until timerTaskEntry.list becomes null. +timerTaskEntry.remove(); + +synchronized (this) { +synchronized (timerTaskEntry) { +if (timerTaskEntry.list == null) { +// put the timer task entry to the end of the list. 
(root.prev points to the tail entry) +TimerTaskEntry tail = root.prev; +timerTaskEntry.next = root; +timerTaskEntry.prev = tail; +timerTaskEntry.list = this; +tail.next = timerTaskEntry; +root.prev = timerTaskEntry; +taskCounter.incrementAndGet(); +done = true; +} +} +} +} +} + +public synchronized void remove(TimerTaskEntry timerTaskEntry) { +synchronized (timerTaskEntry) { +if (timerTaskEntry.list == this) { +timerTaskEntry.next.prev = timerTaskEntry.prev; +timerTaskEntry.prev.next = timerTaskEntry.next; +timerTaskEntry.next = null; +timerTaskEntry.prev = null; +timerTaskEntry.list = null; +taskCounter.decrementAndGet(); +} +} +} + +public synchronized void flush(Consumer f) { +TimerTaskEntry head = root.next; +while (head != root) { +remove(head); +f.accept(head); +head = root.next; +} +expiration.set(-1L); +} + +@Override +public long getDelay(TimeUnit unit) { +return unit.convert(Math.max(getExpiration() - Time.SYSTEM.hiResClockMs(), 0), TimeUnit.MICROSECONDS); Review Comment: It is not... Let me fix this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
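The problem acknowledged above appears to be the source unit passed to `TimeUnit.convert`: the remaining time is computed in milliseconds but is declared as `TimeUnit.MICROSECONDS`. The sketch below only demonstrates the effect of the second argument; `remaining` is a hypothetical helper, and the actual fix made in the PR is not shown in this message.

```java
import java.util.concurrent.TimeUnit;

final class DelayUnitSketch {
    // Converts the time left until expirationMs into the target unit.
    // sourceUnit tells convert() how to interpret the millisecond difference.
    static long remaining(long expirationMs, long nowMs, TimeUnit target, TimeUnit sourceUnit) {
        long remainingMs = Math.max(expirationMs - nowMs, 0);
        return target.convert(remainingMs, sourceUnit);
    }
}
```

With 1500 ms left, declaring the source as `MILLISECONDS` gives 1,500,000,000 ns, while declaring it as `MICROSECONDS` gives 1,500,000 ns, so a `DelayQueue` would consider the list expired a thousand times too early.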
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1226587095

## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java: ##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+    private volatile TimerTaskEntry timerTaskEntry;

Review Comment:
Yeah, that should work. I think that it is preferable to do this separately though. My goal was to have a 1:1 mapping between the Scala and the Java code in this PR.
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1223930257 ## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util.timer; + +public abstract class TimerTask implements Runnable { +private volatile TimerTaskEntry timerTaskEntry; Review Comment: There is a side effect as well when we mutate `timerTaskEntry`. We call `timerTaskEntry.remove()`. Is it safe to use an `AtomicReference` in this case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
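To make the question concrete, here is a rough sketch of what an `AtomicReference` variant might look like. It is not taken from the PR and does not settle whether the change is safe: `Entry` and `Task` are hypothetical stand-ins, and `remove()` is reduced to setting a flag.

```java
import java.util.concurrent.atomic.AtomicReference;

final class AtomicEntrySketch {
    // Hypothetical stand-in for TimerTaskEntry; remove() only records that it was called.
    static final class Entry {
        volatile boolean removed = false;

        void remove() {
            removed = true;
        }
    }

    // Hypothetical stand-in for TimerTask using an AtomicReference instead of synchronized.
    static final class Task {
        private final AtomicReference<Entry> entry = new AtomicReference<>();

        void cancel() {
            Entry old = entry.getAndSet(null); // the swap itself is atomic
            if (old != null) {
                old.remove();                  // the side effect runs after the swap
            }
        }

        void setEntry(Entry next) {
            Entry old = entry.getAndSet(next);
            if (old != null && old != next) {
                old.remove();
            }
        }
    }
}
```

`getAndSet` hands each displaced entry to exactly one caller, but the swap and the `remove()` call are two separate steps, so another thread can observe the new reference before the old entry has been removed. The `synchronized` block in the diff does not have that window.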
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1223928823

## server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java: ##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+public abstract class TimerTask implements Runnable {
+    private volatile TimerTaskEntry timerTaskEntry;
+    // timestamp in millisecond
+    public final long delayMs;
+
+    public TimerTask(long delayMs) {
+        this.delayMs = delayMs;
+    }
+
+    public void cancel() {
+        synchronized (this) {

Review Comment:
I tried this, but SpotBugs does not like it because `timerTaskEntry` is then accessed without synchronisation later on. This is why I kept it like this.
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820: URL: https://github.com/apache/kafka/pull/13820#discussion_r1223928104 ## server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java: ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.util.timer; + +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Time; + +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class SystemTimer implements Timer { +// timeout timer +private final ExecutorService taskExecutor; +private final DelayQueue delayQueue; +private final AtomicInteger taskCounter; +private final TimingWheel timingWheel; + +// Locks used to protect data structures while ticking +private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); +private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); +private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); + +public SystemTimer(String executorName) { +this(executorName, 1, 20, Time.SYSTEM.hiResClockMs()); +} + +public SystemTimer( +String executorName, +long tickMs, +int wheelSize, +long startMs +) { +this.taskExecutor = Executors.newFixedThreadPool(1, +runnable -> KafkaThread.nonDaemon("executor-" + executorName, runnable)); +this.delayQueue = new DelayQueue<>(); +this.taskCounter = new AtomicInteger(0); +this.timingWheel = new TimingWheel( +tickMs, +wheelSize, +startMs, +taskCounter, +delayQueue +); +} + +public void add(TimerTask timerTask) { +readLock.lock(); Review Comment: Personally, I am not a big fan of those helper methods for locks in our Java code. I think that we could consider this separately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
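For context, the two styles being weighed can be sketched side by side. Everything here is illustrative: `inLock` is a hypothetical helper written for this example, not an existing Kafka utility, and the counter merely stands in for the work done under the lock.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

final class LockStyleSketch {
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Lock readLock = readWriteLock.readLock();
    private final AtomicInteger counter = new AtomicInteger(0);

    // Hypothetical helper: runs body while holding lock and always releases it.
    static <T> T inLock(Lock lock, Supplier<T> body) {
        lock.lock();
        try {
            return body.get();
        } finally {
            lock.unlock();
        }
    }

    // Explicit style, as in the diff: lock(), then try/finally with unlock().
    int explicitStyle() {
        readLock.lock();
        try {
            return counter.incrementAndGet();
        } finally {
            readLock.unlock();
        }
    }

    // Helper style: the same work wrapped in a lambda.
    int helperStyle() {
        return inLock(readLock, counter::incrementAndGet);
    }

    // True when no thread currently holds the read lock.
    boolean readLockFree() {
        return readWriteLock.getReadLockCount() == 0;
    }
}
```

Both forms release the lock on every path; the difference is mostly one of readability and of an extra lambda allocation per call.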
[GitHub] [kafka] dajac commented on a diff in pull request #13820: MINOR: Move Timer/TimingWheel to server-common
dajac commented on code in PR #13820:
URL: https://github.com/apache/kafka/pull/13820#discussion_r1223925835

## server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java: ##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util.timer;
+
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class SystemTimer implements Timer {

Review Comment:
I am not sure I understand the benefits of doing this here. My understanding is that `AutoCloseable` just adds a `close` method, but you still need to call it, either explicitly or implicitly by using `try-with-resources`. If we want to do this, we would need to bring this change to the `Timer` interface instead of doing it here. I think that this is something that we could consider separately.
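The behaviour described in this comment can be illustrated with a short sketch: implementing `AutoCloseable` only declares `close()`, and nothing calls it unless the caller does so directly or through `try-with-resources`. The `Timer` interface and `NoopTimer` class below are hypothetical and much smaller than the real ones in the PR.

```java
final class CloseableTimerSketch {
    // Hypothetical, cut-down Timer interface that extends AutoCloseable.
    interface Timer extends AutoCloseable {
        int size();

        // Narrowed so that callers do not have to handle a checked exception.
        @Override
        void close();
    }

    // Hypothetical implementation that only records whether close() ran.
    static final class NoopTimer implements Timer {
        boolean closed = false;

        @Override
        public int size() {
            return 0;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // close() is invoked implicitly when the try block exits.
    static boolean closedByTryWithResources() {
        NoopTimer timer = new NoopTimer();
        try (Timer t = timer) {
            t.size();
        }
        return timer.closed;
    }

    // Without try-with-resources or an explicit call, close() never runs.
    static boolean closedWithoutTry() {
        NoopTimer timer = new NoopTimer();
        timer.size();
        return timer.closed;
    }
}
```

Because callers usually hold the interface type, placing `AutoCloseable` on the interface rather than on a single implementation is what makes `try-with-resources` usable at call sites.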