This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 5fd9bd10abf KAFKA-15265: Dynamic broker configs for remote fetch/copy
quotas (#16078)
5fd9bd10abf is described below
commit 5fd9bd10abfed9eb3d200173591e26558c15f2af
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Wed Jun 12 19:47:46 2024 +0530
KAFKA-15265: Dynamic broker configs for remote fetch/copy quotas (#16078)
Reviewers: Kamal Chandraprakash <[email protected]>, Satish
Duggana <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 11 +++
.../scala/kafka/server/DynamicBrokerConfig.scala | 55 +++++++++----
.../kafka/server/DynamicBrokerConfigTest.scala | 96 ++++++++++++++++++++++
3 files changed, 146 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index b920a962afc..c1c87d579ef 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -34,6 +34,7 @@ import
org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
@@ -249,6 +250,16 @@ public class RemoteLogManager implements Closeable {
indexCache.resizeCacheSize(remoteLogIndexFileCacheSize);
}
+ public void updateCopyQuota(long quota) {
+ LOGGER.info("Updating remote copy quota to {} bytes per second",
quota);
+ rlmCopyQuotaManager.updateQuota(new Quota(quota, true));
+ }
+
+ public void updateFetchQuota(long quota) {
+ LOGGER.info("Updating remote fetch quota to {} bytes per second",
quota);
+ rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
+ }
+
private void removeMetrics() {
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 9b75425666f..f627fa966b4 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1165,36 +1165,57 @@ class DynamicRemoteLogConfig(server: KafkaBroker)
extends BrokerReconfigurable w
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
- if (reconfigurableConfigs.contains(k)) {
- if
(k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
{
- val newValue = v.asInstanceOf[Long]
- val oldValue = getValue(server.config, k)
- if (newValue != oldValue && newValue <= 0) {
- val errorMsg = s"Dynamic remote log manager config update
validation failed for $k=$v"
- throw new ConfigException(s"$errorMsg, value should be at least 1")
- }
+ if
(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP.equals(k)
||
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP.equals(k)
||
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP.equals(k))
{
+ val newValue = v.asInstanceOf[Long]
+ val oldValue = getValue(server.config, k)
+ if (newValue != oldValue && newValue <= 0) {
+ val errorMsg = s"Dynamic remote log manager config update validation
failed for $k=$v"
+ throw new ConfigException(s"$errorMsg, value should be at least 1")
}
}
}
}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
- val oldValue =
oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
- val newValue =
newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
- if (oldValue != newValue) {
- val remoteLogManager = server.remoteLogManagerOpt
- if (remoteLogManager.nonEmpty) {
+ def oldLongValue(k: String): Long = oldConfig.getLong(k)
+ def newLongValue(k: String): Long = newConfig.getLong(k)
+
+ def isChangedLongValue(k : String): Boolean = oldLongValue(k) !=
newLongValue(k)
+
+ val remoteLogManager = server.remoteLogManagerOpt
+ if (remoteLogManager.nonEmpty) {
+ if
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
{
+ val oldValue =
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+ val newValue =
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
remoteLogManager.get.resizeCacheSize(newValue)
info(s"Dynamic remote log manager config:
${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP}
updated, " +
s"old value: $oldValue, new value: $newValue")
}
+ if
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP))
{
+ val oldValue =
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
+ val newValue =
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
+ remoteLogManager.get.updateCopyQuota(newValue)
+ info(s"Dynamic remote log manager config:
${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP}
updated, " +
+ s"old value: $oldValue, new value: $newValue")
+ }
+ if
(isChangedLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP))
{
+ val oldValue =
oldLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
+ val newValue =
newLongValue(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
+ remoteLogManager.get.updateFetchQuota(newValue)
+ info(s"Dynamic remote log manager config:
${RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP}
updated, " +
+ s"old value: $oldValue, new value: $newValue")
+ }
}
}
private def getValue(config: KafkaConfig, name: String): Long = {
name match {
- case
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
-
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+ case
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP |
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP |
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP =>
+ config.getLong(name)
case n => throw new IllegalStateException(s"Unexpected dynamic remote
log manager config $n")
}
}
@@ -1203,6 +1224,8 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends
BrokerReconfigurable w
object DynamicRemoteLogConfig {
val ReconfigurableConfigs = Set(
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
- RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP
+ RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
)
}
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e5d77cdc6f..3ba15889157 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -850,6 +850,102 @@ class DynamicBrokerConfigTest {
Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
}
+ @Test
+ def testRemoteLogManagerCopyQuotaUpdates(): Unit = {
+ testRemoteLogManagerQuotaUpdates(
+ RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
+
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
+ (remoteLogManager, quota) =>
Mockito.verify(remoteLogManager).updateCopyQuota(quota)
+ )
+ }
+
+ @Test
+ def testRemoteLogManagerFetchQuotaUpdates(): Unit = {
+ testRemoteLogManagerQuotaUpdates(
+
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
+
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
+ (remoteLogManager, quota) =>
Mockito.verify(remoteLogManager).updateFetchQuota(quota)
+ )
+ }
+
+ def testRemoteLogManagerQuotaUpdates(quotaProp: String, defaultQuota: Long,
verifyMethod: (RemoteLogManager, Long) => Unit): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 9092)
+ val config = KafkaConfig.fromProps(props)
+ val serverMock: KafkaServer = mock(classOf[KafkaServer])
+ val remoteLogManagerMockOpt = Option(mock(classOf[RemoteLogManager]))
+
+ Mockito.when(serverMock.config).thenReturn(config)
+
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(new
DynamicRemoteLogConfig(serverMock))
+
+ assertEquals(defaultQuota, config.getLong(quotaProp))
+
+ // Update default config
+ props.put(quotaProp, "100")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(100, config.getLong(quotaProp))
+ verifyMethod(remoteLogManagerMockOpt.get, 100)
+
+ // Update per broker config
+ props.put(quotaProp, "200")
+ config.dynamicConfig.updateBrokerConfig(0, props)
+ assertEquals(200, config.getLong(quotaProp))
+ verifyMethod(remoteLogManagerMockOpt.get, 200)
+
+ Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+ }
+
+ @Test
+ def testRemoteLogManagerMultipleConfigUpdates(): Unit = {
+ val indexFileCacheSizeProp =
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
+ val copyQuotaProp =
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP
+ val fetchQuotaProp =
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP
+
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port
= 9092)
+ val config = KafkaConfig.fromProps(props)
+ val serverMock: KafkaServer = mock(classOf[KafkaServer])
+ val remoteLogManagerMockOpt =
Option(Mockito.mock(classOf[RemoteLogManager]))
+
+ Mockito.when(serverMock.config).thenReturn(config)
+
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(new
DynamicRemoteLogConfig(serverMock))
+
+ // Default values
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
config.getLong(indexFileCacheSizeProp))
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
config.getLong(copyQuotaProp))
+
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
config.getLong(fetchQuotaProp))
+
+ // Update default config
+ props.put(indexFileCacheSizeProp, "4")
+ props.put(copyQuotaProp, "100")
+ props.put(fetchQuotaProp, "200")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(4, config.getLong(indexFileCacheSizeProp))
+ assertEquals(100, config.getLong(copyQuotaProp))
+ assertEquals(200, config.getLong(fetchQuotaProp))
+ Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)
+ Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(100)
+ Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(200)
+
+ // Update per broker config
+ props.put(indexFileCacheSizeProp, "8")
+ props.put(copyQuotaProp, "200")
+ props.put(fetchQuotaProp, "400")
+ config.dynamicConfig.updateBrokerConfig(0, props)
+ assertEquals(8, config.getLong(indexFileCacheSizeProp))
+ assertEquals(200, config.getLong(copyQuotaProp))
+ assertEquals(400, config.getLong(fetchQuotaProp))
+ Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(8)
+ Mockito.verify(remoteLogManagerMockOpt.get).updateCopyQuota(200)
+ Mockito.verify(remoteLogManagerMockOpt.get).updateFetchQuota(400)
+
+ Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+ }
+
def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
retentionMs: Long,
logLocalRetentionBytes: Long,