chia7712 commented on code in PR #16280: URL: https://github.com/apache/kafka/pull/16280#discussion_r1640574384
########## core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala: ########## @@ -472,6 +472,80 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } } + private def alterBrokerConfigs(brokerId: String, newValue: java.lang.Long): Unit = { + if (isKRaftTest()) { + val admin = createAdminClient() + try { + val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId) + val configEntry = new ConfigEntry(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, newValue.toString) + val configEntry2 = new ConfigEntry(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, newValue.toString) + val configEntry3 = new ConfigEntry(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, newValue.toString) + val config = new Config(List(configEntry, configEntry2, configEntry3).asJavaCollection) + admin.alterConfigs(Map( Review Comment: Could you please use `incrementalAlterConfigs` instead of deprecated `alterConfigs`? ########## core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala: ########## @@ -472,6 +472,80 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } } + private def alterBrokerConfigs(brokerId: String, newValue: java.lang.Long): Unit = { + if (isKRaftTest()) { + val admin = createAdminClient() + try { + val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId) + val configEntry = new ConfigEntry(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, newValue.toString) + val configEntry2 = new ConfigEntry(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, newValue.toString) + val configEntry3 = new ConfigEntry(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, newValue.toString) + val config = new Config(List(configEntry, configEntry2, configEntry3).asJavaCollection) + admin.alterConfigs(Map( + resource -> config, + ).asJava).all.get + } finally { + admin.close() + } + } else { + val newProps = new Properties() + 
newProps.put(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, newValue.toString) + newProps.put(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, newValue.toString) + newProps.put(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, newValue.toString) + val brokerIdOption = if (brokerId != "") Option(brokerId.toInt) else None + adminZkClient.changeBrokerConfig(brokerIdOption, newProps) + } + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testBrokerIdConfigChange(quorum: String): Unit = { + val newValue: java.lang.Long = 100000L + val brokerId: String = this.brokers.head.config.brokerId.toString + alterBrokerConfigs(brokerId, newValue) + for (b <- this.brokers) { + val value = if (b.config.brokerId.toString == brokerId) newValue else QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT + TestUtils.retry(10000) { + assertEquals(value, b.quotaManagers.leader.upperBound) + assertEquals(value, b.quotaManagers.follower.upperBound) + assertEquals(value, b.quotaManagers.alterLogDirs.upperBound) + } + } + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testDefaultBrokerIdConfigChange(quorum: String): Unit = { + val newValue: java.lang.Long = 100000L + val brokerId: String = "" + alterBrokerConfigs(brokerId, newValue) + for (b <- this.brokers) { + TestUtils.retry(10000) { + assertEquals(newValue, b.quotaManagers.leader.upperBound) + assertEquals(newValue, b.quotaManagers.follower.upperBound) + assertEquals(newValue, b.quotaManagers.alterLogDirs.upperBound) + } + } + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testDefaultAndBrokerIdConfigChange(quorum: String): Unit = { + val newValue: java.lang.Long = 100000L + val brokerId: String = this.brokers.head.config.brokerId.toString + alterBrokerConfigs(brokerId, newValue) + val newDefaultValue: java.lang.Long = 200000L + alterBrokerConfigs("", newDefaultValue) + for (b <- this.brokers) { + val value = if 
(b.config.brokerId.toString == brokerId) newValue else newDefaultValue + TestUtils.retry(10000) { + assertEquals(value, b.quotaManagers.leader.upperBound) + assertEquals(value, b.quotaManagers.follower.upperBound) + assertEquals(value, b.quotaManagers.alterLogDirs.upperBound) + } + } + } + Review Comment: Could you please add a test that unsets both the dynamic and the default values? ########## core/src/main/scala/kafka/server/ConfigHandler.scala: ########## @@ -236,22 +236,29 @@ class IpConfigHandler(private val connectionQuotas: ConnectionQuotas) extends Co */ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging { - def processConfigChanges(brokerId: String, properties: Properties): Unit = { - def getOrDefault(prop: String): Long = { - if (properties.containsKey(prop)) - properties.getProperty(prop).toLong - else - QuotaConfigs.QUOTA_BYTES_PER_SECOND_DEFAULT - } if (brokerId == ZooKeeperInternals.DEFAULT_STRING) brokerConfig.dynamicConfig.updateDefaultConfig(properties) else if (brokerConfig.brokerId == brokerId.trim.toInt) { brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties) - quotaManagers.leader.updateQuota(upperBound(getOrDefault(QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG).toDouble)) - quotaManagers.follower.updateQuota(upperBound(getOrDefault(QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG).toDouble)) - quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(QuotaConfigs.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG).toDouble)) } + val updatedDynamicBrokerConfigs = brokerConfig.dynamicConfig.currentDynamicBrokerConfigs + val updatedDynamicDefaultConfigs = brokerConfig.dynamicConfig.currentDynamicDefaultConfigs + + def getOrDefault(prop: String): Long = { + updatedDynamicBrokerConfigs get prop match { + case Some(value) => value.toLong + case None => { Review Comment: Please remove the redundant `{}`. ########## 
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala: ########## @@ -472,6 +472,80 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } } + private def alterBrokerConfigs(brokerId: String, newValue: java.lang.Long): Unit = { Review Comment: It seems `java.lang.Long` can be replaced by `Long`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org