This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f4981790c41 KAFKA-15085: Make Timer.java implement AutoCloseable 
(#13872)
f4981790c41 is described below

commit f4981790c414eafbb0cd29f9ce42297e10420ca6
Author: Joobi S B <joob...@gmail.com>
AuthorDate: Mon Jun 19 19:20:30 2023 +0530

    KAFKA-15085: Make Timer.java implement AutoCloseable (#13872)
    
    Change Timer.java to implement AutoCloseable because automatic bug finders 
will flag a warning if an object of a class is marked as AutoCloseable but is 
not closed properly in the code.
    
    Reviewers:  Divij Vaidya <di...@amazon.com>
---
 core/src/main/scala/kafka/raft/RaftManager.scala                    | 2 +-
 core/src/main/scala/kafka/server/DelayedOperation.scala             | 2 +-
 core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala          | 2 +-
 .../main/java/org/apache/kafka/server/util/timer/SystemTimer.java   | 3 ++-
 .../src/main/java/org/apache/kafka/server/util/timer/Timer.java     | 6 +-----
 .../src/test/java/org/apache/kafka/server/util/timer/TimerTest.java | 4 ++--
 6 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala 
b/core/src/main/scala/kafka/raft/RaftManager.scala
index 59c1b928e74..4799feb1245 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -195,7 +195,7 @@ class KafkaRaftManager[T](
 
   def shutdown(): Unit = {
     CoreUtils.swallow(expirationService.shutdown(), this)
-    CoreUtils.swallow(expirationTimer.shutdown(), this)
+    CoreUtils.swallow(expirationTimer.close(), this)
     CoreUtils.swallow(raftIoThread.shutdown(), this)
     CoreUtils.swallow(client.close(), this)
     CoreUtils.swallow(scheduler.shutdown(), this)
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala 
b/core/src/main/scala/kafka/server/DelayedOperation.scala
index c6c90c3c85a..48a17442e0e 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -339,7 +339,7 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
       })
       expirationReaper.awaitShutdown()
     }
-    timeoutTimer.shutdown()
+    timeoutTimer.close()
     metricsGroup.removeMetric("PurgatorySize", metricsTags)
     metricsGroup.removeMetric("NumDelayedOperations", metricsTags)
   }
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala 
b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
index 557915fece6..cee7dc097f1 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
@@ -67,6 +67,6 @@ class MockTimer(val time: MockTime = new MockTime) extends 
Timer {
 
   def size: Int = taskQueue.size
 
-  override def shutdown(): Unit = {}
+  override def close(): Unit = {}
 
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java
index 97d2e75eb04..15b22bb3e8d 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java
@@ -106,7 +106,8 @@ public class SystemTimer implements Timer {
         return taskCounter.get();
     }
 
-    public void shutdown() {
+    @Override
+    public void close() {
         taskExecutor.shutdown();
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java 
b/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
index defed2bc90c..2771f34a7cd 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/Timer.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.server.util.timer;
 
-public interface Timer {
+public interface Timer extends AutoCloseable {
     /**
      * Add a new task to this executor. It will be executed after the task's 
delay
      * (beginning from the time of submission)
@@ -38,8 +38,4 @@ public interface Timer {
      */
     int size();
 
-    /**
-     * Shutdown the timer service, leaving pending tasks unexecuted
-     */
-    void shutdown();
 }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java
index effce622c67..b7463ad01ce 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java
@@ -74,8 +74,8 @@ public class TimerTest {
     }
 
     @AfterEach
-    public void teardown() {
-        timer.shutdown();
+    public void teardown() throws Exception {
+        timer.close();
     }
 
     @Test

Reply via email to