This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 059589ef979 KAFKA-19606; Implement request handler metrics defined in
KIP-1207 (#20481)
059589ef979 is described below
commit 059589ef9791feb97086a343804fbf031ea61570
Author: anonymous <[email protected]>
AuthorDate: Thu Oct 30 09:57:52 2025 -0500
KAFKA-19606; Implement request handler metrics defined in KIP-1207 (#20481)
This change implements a global shared thread counter mechanism to
properly calculate the RequestHandlerAvgIdlePercent metric across all
KafkaRequestHandlerPool instances within the same JVM process. This
ensures accurate idle percentage calculations, especially in combined
mode where both broker and controller request handler pools coexist.
Previously, each KafkaRequestHandlerPool calculated idle percentages
independently using only its own thread count as the denominator. In
combined mode, this led to inaccurate aggregate idle percentage
calculations and potential metric values exceeding 100% (values > 1.0).
Added aggregateThreads as a global AtomicInteger in
KafkaRequestHandlerPoolFactory. Modified KafkaRequestHandler to
calculate two idle metrics. One per-pool metric which uses the number of
threads in a given pool. One aggregate metric for backward compatibility
which uses the number of threads across all request thread pools.
Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 11 +++-
.../main/scala/kafka/server/ControllerServer.scala | 7 +-
.../scala/kafka/server/KafkaRequestHandler.scala | 75 +++++++++++++++++++---
.../src/main/scala/kafka/server/SharedServer.scala | 4 ++
.../main/scala/kafka/tools/TestRaftServer.scala | 7 +-
.../kafka/server/KafkaRequestHandlerTest.scala | 60 +++++++++++++++--
6 files changed, 142 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 9404c2b2167..8165ddde8d1 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -465,9 +465,14 @@ class BrokerServer(
clientMetricsManager = clientMetricsManager,
groupConfigManager = groupConfigManager)
- dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
- socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
- config.numIoThreads, "RequestHandlerAvgIdlePercent")
+ dataPlaneRequestHandlerPool =
sharedServer.requestHandlerPoolFactory.createPool(
+ config.nodeId,
+ socketServer.dataPlaneRequestChannel,
+ dataPlaneRequestProcessor,
+ time,
+ config.numIoThreads,
+ "broker"
+ )
metadataPublishers.add(new
MetadataVersionConfigValidator(config.brokerId,
() => config.processRoles.contains(ProcessRole.BrokerRole) &&
config.logDirs().size() > 1,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 28a1e3bdfb2..ef81f10cbbd 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -286,13 +286,14 @@ class ControllerServer(
registrationsPublisher,
apiVersionManager,
metadataCache)
- controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
+ controllerApisHandlerPool =
sharedServer.requestHandlerPoolFactory.createPool(
+ config.nodeId,
socketServer.dataPlaneRequestChannel,
controllerApis,
time,
config.numIoThreads,
- "RequestHandlerAvgIdlePercent",
- "controller")
+ "controller"
+ )
// Set up the metadata cache publisher.
metadataPublishers.add(metadataCachePublisher)
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d4998cbb734..e1f0176bbb7 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -89,11 +89,13 @@ class KafkaRequestHandler(
id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
- val totalHandlerThreads: AtomicInteger,
+ val aggregateThreads: AtomicInteger,
+ val poolIdleMeter: Meter,
+ val poolHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: ApiRequestHandler,
time: Time,
- nodeName: String = "broker"
+ nodeName: String
) extends Runnable with Logging {
this.logIdent = s"[Kafka Request Handler $id on ${nodeName.capitalize}
$brokerId] "
private val shutdownComplete = new CountDownLatch(1)
@@ -112,7 +114,10 @@ class KafkaRequestHandler(
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
- aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
+ // Per-pool idle ratio uses the pool's own thread count as denominator
+ poolIdleMeter.mark(idleTime / poolHandlerThreads.get)
+ // Aggregate idle ratio uses the total threads across all pools as
denominator
+ aggregateIdleMeter.mark(idleTime / aggregateThreads.get)
req match {
case RequestChannel.ShutdownRequest =>
@@ -192,14 +197,38 @@ class KafkaRequestHandler(
}
+/**
+ * Factory for creating KafkaRequestHandlerPool instances with shared
aggregate metrics.
+ * All pools created by the same factory share the same aggregateThreads
counter.
+ */
+class KafkaRequestHandlerPoolFactory {
+ private[this] val aggregateThreads = new AtomicInteger(0)
+ private[this] val RequestHandlerAvgIdleMetricName =
"RequestHandlerAvgIdlePercent"
+
+ def createPool(
+ brokerId: Int,
+ requestChannel: RequestChannel,
+ apis: ApiRequestHandler,
+ time: Time,
+ numThreads: Int,
+ nodeName: String
+ ): KafkaRequestHandlerPool = {
+ new KafkaRequestHandlerPool(aggregateThreads,
RequestHandlerAvgIdleMetricName, brokerId, requestChannel, apis, time,
numThreads, nodeName)
+ }
+
+ // Only used for test purpose
+ def aggregateThreadCount: Int = aggregateThreads.get()
+}
+
class KafkaRequestHandlerPool(
+ val aggregateThreads: AtomicInteger,
+ val requestHandlerAvgIdleMetricName: String,
val brokerId: Int,
val requestChannel: RequestChannel,
val apis: ApiRequestHandler,
time: Time,
numThreads: Int,
- requestHandlerAvgIdleMetricName: String,
- nodeName: String = "broker"
+ nodeName: String
) extends Logging {
// Changing the package or class name may cause incompatibility with
existing code and metrics configuration
private val metricsPackage = "kafka.server"
@@ -207,7 +236,16 @@ class KafkaRequestHandlerPool(
private val metricsGroup = new KafkaMetricsGroup(metricsPackage,
metricsClassName)
val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
- /* a meter to track the average free capacity of the request handlers */
+ private val perPoolIdleMeterName = if (nodeName == "broker") {
+ "BrokerRequestHandlerAvgIdlePercent"
+ } else if (nodeName == "controller") {
+ "ControllerRequestHandlerAvgIdlePercent"
+ } else {
+ throw new IllegalArgumentException("Invalid node name:" + nodeName)
+ }
+ /* Per-pool idle meter (broker-only or controller-only) */
+ private val perPoolIdleMeter = metricsGroup.newMeter(perPoolIdleMeterName,
"percent", TimeUnit.NANOSECONDS)
+ /* Aggregate meter to track the average free capacity of the request
handlers */
private val aggregateIdleMeter =
metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent",
TimeUnit.NANOSECONDS)
this.logIdent = s"[data-plane Kafka Request Handler on
${nodeName.capitalize} $brokerId] "
@@ -216,11 +254,28 @@ class KafkaRequestHandlerPool(
createHandler(i)
}
- def createHandler(id: Int): Unit = synchronized {
- runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter,
threadPoolSize, requestChannel, apis, time, nodeName)
+ private def createHandler(id: Int): Unit = {
+ runnables += new KafkaRequestHandler(
+ id,
+ brokerId,
+ aggregateIdleMeter,
+ aggregateThreads,
+ perPoolIdleMeter,
+ threadPoolSize,
+ requestChannel,
+ apis,
+ time,
+ nodeName
+ )
+ aggregateThreads.getAndIncrement()
KafkaThread.daemon("data-plane-kafka-request-handler-" + id,
runnables(id)).start()
}
+ private def deleteHandler(id: Int): Unit = {
+ runnables.remove(id).stop()
+ aggregateThreads.getAndDecrement()
+ }
+
def resizeThreadPool(newSize: Int): Unit = synchronized {
val currentSize = threadPoolSize.get
info(s"Resizing request handler thread pool size from $currentSize to
$newSize")
@@ -230,7 +285,7 @@ class KafkaRequestHandlerPool(
}
} else if (newSize < currentSize) {
for (i <- 1 to (currentSize - newSize)) {
- runnables.remove(currentSize - i).stop()
+ deleteHandler(currentSize - i)
}
}
threadPoolSize.set(newSize)
@@ -242,6 +297,8 @@ class KafkaRequestHandlerPool(
handler.initiateShutdown()
for (handler <- runnables)
handler.awaitShutdown()
+ // Unregister this pool's threads from shared aggregate counter
+ aggregateThreads.addAndGet(-threadPoolSize.get)
info("shut down completely")
}
}
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index 9c245765569..77d02e405d8 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -112,6 +112,10 @@ class SharedServer(
private var usedByController: Boolean = false
val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
+
+ // Factory for creating request handler pools with shared aggregate thread
counter
+ val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()
+
@volatile var metrics: Metrics = _metrics
@volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
@volatile var brokerMetrics: BrokerServerMetrics = _
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 48e101443a1..1e629be5146 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{CompletableFuture,
CountDownLatch, LinkedBlockingDe
import joptsimple.{OptionException, OptionSpec}
import kafka.network.SocketServer
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager}
-import kafka.server.{KafkaConfig, KafkaRequestHandlerPool}
+import kafka.server.{KafkaConfig, KafkaRequestHandlerPool,
KafkaRequestHandlerPoolFactory}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
@@ -67,6 +67,7 @@ class TestRaftServer(
private val metrics = new Metrics(time)
private val shutdownLatch = new CountDownLatch(1)
private val threadNamePrefix = "test-raft"
+ private val requestHandlerPoolFactory = new KafkaRequestHandlerPoolFactory()
var socketServer: SocketServer = _
var credentialProvider: CredentialProvider = _
@@ -125,13 +126,13 @@ class TestRaftServer(
apiVersionManager
)
- dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(
+ dataPlaneRequestHandlerPool = requestHandlerPoolFactory.createPool(
config.brokerId,
socketServer.dataPlaneRequestChannel,
requestHandler,
time,
config.numIoThreads,
- "RequestHandlerAvgIdlePercent"
+ "broker"
)
workloadGenerator.start()
diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
index e67e041e1f5..c6eba176828 100644
--- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
+++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -59,7 +59,7 @@ class KafkaRequestHandlerTest {
val requestChannel = new RequestChannel(10, time, metrics)
val apiHandler = mock(classOf[ApiRequestHandler])
try {
- val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), requestChannel, apiHandler, time)
+ val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel,
apiHandler, time, "broker")
val request = makeRequest(time, metrics)
requestChannel.sendRequest(request)
@@ -95,7 +95,7 @@ class KafkaRequestHandlerTest {
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, time, metrics)
- val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), requestChannel, apiHandler, time)
+ val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel,
apiHandler, time, "broker")
var handledCount = 0
var tryCompleteActionCount = 0
@@ -131,7 +131,7 @@ class KafkaRequestHandlerTest {
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, time, metrics)
- val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), requestChannel, apiHandler, time)
+ val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel,
apiHandler, time, "broker")
val originalRequestLocal = mock(classOf[RequestLocal])
@@ -165,7 +165,7 @@ class KafkaRequestHandlerTest {
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, time, metrics)
- val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), requestChannel, apiHandler, time)
+ val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), mock(classOf[Meter]), new AtomicInteger(1), requestChannel,
apiHandler, time, "broker")
val originalRequestLocal = mock(classOf[RequestLocal])
when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create())
@@ -698,4 +698,56 @@ class KafkaRequestHandlerTest {
// cleanup
brokerTopicStats.close()
}
+
+ @Test
+ def testMetricsForMultipleRequestPools(): Unit = {
+ val time = new MockTime()
+ val metricsBroker = mock(classOf[RequestChannelMetrics])
+ val metricsController = mock(classOf[RequestChannelMetrics])
+ val requestChannelBroker = new RequestChannel(10, time, metricsBroker)
+ val requestChannelController = new RequestChannel(10, time,
metricsController)
+ val apiHandler = mock(classOf[ApiRequestHandler])
+
+ // Create a factory for this test
+ val factory = new KafkaRequestHandlerPoolFactory()
+
+ // Create broker pool with 4 threads
+ val brokerPool = factory.createPool(
+ 0,
+ requestChannelBroker,
+ apiHandler,
+ time,
+ 4,
+ "broker"
+ )
+
+ // Verify global counter is updated
+ assertEquals(4, factory.aggregateThreadCount, "global counter should be 4
after broker pool")
+
+ // Create controller pool with 4 threads
+ val controllerPool = factory.createPool(
+ 0,
+ requestChannelController,
+ apiHandler,
+ time,
+ 4,
+ "controller"
+ )
+
+ // Verify global counter is updated to sum of both pools
+ assertEquals(8, factory.aggregateThreadCount, "global counter should be 8
after both pools")
+
+ // Test pool resizing
+ // Shrink broker pool from 4 to 2 threads
+ brokerPool.resizeThreadPool(2)
+ assertEquals(2, brokerPool.threadPoolSize.get)
+ assertEquals(4, controllerPool.threadPoolSize.get)
+ assertEquals(6, factory.aggregateThreadCount)
+
+ // Expand controller pool from 4 to 6 threads
+ controllerPool.resizeThreadPool(6)
+ assertEquals(2, brokerPool.threadPoolSize.get)
+ assertEquals(6, controllerPool.threadPoolSize.get)
+ assertEquals(8, factory.aggregateThreadCount)
+ }
}