This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a640a81040f KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric 
(#15463)
a640a81040f is described below

commit a640a81040f6ef6f85819b60194f0394f5f2194e
Author: Johnny Hsu <44309740+johnnych...@users.noreply.github.com>
AuthorDate: Mon Apr 1 10:52:53 2024 +0800

    KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric (#15463)
    
    The variable of metrics item 
(kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec) is singleton 
object and it could be removed by other tests which are running in same JVM 
(and it is not recreated). Hence, verifying the metrics value is not stable to 
this test case.
    
    Reviewers: Luke Chen <show...@gmail.com>, Chia-Ping Tsai 
<chia7...@gmail.com>, Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 23 +++-------------------
 1 file changed, 3 insertions(+), 20 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 35a5d9cc7fb..c7587e4d615 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4185,16 +4185,13 @@ class ReplicaManagerTest {
         mock(classOf[FetchDataInfo])
       }).when(spyRLM).read(any())
 
-      // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-      val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+      val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
       replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 100000, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
       // advancing the clock to expire the delayed remote fetch
       timer.advanceClock(2000L)
 
-      // verify the metric value is incremented since the delayed remote fetch 
is expired
-      TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-        "The ExpiresPerSec value is not incremented. Current value is: " +
-          
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+      // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+      TestUtils.waitUntilTrue(() => (curExpiresPerSec + 1) == 
DelayedRemoteFetchMetrics.expiredRequestMeter.count(), 
"DelayedRemoteFetchMetrics.expiredRequestMeter.count() should be 1, but got: " 
+ DelayedRemoteFetchMetrics.expiredRequestMeter.count(), 10000L)
       latch.countDown()
     } finally {
       Utils.tryAll(util.Arrays.asList[Callable[Void]](
@@ -4210,20 +4207,6 @@ class ReplicaManagerTest {
     }
   }
 
-  private def safeYammerMetricValue(name: String): Any = {
-    val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-    val opt = allMetrics.find { case (n, _) => n.getMBeanName.endsWith(name) }
-    if (opt.isEmpty)
-      0L
-    else {
-      opt.get._2 match {
-        case m: Gauge[_] => m.value
-        case m: Meter => m.count()
-        case m => fail(s"Unexpected broker metric of class ${m.getClass}")
-      }
-    }
-  }
-
   private def yammerMetricValue(name: String): Any = {
     val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
     val (_, metric) = allMetrics.find { case (n, _) => 
n.getMBeanName.endsWith(name) }

Reply via email to