This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 5fd9bd10abf KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas (#16078)
5fd9bd10abf is described below

commit 5fd9bd10abfed9eb3d200173591e26558c15f2af
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Wed Jun 12 19:47:46 2024 +0530

    KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas (#16078)
    
    Reviewers: Kamal Chandraprakash<[email protected]>, Satish Duggana <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 11 +++
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 55 +++++++++----
 .../kafka/server/DynamicBrokerConfigTest.scala     | 96 ++++++++++++++++++++++
 3 files changed, 146 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index b920a962afc..c1c87d579ef 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
@@ -249,6 +250,16 @@ public class RemoteLogManager implements Closeable {
         indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
     }
 
+    public void updateCopyQuota(long quota) {
+        LOGGER.info("Updating remote copy quota to {} bytes per second", 
quota);
+        rlmCopyQuotaManager.updateQuota(new Quota(quota, true));
+    }
+
+    public void updateFetchQuota(long quota) {
+        LOGGER.info("Updating remote fetch quota to {} bytes per second", 
quota);
+        rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
+    }
+
     private void removeMetrics() {
         
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
         
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 9b75425666f..f627fa966b4 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1165,36 +1165,57 @@ class DynamicRemoteLogConfig(server: KafkaBroker) 
extends BrokerReconfigurable w
 
   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     newConfig.values.forEach { (k, v) =>
-      if (reconfigurableConfigs.contains(k)) {
-        if 
(k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
 {
-          val newValue = v.asInstanceOf[Long]
-          val oldValue = getValue(server.config, k)
-          if (newValue != oldValue && newValue <= 0) {
-            val errorMsg = s"Dynamic remote log manager config update 
validation failed for $k=$v"
-            throw new ConfigException(s"$errorMsg, value should be at least 1")
-          }
+      if 
(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k)
 ||
+        
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP.equals(k)
 ||
+        
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP.equals(k))
 {
+        val newValue = v.asInstanceOf[Long]
+        val oldValue = getValue(server.config, k)
+        if (newValue != oldValue && newValue <= 0) {
+          val errorMsg = s"Dynamic remote log manager config update validation 
failed for $k=$v"
+          throw new ConfigException(s"$errorMsg, value should be at least 1")
         }
       }
     }
   }
 
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
-    val oldValue = 
oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
-    val newValue = 
newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
-    if (oldValue != newValue) {
-      val remoteLogManager = server.remoteLogManagerOpt
-      if (remoteLogManager.nonEmpty) {
+    def oldLongValue(k: String): Long = oldConfig.getLong(k)
+    def newLongValue(k: String): Long = newConfig.getLong(k)
+
+    def isChangedLongValue(k : String): Boolean = oldLongValue(k) != 
newLongValue(k)
+
+    val remoteLogManager = server.remoteLogManagerOpt
+    if (remoteLogManager.nonEmpty) {
+      if 
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
 {
+        val oldValue = 
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+        val newValue = 
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
         remoteLogManager.get.resizeCacheSize(newValue)
         info(s"Dynamic remote log manager config: 
${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} 
updated, " +
           s"old value: $oldValue, new value: $newValue")
       }
+      if 
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP))
 {
+        val oldValue = 
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
+        val newValue = 
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
+        remoteLogManager.get.updateCopyQuota(newValue)
+        info(s"Dynamic remote log manager config: 
${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP} 
updated, " +
+          s"old value: $oldValue, new value: $newValue")
+      }
+      if 
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP))
 {
+        val oldValue = 
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
+        val newValue = 
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
+        remoteLogManager.get.updateFetchQuota(newValue)
+        info(s"Dynamic remote log manager config: 
${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP} 
updated, " +
+          s"old value: $oldValue, new value: $newValue")
+      }
     }
   }
 
   private def getValue(config: KafkaConfig, name: String): Long = {
     name match {
-      case 
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
-        
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+      case 
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP |
+           
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP |
+           
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP =>
+        config.getLong(name)
       case n => throw new IllegalStateException(s"Unexpected dynamic remote 
log manager config $n")
     }
   }
@@ -1203,6 +1224,8 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends 
BrokerReconfigurable w
 object DynamicRemoteLogConfig {
   val ReconfigurableConfigs = Set(
     RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
-    RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP
+    RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
   )
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e5d77cdc6f..3ba15889157 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -850,6 +850,102 @@ class DynamicBrokerConfigTest {
     Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
   }
 
+  @Test
+  def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
+    testRemoteLogManagerQuotaUpdates(
+      RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
+      
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
+      (remoteLogManager, quota) => 
Mockito.verify(remoteLogManager).updateCopyQuota(quota)
+    )
+  }
+
+  @Test
+  def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
+    testRemoteLogManagerQuotaUpdates(
+      
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
+      
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
+      (remoteLogManager, quota) => 
Mockito.verify(remoteLogManager).updateFetchQuota(quota)
+    )
+  }
+
+  def testRemoteLogManagerQuotaUpdates(quotaProp: String, defaultQuota: Long, 
verifyMethod: (RemoteLogManager, Long) => Unit): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+    val config = KafkaConfig.fromProps(props)
+    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val remoteLogManagerMockOpt = Option(mock(classOf[RemoteLogManager]))
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+    assertEquals(defaultQuota, config.getLong(quotaProp))
+
+    // Update default config
+    props.put(quotaProp, "100")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(100, config.getLong(quotaProp))
+    verifyMethod(remoteLogManagerMockOpt.get, 100)
+
+    // Update per broker config
+    props.put(quotaProp, "200")
+    config.dynamicConfig.updateBrokerConfig(0, props)
+    assertEquals(200, config.getLong(quotaProp))
+    verifyMethod(remoteLogManagerMockOpt.get, 200)
+
+    Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+  }
+
+  @Test
+  def testRemoteLogManagerMultipleConfigUpdates(): Unit = {
+    val indexFileCacheSizeProp = 
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
+    val copyQuotaProp = 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
+    val fetchQuotaProp = 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
+
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 9092)
+    val config = KafkaConfig.fromProps(props)
+    val serverMock: KafkaServer = mock(classOf[KafkaServer])
+    val remoteLogManagerMockOpt = 
Option(Mockito.mock(classOf[RemoteLogManager]))
+
+    Mockito.when(serverMock.config).thenReturn(config)
+    
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+    config.dynamicConfig.initialize(None, None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
+
+    // Default values
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
 config.getLong(indexFileCacheSizeProp))
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
 config.getLong(copyQuotaProp))
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
 config.getLong(fetchQuotaProp))
+
+    // Update default config
+    props.put(indexFileCacheSizeProp, "4")
+    props.put(copyQuotaProp, "100")
+    props.put(fetchQuotaProp, "200")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertEquals(4, config.getLong(indexFileCacheSizeProp))
+    assertEquals(100, config.getLong(copyQuotaProp))
+    assertEquals(200, config.getLong(fetchQuotaProp))
+    Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)
+    Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100)
+    Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200)
+
+    // Update per broker config
+    props.put(indexFileCacheSizeProp, "8")
+    props.put(copyQuotaProp, "200")
+    props.put(fetchQuotaProp, "400")
+    config.dynamicConfig.updateBrokerConfig(0, props)
+    assertEquals(8, config.getLong(indexFileCacheSizeProp))
+    assertEquals(200, config.getLong(copyQuotaProp))
+    assertEquals(400, config.getLong(fetchQuotaProp))
+    Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(8)
+    Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200)
+    Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(400)
+
+    Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+  }
+
   def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
                                             retentionMs: Long,
                                             logLocalRetentionBytes: Long,

Reply via email to