chia7712 commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2652159795


##########
docs/upgrade.html:
##########
@@ -113,6 +113,35 @@ <h5><a id="upgrade_420_notable" 
href="#upgrade_420_notable">Notable changes in 4
     <li>
         The <code>num.replica.fetchers</code> config has a new lower bound of 
1.
     </li>
+    <li>
+        Improvements have been made to the validation rules and default values 
of LIST-type configurations
+        (<a href="https://cwiki.apache.org/confluence/x/HArXF";>KIP-1161</a>).
+        <ul>
+            <li>
+                LIST-type configurations now enforce stricter validation:
+                <ul>
+                    <li>Null values are no longer accepted for most LIST-type 
configurations, except those that explicitly
+                        allow a null default value or where a null value has a 
well-defined semantic meaning.</li>
+                    <li>Duplicate entries within the same list are no longer 
permitted.</li>

Review Comment:
   A simple fix to enforce fail-fast behaviour during startup is to make 
`LogManager` use the latest configuration for its startup. Additionally, we 
need to invoke `BrokerConfigHandler#processConfigChanges ` during the "first 
publish" to ensure `KafkaConfig` is updated with latest values before 
initializing the managers
   
   ```scala
         if (_firstPublish) {
           // ensure `KafkaConfig` is updated with latest values before 
"initializeManagers"
           try {
             Option(delta.configsDelta()).foreach { configsDelta =>
               configsDelta.changes().keySet().forEach { resource =>
                 val props = newImage.configs().configProperties(resource)
                 if (resource.`type`() == ConfigResource.Type.BROKER &&
                   (resource.name().isEmpty || 
resource.name().equals(brokerId.toString))) {
                   
dynamicConfigPublisher.dynamicConfigHandlers.get(ConfigType.BROKER).foreach { 
handler =>
                     handler.processConfigChanges(resource.name(), props)
                   }
                 }
               }
             }
           } catch {
             case t: Throwable => fatalFaultHandler.handleFault("Error 
initialize broker configs", t)
           }
   
           // If this is the first metadata update we are applying, initialize 
the managers
           // first (but after setting up the metadata cache).
           initializeManagers(newImage)
         }
   ```
   
   ```scala
     private def initializeManagers(newImage: MetadataImage): Unit = {
       try {
         // make `LogManager` use the latest configuration
         logManager.reconfigureDefaultLogConfig(new 
LogConfig(config.extractLogConfigMap))
         // Start log manager, which will perform (potentially lengthy)
         // recovery-from-unclean-shutdown if required.
         logManager.startup(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to