This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b9a8ba  MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)
0b9a8ba is described below

commit 0b9a8bac36f16b5397e9ec3a0441758e4b60a384
Author: Lucas Bradstreet <[email protected]>
AuthorDate: Thu Jan 6 01:22:19 2022 -0800

    MINOR: greatly improve test runtime by unblocking purgatory and quota manager threads (#11653)
    
    Reviewers: Rajini Sivaram <[email protected]>
---
 core/src/main/scala/kafka/server/ClientQuotaManager.scala | 12 +++++++++++-
 core/src/main/scala/kafka/server/DelayedOperation.scala   | 11 +++++++++--
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 5860cfc..7334519 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -562,8 +562,18 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
       quotaMetricTags.asJava)
   }
 
+  def initiateShutdown(): Unit = {
+    throttledChannelReaper.initiateShutdown()
+    // improve shutdown time by waking up any ShutdownableThread(s) blocked on poll by sending a no-op
+    delayQueue.add(new ThrottledChannel(time, 0, new ThrottleCallback {
+      override def startThrottling(): Unit = {}
+      override def endThrottling(): Unit = {}
+    }))
+  }
+
   def shutdown(): Unit = {
-    throttledChannelReaper.shutdown()
+    initiateShutdown()
+    throttledChannelReaper.awaitShutdown()
   }
 
   class DefaultQuotaCallback extends ClientQuotaCallback {
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 251dd28..1151e65 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -328,8 +328,15 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
    * Shutdown the expire reaper thread
    */
   def shutdown(): Unit = {
-    if (reaperEnabled)
-      expirationReaper.shutdown()
+    if (reaperEnabled) {
+      expirationReaper.initiateShutdown()
+      // improve shutdown time by waking up any ShutdownableThread(s) blocked on poll by sending a no-op
+      timeoutTimer.add(new TimerTask {
+        override val delayMs: Long = 0
+        override def run(): Unit = {}
+      })
+      expirationReaper.awaitShutdown()
+    }
     timeoutTimer.shutdown()
     removeMetric("PurgatorySize", metricsTags)
     removeMetric("NumDelayedOperations", metricsTags)

Reply via email to