dajac commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833118468



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -35,6 +35,7 @@ import org.apache.kafka.common.network.{ListenerName, 
ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.{ConfigUtils, Utils}
 
+import java.util.concurrent.CopyOnWriteArrayList

Review comment:
       nit: Could we either move this one next to the other `java.util.*` 
imports or move the others down here?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -535,24 +539,24 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
       try {
         val customConfigs = new util.HashMap[String, 
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-        newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-        reconfigurables.foreach {
+        newConfig.valuesFromThisConfig.keySet.forEach(k => 
customConfigs.remove(k))
+        reconfigurables.forEach( {

Review comment:
       nit: You can remove the parenthesis. `forEach { ...` should work.

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -535,24 +539,24 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
       try {
         val customConfigs = new util.HashMap[String, 
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-        newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-        reconfigurables.foreach {
+        newConfig.valuesFromThisConfig.keySet.forEach(k => 
customConfigs.remove(k))
+        reconfigurables.forEach( {
           case listenerReconfigurable: ListenerReconfigurable =>
             processListenerReconfigurable(listenerReconfigurable, newConfig, 
customConfigs, validateOnly, reloadOnly = false)
           case reconfigurable =>
             if (needsReconfiguration(reconfigurable.reconfigurableConfigs, 
changeMap.keySet, deletedKeySet))
               processReconfigurable(reconfigurable, changeMap.keySet, 
newConfig.valuesFromThisConfig, customConfigs, validateOnly)
-        }
+        })
 
         // BrokerReconfigurable updates are processed after config is updated. 
Only do the validation here.
         val brokerReconfigurablesToUpdate = 
mutable.Buffer[BrokerReconfigurable]()
-        brokerReconfigurables.foreach { reconfigurable =>
+        brokerReconfigurables.forEach({ reconfigurable =>

Review comment:
       nit: ditto.

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = 
ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val reconfigurables = mutable.Buffer[Reconfigurable]()
-  private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
+
+  // Use COWArrayList to prevent concurrent modification exception when an 
item is added by one thread to these
+  // collections, while another thread is iterating over them

Review comment:
       nit: Could we add `.` at the end of the sentence?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to