dajac commented on a change in pull request #11920: URL: https://github.com/apache/kafka/pull/11920#discussion_r833118468
########## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ########## @@ -35,6 +35,7 @@ import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.utils.{ConfigUtils, Utils} +import java.util.concurrent.CopyOnWriteArrayList Review comment: nit: Could we either move this one next to the other `java.util.*` imports or move the others down here? ########## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ########## @@ -535,24 +539,24 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging if (changeMap.nonEmpty || deletedKeySet.nonEmpty) { try { val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs - newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_)) - reconfigurables.foreach { + newConfig.valuesFromThisConfig.keySet.forEach(k => customConfigs.remove(k)) + reconfigurables.forEach( { Review comment: nit: You can remove the parentheses. `forEach { ...` should work. 
########## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ########## @@ -535,24 +539,24 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging if (changeMap.nonEmpty || deletedKeySet.nonEmpty) { try { val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs - newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_)) - reconfigurables.foreach { + newConfig.valuesFromThisConfig.keySet.forEach(k => customConfigs.remove(k)) + reconfigurables.forEach( { case listenerReconfigurable: ListenerReconfigurable => processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false) case reconfigurable => if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet)) processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) - } + }) // BrokerReconfigurable updates are processed after config is updated. Only do the validation here. val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]() - brokerReconfigurables.foreach { reconfigurable => + brokerReconfigurables.forEach({ reconfigurable => Review comment: nit: ditto -- the parentheses around the block passed to `forEach` can be removed here as well; `forEach { reconfigurable => ...` should work. 
########## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ########## @@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - private val reconfigurables = mutable.Buffer[Reconfigurable]() - private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]() + + // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these + // collections, while another thread is iterating over them Review comment: nit: Could we add `.` at the end of the sentence? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org