This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b2da186f214 KAFKA-15265: Remote fetch throttle metrics (#16087)
b2da186f214 is described below

commit b2da186f21490ddf9922a7ad834e830deee1f7a8
Author: Abhijeet Kumar <abhijeet.cse....@gmail.com>
AuthorDate: Wed Jul 3 14:11:18 2024 +0530

    KAFKA-15265: Remote fetch throttle metrics (#16087)
    
    As part of [KIP-956](https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas),
    we have added a quota for fetches from remote storage. In this PR, we are
    adding the following metrics for remote fetch throttling:
    
    remote-fetch-throttle-time-avg : The average time, in milliseconds, that remote fetches were throttled by a broker
    remote-fetch-throttle-time-max : The maximum time, in milliseconds, that remote fetches were throttled by a broker
    
    Reviewers: Kamal Chandraprakash <kamal.chandraprak...@gmail.com>, Luke Chen <show...@gmail.com>, Satish Duggana <sati...@apache.org>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 17 ++++++--
 .../kafka/log/remote/quota/RLMQuotaManager.java    |  7 ++--
 .../kafka/log/remote/quota/RLMQuotaMetrics.java    | 47 ++++++++++++++++++++++
 .../main/scala/kafka/server/ReplicaManager.scala   |  6 ++-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 10 ++---
 .../log/remote/quota/RLMQuotaManagerTest.java      | 16 ++++----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 43 +++++++++++++++++---
 7 files changed, 121 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5aedd1bbfb3..2d589337e71 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -21,6 +21,7 @@ import kafka.cluster.Partition;
 import kafka.log.UnifiedLog;
 import kafka.log.remote.quota.RLMQuotaManager;
 import kafka.log.remote.quota.RLMQuotaManagerConfig;
+import kafka.log.remote.quota.RLMQuotaMetrics;
 import kafka.server.BrokerTopicStats;
 import kafka.server.QuotaType;
 import kafka.server.StopPartition;
@@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
@@ -137,6 +139,7 @@ import java.util.stream.Stream;
 import scala.Option;
 import scala.collection.JavaConverters;
 
+import static 
kafka.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS;
 import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
@@ -171,6 +174,7 @@ public class RemoteLogManager implements Closeable {
     private final Condition copyQuotaManagerLockCondition = 
copyQuotaManagerLock.newCondition();
     private final RLMQuotaManager rlmCopyQuotaManager;
     private final RLMQuotaManager rlmFetchQuotaManager;
+    private final Sensor fetchThrottleTimeSensor;
 
     private final RemoteIndexCache indexCache;
     private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
@@ -229,6 +233,9 @@ public class RemoteLogManager implements Closeable {
         rlmCopyQuotaManager = createRLMCopyQuotaManager();
         rlmFetchQuotaManager = createRLMFetchQuotaManager();
 
+        fetchThrottleTimeSensor = new RLMQuotaMetrics(metrics, 
"remote-fetch-throttle-time", RemoteLogManager.class.getSimpleName(),
+            "The %s time in millis remote fetches was throttled by a broker", 
INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor();
+
         indexCache = new 
RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), 
remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
@@ -286,8 +293,12 @@ public class RemoteLogManager implements Closeable {
           "Tracking fetch byte-rate for Remote Log Manager", time);
     }
 
-    public boolean isRemoteLogFetchQuotaExceeded() {
-        return rlmFetchQuotaManager.isQuotaExceeded();
+    public long getFetchThrottleTimeMs() {
+        return rlmFetchQuotaManager.getThrottleTimeMs();
+    }
+
+    public Sensor fetchThrottleTimeSensor() {
+        return fetchThrottleTimeSensor;
     }
 
     static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig 
rlmConfig) {
@@ -804,7 +815,7 @@ public class RemoteLogManager implements Closeable {
 
                             copyQuotaManagerLock.lock();
                             try {
-                                while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                                while (rlmCopyQuotaManager.getThrottleTimeMs() 
> 0) {
                                     logger.debug("Quota exceeded for copying 
log segments, waiting for the quota to be available.");
                                     // If the thread gets interrupted while 
waiting, the InterruptedException is thrown
                                     // back to the caller. It's important to 
note that the task being executed is already
diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java 
b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
index 68513dd4215..0f695a21e63 100644
--- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
+++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
@@ -18,6 +18,7 @@ package kafka.log.remote.quota;
 
 import kafka.server.QuotaType;
 import kafka.server.SensorAccess;
+import kafka.utils.QuotaUtils;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -80,16 +81,16 @@ public class RLMQuotaManager {
         }
     }
 
-    public boolean isQuotaExceeded() {
+    public long getThrottleTimeMs() {
         Sensor sensorInstance = sensor();
         try {
             sensorInstance.checkQuotas();
         } catch (QuotaViolationException qve) {
             LOGGER.debug("Quota violated for sensor ({}), metric: ({}), 
metric-value: ({}), bound: ({})",
                 sensorInstance.name(), qve.metric().metricName(), qve.value(), 
qve.bound());
-            return true;
+            return QuotaUtils.throttleTime(qve, time.milliseconds());
         }
-        return false;
+        return 0L;
     }
 
     public void record(double value) {
diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java 
b/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
new file mode 100644
index 00000000000..96dd6b1b7e6
--- /dev/null
+++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.SensorAccess;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import scala.runtime.BoxedUnit;
+
+public class RLMQuotaMetrics {
+
+    private final Sensor sensor;
+
+    public RLMQuotaMetrics(Metrics metrics, String name, String group, String 
descriptionFormat, long expirationTime) {
+        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+        SensorAccess sensorAccess = new SensorAccess(lock, metrics);
+        this.sensor = sensorAccess.getOrCreate(name, expirationTime, s -> {
+            s.add(metrics.metricName(name + "-avg", group, 
String.format(descriptionFormat, "average")), new Avg());
+            s.add(metrics.metricName(name + "-max", group, 
String.format(descriptionFormat, "maximum")), new Max());
+            return BoxedUnit.UNIT;
+        });
+    }
+
+    public Sensor sensor() {
+        return sensor;
+    }
+}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8ffc7eae585..6ebd151a032 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1762,7 +1762,11 @@ class ReplicaManager(val config: KafkaConfig,
         createLogReadResult(highWatermark, leaderLogStartOffset, 
leaderLogEndOffset,
           new OffsetMovedToTieredStorageException("Given offset" + offset + " 
is moved to tiered storage"))
       } else {
-        val fetchDataInfo = if 
(remoteLogManager.get.isRemoteLogFetchQuotaExceeded) {
+        val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs()
+        val fetchDataInfo = if (throttleTimeMs > 0) {
+          // Record the throttle time for the remote log fetches
+          
remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, 
time.milliseconds())
+
           // We do not want to send an exception in a LogReadResult response 
(like we do in other cases when we send
           // UnknownOffsetMetadata), because it is classified as an error in 
reading the data, and a response is
           // immediately sent back to the client. Instead, we want to serve 
data for the other topic partitions of the
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 5b8030b6756..8cf190785f5 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -2870,7 +2870,7 @@ public class RemoteLogManagerTest {
             assertTimeoutPreemptively(Duration.ofMillis(100), () -> 
task.copyLogSegmentsToRemote(mockLog));
 
             // Verify quota check was performed
-            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            verify(rlmCopyQuotaManager, times(1)).getThrottleTimeMs();
             // Verify bytes to copy was recorded with the quota manager
             verify(rlmCopyQuotaManager, times(1)).record(10);
 
@@ -2893,7 +2893,7 @@ public class RemoteLogManagerTest {
             Collections.singleton(mockPartition(leaderTopicIdPartition)), 
Collections.emptySet(), topicIds);
         // Ensure the copy operation is waiting for quota to be available
         TestUtils.waitForCondition(() -> {
-            verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
+            verify(rlmCopyQuotaManager, atLeast(1)).getThrottleTimeMs();
             return true;
         }, "Quota exceeded check did not happen");
         // Verify RLM is able to shut down
@@ -2958,7 +2958,7 @@ public class RemoteLogManagerTest {
         
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
         
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
 
-        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+        when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaExceeded 
? 1000L : 0L);
         doNothing().when(rlmCopyQuotaManager).record(anyInt());
 
         RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
@@ -3029,8 +3029,8 @@ public class RemoteLogManagerTest {
         
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
         
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
 
-        // After the first call, isQuotaExceeded should return true
-        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
+        // After the first call, getThrottleTimeMs should return non-zero 
throttle time
+        when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(0L, 1000L);
         doNothing().when(rlmCopyQuotaManager).record(anyInt());
 
         RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
diff --git a/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java 
b/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
index 2d639d4f241..ce6c3ce3ecc 100644
--- a/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java
@@ -47,19 +47,19 @@ public class RLMQuotaManagerTest {
         RLMQuotaManager quotaManager = new RLMQuotaManager(
             new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, 
DESCRIPTION, time);
 
-        assertFalse(quotaManager.isQuotaExceeded());
+        assertEquals(0L, quotaManager.getThrottleTimeMs());
         quotaManager.record(500);
         // Move clock by 1 sec, quota is violated
         moveClock(1);
-        assertTrue(quotaManager.isQuotaExceeded());
+        assertEquals(9_000L, quotaManager.getThrottleTimeMs());
 
         // Move clock by another 8 secs, quota is still violated for the window
         moveClock(8);
-        assertTrue(quotaManager.isQuotaExceeded());
+        assertEquals(1_000L, quotaManager.getThrottleTimeMs());
 
         // Move clock by 1 sec, quota is no more violated
         moveClock(1);
-        assertFalse(quotaManager.isQuotaExceeded());
+        assertEquals(0L, quotaManager.getThrottleTimeMs());
     }
 
     @Test
@@ -67,9 +67,9 @@ public class RLMQuotaManagerTest {
         RLMQuotaManager quotaManager = new RLMQuotaManager(
             new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, 
DESCRIPTION, time);
 
-        assertFalse(quotaManager.isQuotaExceeded());
+        assertFalse(quotaManager.getThrottleTimeMs() > 0);
         quotaManager.record(51);
-        assertTrue(quotaManager.isQuotaExceeded());
+        assertTrue(quotaManager.getThrottleTimeMs() > 0);
 
         Map<MetricName, KafkaMetric> fetchQuotaMetrics = 
metrics.metrics().entrySet().stream()
             .filter(entry -> entry.getKey().name().equals("byte-rate") && 
entry.getKey().group().equals(QUOTA_TYPE.toString()))
@@ -88,7 +88,7 @@ public class RLMQuotaManagerTest {
         // Update quota to 60, quota is no more violated
         Quota quota60Bytes = new Quota(60, true);
         quotaManager.updateQuota(quota60Bytes);
-        assertFalse(quotaManager.isQuotaExceeded());
+        assertFalse(quotaManager.getThrottleTimeMs() > 0);
 
         // Verify quota metrics were updated
         Map<MetricName, MetricConfig> configForQuotaMetricsAfterFirstUpdate = 
extractMetricConfig(fetchQuotaMetrics);
@@ -100,7 +100,7 @@ public class RLMQuotaManagerTest {
         // Update quota to 40, quota is violated again
         Quota quota40Bytes = new Quota(40, true);
         quotaManager.updateQuota(quota40Bytes);
-        assertTrue(quotaManager.isQuotaExceeded());
+        assertTrue(quotaManager.getThrottleTimeMs() > 0);
 
         // Verify quota metrics were updated
         assertNotEquals(configForQuotaMetricsAfterFirstUpdate, 
extractMetricConfig(fetchQuotaMetrics));
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a1f3ce002c2..b1fffa88518 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -23,6 +23,8 @@ import kafka.cluster.PartitionTest.MockPartitionListener
 import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.log._
 import kafka.log.remote.RemoteLogManager
+import 
kafka.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
+import kafka.log.remote.quota.RLMQuotaMetrics
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
 import kafka.server.epoch.util.MockBlockingSender
@@ -116,6 +118,8 @@ class ReplicaManagerTest {
   private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _
   private var brokerTopicStats: BrokerTopicStats = _
   private val transactionSupportedOperation = genericError
+  private val quotaExceededThrottleTime = 1000
+  private val quotaAvailableThrottleTime = 0
 
   // Constants defined for readability
   private val zkVersion = 0
@@ -133,6 +137,13 @@ class ReplicaManagerTest {
     alterPartitionManager = mock(classOf[AlterPartitionManager])
     quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
     mockRemoteLogManager = mock(classOf[RemoteLogManager])
+    when(mockRemoteLogManager.fetchThrottleTimeSensor()).thenReturn(
+      new RLMQuotaMetrics(metrics,
+        "remote-fetch-throttle-time",
+        classOf[RemoteLogManager].getSimpleName,
+        "The %s time in millis remote fetches was throttled by a broker",
+        INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS)
+        .sensor())
     addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
 
     // Anytime we try to verify, just automatically run the callback as though 
the transaction was verified.
@@ -3357,7 +3368,8 @@ class ReplicaManagerTest {
     defaultTopicRemoteLogStorageEnable: Boolean = true,
     setupLogDirMetaProperties: Boolean = false,
     directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
-    buildRemoteLogAuxState: Boolean = false
+    buildRemoteLogAuxState: Boolean = false,
+    remoteFetchQuotaExceeded: Option[Boolean] = None
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
@@ -3411,6 +3423,15 @@ class ReplicaManagerTest {
 
     when(metadataCache.contains(new TopicPartition(topic, 0))).thenReturn(true)
 
+    if (remoteFetchQuotaExceeded.isDefined) {
+      assertFalse(remoteLogManager.isDefined)
+      if (remoteFetchQuotaExceeded.get) {
+        
when(mockRemoteLogManager.getFetchThrottleTimeMs()).thenReturn(quotaExceededThrottleTime)
+      } else {
+        
when(mockRemoteLogManager.getFetchThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime)
+      }
+    }
+
     // Transactional appends attempt to schedule to the request handler thread 
using a non request handler thread. Set this to avoid error.
     KafkaRequestHandler.setBypassThreadCheck(true)
 
@@ -3967,7 +3988,7 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val tidp0 = new TopicIdPartition(topicId, tp0)
     // create a replicaManager with remoteLog enabled
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
     try {
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
@@ -4022,7 +4043,7 @@ class ReplicaManagerTest {
     val tp0 = new TopicPartition(topic, 0)
     val tidp0 = new TopicIdPartition(topicId, tp0)
     // create a replicaManager with remoteLog enabled
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog= true)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog= true, remoteFetchQuotaExceeded = Some(false))
     try {
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
@@ -6713,7 +6734,7 @@ class ReplicaManagerTest {
 
   @Test
   def testRemoteReadQuotaExceeded(): Unit = {
-    when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)
+    
when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaExceededThrottleTime)
 
     val tp0 = new TopicPartition(topic, 0)
     val tpId0 = new TopicIdPartition(topicId, tp0)
@@ -6727,11 +6748,17 @@ class ReplicaManagerTest {
     assertFalse(fetchInfo.firstEntryIncomplete)
     assertFalse(fetchInfo.abortedTransactions.isPresent)
     assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
+
+    val allMetrics = metrics.metrics()
+    val avgMetric = 
allMetrics.get(metrics.metricName("remote-fetch-throttle-time-avg", 
"RemoteLogManager"))
+    val maxMetric = 
allMetrics.get(metrics.metricName("remote-fetch-throttle-time-max", 
"RemoteLogManager"))
+    assertEquals(quotaExceededThrottleTime, 
avgMetric.metricValue.asInstanceOf[Double].toLong)
+    assertEquals(quotaExceededThrottleTime, 
maxMetric.metricValue.asInstanceOf[Double].toLong)
   }
 
   @Test
   def testRemoteReadQuotaNotExceeded(): Unit = {
-    when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)
+    
when(mockRemoteLogManager.getFetchThrottleTimeMs).thenReturn(quotaAvailableThrottleTime)
 
     val tp0 = new TopicPartition(topic, 0)
     val tpId0 = new TopicIdPartition(topicId, tp0)
@@ -6745,6 +6772,12 @@ class ReplicaManagerTest {
     assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
     assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
     assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
+
+    val allMetrics = metrics.metrics()
+    val avgMetric = 
allMetrics.get(metrics.metricName("remote-fetch-throttle-time-avg", 
"RemoteLogManager"))
+    val maxMetric = 
allMetrics.get(metrics.metricName("remote-fetch-throttle-time-max", 
"RemoteLogManager"))
+    assertEquals(Double.NaN, avgMetric.metricValue)
+    assertEquals(Double.NaN, maxMetric.metricValue)
   }
 
   private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): 
Seq[(TopicIdPartition, LogReadResult)] = {

Reply via email to