This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new f4981790c41 KAFKA-15085: Make Timer.java implement AutoCloseable (#13872)
f4981790c41 is described below

commit f4981790c414eafbb0cd29f9ce42297e10420ca6
Author: Joobi S B <joob...@gmail.com>
AuthorDate: Mon Jun 19 19:20:30 2023 +0530

    KAFKA-15085: Make Timer.java implement AutoCloseable (#13872)

    Change Timer.java to implement AutoCloseable because automatic bug finders will flag a warning if an object of a class is marked as AutoCloseable but is not closed properly in the code.

    Reviewers: Divij Vaidya <di...@amazon.com>
---
 core/src/main/scala/kafka/raft/RaftManager.scala                    | 2 +-
 core/src/main/scala/kafka/server/DelayedOperation.scala             | 2 +-
 core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala          | 2 +-
 .../main/java/org/apache/kafka/server/util/timer/SystemTimer.java   | 3 ++-
 .../src/main/java/org/apache/kafka/server/util/timer/Timer.java     | 6 +-----
 .../src/test/java/org/apache/kafka/server/util/timer/TimerTest.java | 4 ++--
 6 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 59c1b928e74..4799feb1245 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -195,7 +195,7 @@ class KafkaRaftManager[T](
 
   def shutdown(): Unit = {
     CoreUtils.swallow(expirationService.shutdown(), this)
-    CoreUtils.swallow(expirationTimer.shutdown(), this)
+    CoreUtils.swallow(expirationTimer.close(), this)
     CoreUtils.swallow(raftIoThread.shutdown(), this)
     CoreUtils.swallow(client.close(), this)
     CoreUtils.swallow(scheduler.shutdown(), this)
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index c6c90c3c85a..48a17442e0e 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -339,7 +339,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
       })
       expirationReaper.awaitShutdown()
     }
-    timeoutTimer.shutdown()
+    timeoutTimer.close()
     metricsGroup.removeMetric("PurgatorySize", metricsTags)
     metricsGroup.removeMetric("NumDelayedOperations", metricsTags)
   }
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
index 557915fece6..cee7dc097f1 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
@@ -67,6 +67,6 @@ class MockTimer(val time: MockTime = new MockTime) extends Timer {
 
   def size: Int = taskQueue.size
 
-  override def shutdown(): Unit = {}
+  override def close(): Unit = {}
 
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java
index 97d2e75eb04..15b22bb3e8d 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java
@@ -106,7 +106,8 @@ public class SystemTimer implements Timer {
         return taskCounter.get();
     }
 
-    public void shutdown() {
+    @Override
+    public void close() {
         taskExecutor.shutdown();
     }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
index defed2bc90c..2771f34a7cd 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.server.util.timer;
 
-public interface Timer {
+public interface Timer extends AutoCloseable {
     /**
      * Add a new task to this executor. It will be executed after the task's delay
      * (beginning from the time of submission)
@@ -38,8 +38,4 @@ public interface Timer {
      */
     int size();
 
-    /**
-     * Shutdown the timer service, leaving pending tasks unexecuted
-     */
-    void shutdown();
 }
diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java
index effce622c67..b7463ad01ce 100644
--- a/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java
@@ -74,8 +74,8 @@ public class TimerTest {
     }
 
     @AfterEach
-    public void teardown() {
-        timer.shutdown();
+    public void teardown() throws Exception {
+        timer.close();
     }
 
     @Test