rajinisivaram commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493520165



##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, 
previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => 
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + 
configName
+              if (!processedFiles.contains(prefixedName) && 
configProps.containsKey(prefixedName) &&
+                
configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName,
 ""))) {
+                val equivalentFileName = 
configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {
+    var fileChanged = false
+    val processedFiles = new mutable.HashSet[String]
+
+    reconfigurables
+      .filter(reconfigurable => 
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+      .foreach {
+        case reconfigurable: ListenerReconfigurable =>
+        ReloadableFileConfigs.foreach(configName => {
+          val prefixedName = reconfigurable.listenerName.configPrefix + 
configName
+          if (!processedFiles.contains(prefixedName) && 
configProps.containsKey(prefixedName)) {
+            val configFileName = configProps.getProperty(prefixedName)
+            val equivalentFileName = configFileName.replace("//", "/")
+            if (!configFileName.equals(equivalentFileName)) {
+              fileChanged = true

Review comment:
       This means update was requested, but not necessarily that file has 
changed?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, 
previousConfigProps: Map[String, String]): Unit ={

Review comment:
       nit: `SSL` => `Ssl`

##########
File path: core/src/main/scala/kafka/server/ConfigHandler.scala
##########
@@ -203,7 +203,13 @@ class BrokerConfigHandler(private val brokerConfig: 
KafkaConfig,
     if (brokerId == ConfigEntityName.Default)
       brokerConfig.dynamicConfig.updateDefaultConfig(properties)
     else if (brokerConfig.brokerId == brokerId.trim.toInt) {
-      brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, 
properties)
+      val persistentProps = 
brokerConfig.dynamicConfig.fromPersistentProps(properties, perBrokerConfig = 
true)
+      // The filepath was changed for equivalent replacement, which means we 
should reload
+      if (brokerConfig.dynamicConfig.trimSSLStorePaths(persistentProps)) {
+        
brokerConfig.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(persistentProps)
+      }

Review comment:
       Can't we put this logic in `DynamicBrokerConfig`?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, 
previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => 
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + 
configName
+              if (!processedFiles.contains(prefixedName) && 
configProps.containsKey(prefixedName) &&
+                
configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName,
 ""))) {
+                val equivalentFileName = 
configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {

Review comment:
       `SSL` => `Ssl`

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, 
previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => 
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + 
configName
+              if (!processedFiles.contains(prefixedName) && 
configProps.containsKey(prefixedName) &&
+                
configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName,
 ""))) {
+                val equivalentFileName = 
configProps.getProperty(prefixedName).replace("/", "//")
+                configProps.setProperty(prefixedName, equivalentFileName)
+                processedFiles.add(prefixedName)
+              }
+            })
+        })
+  }
+
+  private[server] def trimSSLStorePaths(configProps: Properties): Boolean = {
+    var fileChanged = false
+    val processedFiles = new mutable.HashSet[String]
+
+    reconfigurables
+      .filter(reconfigurable => 
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+      .foreach {
+        case reconfigurable: ListenerReconfigurable =>
+        ReloadableFileConfigs.foreach(configName => {
+          val prefixedName = reconfigurable.listenerName.configPrefix + 
configName
+          if (!processedFiles.contains(prefixedName) && 
configProps.containsKey(prefixedName)) {
+            val configFileName = configProps.getProperty(prefixedName)
+            val equivalentFileName = configFileName.replace("//", "/")
+            if (!configFileName.equals(equivalentFileName)) {
+              fileChanged = true

Review comment:
       This means update was requested, but not necessarily that file has 
changed?

##########
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##########
@@ -331,6 +334,50 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       }
   }
 
+  private[server] def maybeAugmentSSLStorePaths(configProps: Properties, 
previousConfigProps: Map[String, String]): Unit ={
+    val processedFiles = new mutable.HashSet[String]
+    reconfigurables
+      .filter(reconfigurable => 
ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
+        .foreach({
+          case reconfigurable: ListenerReconfigurable =>
+            ReloadableFileConfigs.foreach(configName => {
+              val prefixedName = reconfigurable.listenerName.configPrefix + 
configName
+              if (!processedFiles.contains(prefixedName) && 
configProps.containsKey(prefixedName) &&
+                
configProps.get(prefixedName).equals(previousConfigProps.getOrElse(prefixedName,
 ""))) {
+                val equivalentFileName = 
configProps.getProperty(prefixedName).replace("/", "//")

Review comment:
       Does this get reset somewhere or will we keep adding `/`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to