chia7712 commented on code in PR #16280:
URL: https://github.com/apache/kafka/pull/16280#discussion_r1640574384


##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -472,6 +472,80 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
   }
 
+  private def alterBrokerConfigs(brokerId: String, newValue: java.lang.Long): 
Unit = {
+    if (isKRaftTest()) {
+      val admin = createAdminClient()
+      try {
+        val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
+        val configEntry = new 
ConfigEntry(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString)
+        val configEntry2 = new 
ConfigEntry(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString)
+        val configEntry3 = new 
ConfigEntry(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, 
newValue.toString)
+        val config = new Config(List(configEntry, configEntry2, 
configEntry3).asJavaCollection)
+        admin.alterConfigs(Map(

Review Comment:
   Could you please use `incrementalAlterConfigs` instead of deprecated  
`alterConfigs`?



##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -472,6 +472,80 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
   }
 
+  private def alterBrokerConfigs(brokerId: String, newValue: java.lang.Long): 
Unit = {
+    if (isKRaftTest()) {
+      val admin = createAdminClient()
+      try {
+        val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
+        val configEntry = new 
ConfigEntry(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString)
+        val configEntry2 = new 
ConfigEntry(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString)
+        val configEntry3 = new 
ConfigEntry(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, 
newValue.toString)
+        val config = new Config(List(configEntry, configEntry2, 
configEntry3).asJavaCollection)
+        admin.alterConfigs(Map(
+          resource -> config,
+        ).asJava).all.get
+      } finally {
+        admin.close()
+      }
+    } else {
+      val newProps = new Properties()
+      newProps.put(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString)
+      newProps.put(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString)
+      
newProps.put(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG,
 newValue.toString)
+      val brokerIdOption = if (brokerId != "") Option(brokerId.toInt) else None
+      adminZkClient.changeBrokerConfig(brokerIdOption, newProps)
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testBrokerIdConfigChange(quorum: String): Unit = {
+    val newValue: java.lang.Long = 100000L
+    val brokerId: String = this.brokers.head.config.brokerId.toString
+    alterBrokerConfigs(brokerId, newValue)
+    for (b <- this.brokers) {
+      val value = if (b.config.brokerId.toString == brokerId) newValue else 
QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT
+      TestUtils.retry(10000) {
+        assertEquals(value, b.quotaManagers.leader.upperBound)
+        assertEquals(value, b.quotaManagers.follower.upperBound)
+        assertEquals(value, b.quotaManagers.alterLogDirs.upperBound)
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDefaultBrokerIdConfigChange(quorum: String): Unit = {
+    val newValue: java.lang.Long = 100000L
+    val brokerId: String = ""
+    alterBrokerConfigs(brokerId, newValue)
+    for (b <- this.brokers) {
+      TestUtils.retry(10000) {
+        assertEquals(newValue, b.quotaManagers.leader.upperBound)
+        assertEquals(newValue, b.quotaManagers.follower.upperBound)
+        assertEquals(newValue, b.quotaManagers.alterLogDirs.upperBound)
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDefaultAndBrokerIdConfigChange(quorum: String): Unit = {
+    val newValue: java.lang.Long = 100000L
+    val brokerId: String = this.brokers.head.config.brokerId.toString
+    alterBrokerConfigs(brokerId, newValue)
+    val newDefaultValue: java.lang.Long = 200000L
+    alterBrokerConfigs("", newDefaultValue)
+    for (b <- this.brokers) {
+      val value = if (b.config.brokerId.toString == brokerId) newValue else 
newDefaultValue
+      TestUtils.retry(10000) {
+        assertEquals(value, b.quotaManagers.leader.upperBound)
+        assertEquals(value, b.quotaManagers.follower.upperBound)
+        assertEquals(value, b.quotaManagers.alterLogDirs.upperBound)
+      }
+    }
+  }
+

Review Comment:
   Could you please add a test that unset both dynamic and default value?



##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -236,22 +236,29 @@ class IpConfigHandler(private val connectionQuotas: 
ConnectionQuotas) extends Co
   */
 class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
                           private val quotaManagers: QuotaManagers) extends 
ConfigHandler with Logging {
-
   def processConfigChanges(brokerId: String, properties: Properties): Unit = {
-    def getOrDefault(prop: String): Long = {
-      if (properties.containsKey(prop))
-        properties.getProperty(prop).toLong
-      else
-        QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT
-    }
     if (brokerId == ZooKeeperInternals.DEFAULT_STRING)
       brokerConfig.dynamicConfig.updateDefaultConfig(properties)
     else if (brokerConfig.brokerId == brokerId.trim.toInt) {
       brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, 
properties)
-      
quotaManagers.leader.updateQuota(upperBound(getOrDefault(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG).toDouble))
-      
quotaManagers.follower.updateQuota(upperBound(getOrDefault(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG).toDouble))
-      
quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG).toDouble))
     }
+    val updatedDynamicBrokerConfigs = 
brokerConfig.dynamicConfig.currentDynamicBrokerConfigs
+    val updatedDynamicDefaultConfigs = 
brokerConfig.dynamicConfig.currentDynamicDefaultConfigs
+
+    def getOrDefault(prop: String): Long = {
+      updatedDynamicBrokerConfigs get prop match {
+        case Some(value) => value.toLong
+        case None => {

Review Comment:
   please remove redundant `{}`



##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -472,6 +472,80 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
   }
 
+  private def alterBrokerConfigs(brokerId: String, newValue: java.lang.Long): 
Unit = {

Review Comment:
   It seems `java.lang.Long` can be superseded by `Long` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to