jsancio commented on code in PR #20481:
URL: https://github.com/apache/kafka/pull/20481#discussion_r2462924316
##########
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##########
@@ -698,4 +698,81 @@ class KafkaRequestHandlerTest {
// cleanup
brokerTopicStats.close()
}
+
+ @Test
+ def testRequestThreadMetrics(): Unit = {
+ val time = Time.SYSTEM
+ val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
+ val metricsController = new
RequestChannelMetrics(java.util.Set.of[ApiKeys])
Review Comment:
The other tests use a `mock(classOf[RequestChannelMetrics])`. Why aren't we
doing the same thing here?
##########
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##########
@@ -698,4 +698,81 @@ class KafkaRequestHandlerTest {
// cleanup
brokerTopicStats.close()
}
+
+ @Test
+ def testRequestThreadMetrics(): Unit = {
Review Comment:
How about calling this method something like
`testMetricsForMultipleRequestPools`?
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -215,6 +215,9 @@ class KafkaRequestHandlerPoolFactory {
): KafkaRequestHandlerPool = {
new KafkaRequestHandlerPool(aggregateThreads,
RequestHandlerAvgIdleMetricName, brokerId, requestChannel, apis, time,
numThreads, nodeName)
}
+
+ // Only used for test purpose
+ def getAggregateThreads: AtomicInteger = aggregateThreads
Review Comment:
In Scala, read (accessor) methods are not prefixed with the string "get", e.g.
`aggregateThreads`.
You don't want public users of this type to be able to modify the atomic
integer. You can protect against this by exposing this property instead: `def
aggregateThreadCount: Int = aggregateThreads.get()`.
##########
core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala:
##########
@@ -698,4 +698,81 @@ class KafkaRequestHandlerTest {
// cleanup
brokerTopicStats.close()
}
+
+ @Test
+ def testRequestThreadMetrics(): Unit = {
+ val time = Time.SYSTEM
+ val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys])
+ val metricsController = new
RequestChannelMetrics(java.util.Set.of[ApiKeys])
+ val requestChannelBroker = new RequestChannel(10, time, metricsBroker)
+ val requestChannelController = new RequestChannel(10, time,
metricsController)
+ val apiHandler = mock(classOf[ApiRequestHandler])
+ val metricsGroup = new KafkaMetricsGroup("kafka.server",
"KafkaRequestHandlerPool")
+
+ // Create a factory for this test
+ val factory = new KafkaRequestHandlerPoolFactory()
+
+ // Create broker pool with 4 threads
+ val brokerPool = factory.createPool(
+ 0,
+ requestChannelBroker,
+ apiHandler,
+ time,
+ 4,
+ "broker"
+ )
+
+ // Verify global counter is updated
+ assertEquals(4, factory.getAggregateThreads.get(), "global counter should
be 4 after broker pool")
+
+ // Create controller pool with 4 threads
+ val controllerPool = factory.createPool(
+ 0,
+ requestChannelController,
+ apiHandler,
+ time,
+ 4,
+ "controller"
+ )
+
+ // Verify global counter is updated to sum of both pools
+ assertEquals(8, factory.getAggregateThreads.get, "global counter should be
8 after both pools")
+
+ val aggregateMeter = metricsGroup.newMeter("RequestHandlerAvgIdlePercent",
"percent", TimeUnit.NANOSECONDS)
+ val brokerPerPoolIdleMeter =
metricsGroup.newMeter("BrokerRequestHandlerAvgIdlePercent", "percent",
TimeUnit.NANOSECONDS)
+ val controllerPerPoolIdleMeter =
metricsGroup.newMeter("ControllerRequestHandlerAvgIdlePercent", "percent",
TimeUnit.NANOSECONDS)
+
+ var aggregateValue = 0.0
+ var brokerPerPoolValue = 0.0
+ var controllerPerPoolValue = 0.0
+
+ // Wait until all idle-percent meters have been initialized with non-zero
rates,
+ // or timeout after the given duration.
+ val startTime = time.milliseconds()
+ while(aggregateValue == 0.0 || brokerPerPoolValue == 0.0 ||
controllerPerPoolValue == 0.0) {
+ if (time.milliseconds() - startTime > 8000)
+ throw new RuntimeException("Timeout waiting for idle-percent metrics
to initialize")
+ aggregateValue = aggregateMeter.oneMinuteRate()
+ brokerPerPoolValue = brokerPerPoolIdleMeter.oneMinuteRate()
+ controllerPerPoolValue = controllerPerPoolIdleMeter.oneMinuteRate()
+ }
Review Comment:
You need this hack because you are using `Time.SYSTEM` for the test. You can
use `MockTime` and advance the clock deterministically with that object's
`sleep(long)` method instead of polling against wall-clock time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]