This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 059589ef979 KAFKA-19606; Implement request handler metrics defined in 
KIP-1207 (#20481)
059589ef979 is described below

commit 059589ef9791feb97086a343804fbf031ea61570
Author: anonymous <[email protected]>
AuthorDate: Thu Oct 30 09:57:52 2025 -0500

    KAFKA-19606; Implement request handler metrics defined in KIP-1207 (#20481)
    
    This change implements a global shared thread counter mechanism to
    properly calculate the RequestHandlerAvgIdlePercent metric across all
    KafkaRequestHandlerPool instances within the same JVM process. This
    ensures accurate idle percentage calculations, especially in combined
    mode where both broker and controller request handler pools coexist.
    
    Previously, each KafkaRequestHandlerPool calculated idle percentages
    independently using only its own thread count as the denominator. In
    combined mode, this led to inaccurate aggregate idle percentage
calculations and potential metric values exceeding 100% (values > 1.0).
    
    Added aggregateThreads as a global AtomicInteger in
    KafkaRequestHandlerPoolFactory. Modified KafkaRequestHandler to
calculate two idle metrics. One per-pool metric which uses the number of
threads in a given pool. One aggregate metric for backward compatibility
    which uses the number of threads across all request thread pools.
    
    Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
     <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala | 11 +++-
 .../main/scala/kafka/server/ControllerServer.scala |  7 +-
 .../scala/kafka/server/KafkaRequestHandler.scala   | 75 +++++++++++++++++++---
 .../src/main/scala/kafka/server/SharedServer.scala |  4 ++
 .../main/scala/kafka/tools/TestRaftServer.scala    |  7 +-
 .../kafka/server/KafkaRequestHandlerTest.scala     | 60 +++++++++++++++--
 6 files changed, 142 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 9404c2b2167..8165ddde8d1 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -465,9 +465,14 @@ class BrokerServer(
         clientMetricsManager = clientMetricsManager,
         groupConfigManager = groupConfigManager)
 
-      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
-        socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
-        config.numIoThreads, "RequestHandlerAvgIdlePercent")
+      dataPlaneRequestHandlerPool = 
sharedServer.requestHandlerPoolFactory.createPool(
+        config.nodeId,
+        socketServer.dataPlaneRequestChannel,
+        dataPlaneRequestProcessor,
+        time,
+        config.numIoThreads,
+        "broker"
+      )
 
       metadataPublishers.add(new 
MetadataVersionConfigValidator(config.brokerId,
         () => config.processRoles.contains(ProcessRole.BrokerRole) && 
config.logDirs().size() > 1,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 28a1e3bdfb2..ef81f10cbbd 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -286,13 +286,14 @@ class ControllerServer(
         registrationsPublisher,
         apiVersionManager,
         metadataCache)
-      controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
+      controllerApisHandlerPool = 
sharedServer.requestHandlerPoolFactory.createPool(
+        config.nodeId,
         socketServer.dataPlaneRequestChannel,
         controllerApis,
         time,
         config.numIoThreads,
-        "RequestHandlerAvgIdlePercent",
-        "controller")
+        "controller"
+      )
 
       // Set up the metadata cache publisher.
       metadataPublishers.add(metadataCachePublisher)
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d4998cbb734..e1f0176bbb7 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -89,11 +89,13 @@ class KafkaRequestHandler(
   id: Int,
   brokerId: Int,
   val aggregateIdleMeter: Meter,
-  val totalHandlerThreads: AtomicInteger,
+  val aggregateThreads: AtomicInteger,
+  val poolIdleMeter: Meter,
+  val poolHandlerThreads: AtomicInteger,
   val requestChannel: RequestChannel,
   apis: ApiRequestHandler,
   time: Time,
-  nodeName: String = "broker"
+  nodeName: String
 ) extends Runnable with Logging {
   this.logIdent = s"[Kafka Request Handler $id on ${nodeName.capitalize} 
$brokerId] "
   private val shutdownComplete = new CountDownLatch(1)
@@ -112,7 +114,10 @@ class KafkaRequestHandler(
       val req = requestChannel.receiveRequest(300)
       val endTime = time.nanoseconds
       val idleTime = endTime - startSelectTime
-      aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
+      // Per-pool idle ratio uses the pool's own thread count as denominator
+      poolIdleMeter.mark(idleTime / poolHandlerThreads.get)
+      // Aggregate idle ratio uses the total threads across all pools as 
denominator
+      aggregateIdleMeter.mark(idleTime / aggregateThreads.get)
 
       req match {
         case RequestChannel.ShutdownRequest =>
@@ -192,14 +197,38 @@ class KafkaRequestHandler(
 
 }
 
+/**
+ * Factory for creating KafkaRequestHandlerPool instances with shared 
aggregate metrics.
+ * All pools created by the same factory share the same aggregateThreads 
counter.
+ */
+class KafkaRequestHandlerPoolFactory {
+  private[this] val aggregateThreads = new AtomicInteger(0)
+  private[this] val RequestHandlerAvgIdleMetricName = 
"RequestHandlerAvgIdlePercent"
+  
+  def createPool(
+    brokerId: Int,
+    requestChannel: RequestChannel,
+    apis: ApiRequestHandler,
+    time: Time,
+    numThreads: Int,
+    nodeName: String
+  ): KafkaRequestHandlerPool = {
+    new KafkaRequestHandlerPool(aggregateThreads, 
RequestHandlerAvgIdleMetricName, brokerId, requestChannel, apis, time, 
numThreads, nodeName)
+  }
+
+  // Only used for test purpose
+  def aggregateThreadCount: Int = aggregateThreads.get()
+}
+
 class KafkaRequestHandlerPool(
+  val aggregateThreads: AtomicInteger,
+  val requestHandlerAvgIdleMetricName: String,
   val brokerId: Int,
   val requestChannel: RequestChannel,
   val apis: ApiRequestHandler,
   time: Time,
   numThreads: Int,
-  requestHandlerAvgIdleMetricName: String,
-  nodeName: String = "broker"
+  nodeName: String
 ) extends Logging {
   // Changing the package or class name may cause incompatibility with 
existing code and metrics configuration
   private val metricsPackage = "kafka.server"
@@ -207,7 +236,16 @@ class KafkaRequestHandlerPool(
   private val metricsGroup = new KafkaMetricsGroup(metricsPackage, 
metricsClassName)
 
   val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
-  /* a meter to track the average free capacity of the request handlers */
+  private val perPoolIdleMeterName = if (nodeName == "broker") {
+    "BrokerRequestHandlerAvgIdlePercent"
+  } else if (nodeName == "controller") {
+    "ControllerRequestHandlerAvgIdlePercent"
+  } else {
+    throw new IllegalArgumentException("Invalid node name:" + nodeName)
+  }
+  /* Per-pool idle meter (broker-only or controller-only) */
+  private val perPoolIdleMeter = metricsGroup.newMeter(perPoolIdleMeterName, 
"percent", TimeUnit.NANOSECONDS)
+  /* Aggregate meter to track the average free capacity of the request 
handlers */
   private val aggregateIdleMeter = 
metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", 
TimeUnit.NANOSECONDS)
 
   this.logIdent = s"[data-plane Kafka Request Handler on 
${nodeName.capitalize} $brokerId] "
@@ -216,11 +254,28 @@ class KafkaRequestHandlerPool(
     createHandler(i)
   }
 
-  def createHandler(id: Int): Unit = synchronized {
-    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, 
threadPoolSize, requestChannel, apis, time, nodeName)
+  private def createHandler(id: Int): Unit = {
+    runnables += new KafkaRequestHandler(
+      id,
+      brokerId,
+      aggregateIdleMeter,
+      aggregateThreads,
+      perPoolIdleMeter,
+      threadPoolSize,
+      requestChannel,
+      apis,
+      time,
+      nodeName
+    )
+    aggregateThreads.getAndIncrement()
     KafkaThread.daemon("data-plane-kafka-request-handler-" + id, 
runnables(id)).start()
   }
 
+  private def deleteHandler(id: Int): Unit = {
+    runnables.remove(id).stop()
+    aggregateThreads.getAndDecrement()
+  }
+
   def resizeThreadPool(newSize: Int): Unit = synchronized {
     val currentSize = threadPoolSize.get
     info(s"Resizing request handler thread pool size from $currentSize to 
$newSize")
@@ -230,7 +285,7 @@ class KafkaRequestHandlerPool(
       }
     } else if (newSize < currentSize) {
       for (i <- 1 to (currentSize - newSize)) {
-        runnables.remove(currentSize - i).stop()
+        deleteHandler(currentSize - i)
       }
     }
     threadPoolSize.set(newSize)
@@ -242,6 +297,8 @@ class KafkaRequestHandlerPool(
       handler.initiateShutdown()
     for (handler <- runnables)
       handler.awaitShutdown()
+    // Unregister this pool's threads from shared aggregate counter
+    aggregateThreads.addAndGet(-threadPoolSize.get)
     info("shut down completely")
   }
 }
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index 9c245765569..77d02e405d8 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -112,6 +112,10 @@ class SharedServer(
   private var usedByController: Boolean = false
   val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
   val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
+  
+  // Factory for creating request handler pools with shared aggregate thread 
counter
+  val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()
+
   @volatile var metrics: Metrics = _metrics
   @volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
   @volatile var brokerMetrics: BrokerServerMetrics = _
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 48e101443a1..1e629be5146 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{CompletableFuture, 
CountDownLatch, LinkedBlockingDe
 import joptsimple.{OptionException, OptionSpec}
 import kafka.network.SocketServer
 import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager}
-import kafka.server.{KafkaConfig, KafkaRequestHandlerPool}
+import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, 
KafkaRequestHandlerPoolFactory}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
@@ -67,6 +67,7 @@ class TestRaftServer(
   private val metrics = new Metrics(time)
   private val shutdownLatch = new CountDownLatch(1)
   private val threadNamePrefix = "test-raft"
+  private val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()
 
   var socketServer: SocketServer = _
   var credentialProvider: CredentialProvider = _
@@ -125,13 +126,13 @@ class TestRaftServer(
       apiVersionManager
     )
 
-    dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(
+    dataPlaneRequestHandlerPool = requestHandlerPoolFactory.createPool(
       config.brokerId,
       socketServer.dataPlaneRequestChannel,
       requestHandler,
       time,
       config.numIoThreads,
-      "RequestHandlerAvgIdlePercent"
+      "broker"
     )
 
     workloadGenerator.start()
diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala 
b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
index e67e041e1f5..c6eba176828 100644
--- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
+++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -59,7 +59,7 @@ class KafkaRequestHandlerTest {
     val requestChannel = new RequestChannel(10, time, metrics)
     val apiHandler = mock(classOf[ApiRequestHandler])
     try {
-      val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), requestChannel, apiHandler, time)
+      val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, 
apiHandler, time, "broker")
 
       val request = makeRequest(time, metrics)
       requestChannel.sendRequest(request)
@@ -95,7 +95,7 @@ class KafkaRequestHandlerTest {
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
     val requestChannel = new RequestChannel(10, time, metrics)
-    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), requestChannel, apiHandler, time)
+    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, 
apiHandler, time, "broker")
 
     var handledCount = 0
     var tryCompleteActionCount = 0
@@ -131,7 +131,7 @@ class KafkaRequestHandlerTest {
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
     val requestChannel = new RequestChannel(10, time, metrics)
-    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), requestChannel, apiHandler, time)
+    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, 
apiHandler, time, "broker")
 
     val originalRequestLocal = mock(classOf[RequestLocal])
 
@@ -165,7 +165,7 @@ class KafkaRequestHandlerTest {
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
     val requestChannel = new RequestChannel(10, time, metrics)
-    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), requestChannel, apiHandler, time)
+    val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, 
apiHandler, time, "broker")
 
     val originalRequestLocal = mock(classOf[RequestLocal])
     
when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create())
@@ -698,4 +698,56 @@ class KafkaRequestHandlerTest {
     // cleanup
     brokerTopicStats.close()
   }
+
+  @Test
+  def testMetricsForMultipleRequestPools(): Unit = {
+    val time = new MockTime()
+    val metricsBroker = mock(classOf[RequestChannelMetrics])
+    val metricsController = mock(classOf[RequestChannelMetrics])
+    val requestChannelBroker = new RequestChannel(10, time, metricsBroker)
+    val requestChannelController = new RequestChannel(10, time, 
metricsController)
+    val apiHandler = mock(classOf[ApiRequestHandler])
+
+    // Create a factory for this test
+    val factory = new KafkaRequestHandlerPoolFactory()
+    
+    // Create broker pool with 4 threads
+    val brokerPool = factory.createPool(
+      0,
+      requestChannelBroker,
+      apiHandler,
+      time,
+      4,
+      "broker"
+    )
+
+    // Verify global counter is updated
+    assertEquals(4, factory.aggregateThreadCount, "global counter should be 4 
after broker pool")
+
+    // Create controller pool with 4 threads
+    val controllerPool = factory.createPool(
+      0,
+      requestChannelController,
+      apiHandler,
+      time,
+      4,
+      "controller"
+    )
+
+    // Verify global counter is updated to sum of both pools
+    assertEquals(8, factory.aggregateThreadCount, "global counter should be 8 
after both pools")
+
+    // Test pool resizing
+    // Shrink broker pool from 4 to 2 threads
+    brokerPool.resizeThreadPool(2)
+    assertEquals(2, brokerPool.threadPoolSize.get)
+    assertEquals(4, controllerPool.threadPoolSize.get)
+    assertEquals(6, factory.aggregateThreadCount)
+
+    // Expand controller pool from 4 to 6 threads
+    controllerPool.resizeThreadPool(6)
+    assertEquals(2, brokerPool.threadPoolSize.get)
+    assertEquals(6, controllerPool.threadPoolSize.get)
+    assertEquals(8, factory.aggregateThreadCount)
+  }
 }

Reply via email to