chia7712 commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2652159795
##########
docs/upgrade.html:
##########
@@ -113,6 +113,35 @@ <h5><a id="upgrade_420_notable"
href="#upgrade_420_notable">Notable changes in 4
<li>
The <code>num.replica.fetchers</code> config has a new lower bound of
1.
</li>
+ <li>
+ Improvements have been made to the validation rules and default values
of LIST-type configurations
+ (<a href="https://cwiki.apache.org/confluence/x/HArXF">KIP-1161</a>).
+ <ul>
+ <li>
+ LIST-type configurations now enforce stricter validation:
+ <ul>
+ <li>Null values are no longer accepted for most LIST-type
configurations, except those that explicitly
+ allow a null default value or where a null value has a
well-defined semantic meaning.</li>
+ <li>Duplicate entries within the same list are no longer
permitted.</li>
Review Comment:
A simple fix to enforce fail-fast behaviour during startup is to make
`LogManager` use the latest configuration for its startup. Additionally, we
need to invoke `BrokerConfigHandler#processConfigChanges ` during the "first
publish" to ensure `KafkaConfig` is updated with latest values before
initializing the managers
```scala
if (_firstPublish) {
// ensure `KafkaConfig` is updated with latest values before
"initializeManagers"
try {
Option(delta.configsDelta()).foreach { configsDelta =>
configsDelta.changes().keySet().forEach { resource =>
val props = newImage.configs().configProperties(resource)
if (resource.`type`() == ConfigResource.Type.BROKER &&
(resource.name().isEmpty ||
resource.name().equals(brokerId.toString))) {
dynamicConfigPublisher.dynamicConfigHandlers.get(ConfigType.BROKER).foreach {
handler =>
handler.processConfigChanges(resource.name(), props)
}
}
}
}
} catch {
case t: Throwable => fatalFaultHandler.handleFault("Error
initialize broker configs", t)
}
// If this is the first metadata update we are applying, initialize
the managers
// first (but after setting up the metadata cache).
initializeManagers(newImage)
}
```
```scala
private def initializeManagers(newImage: MetadataImage): Unit = {
try {
// make `LogManager` use the latest configuration
logManager.reconfigureDefaultLogConfig(new
LogConfig(config.extractLogConfigMap))
// Start log manager, which will perform (potentially lengthy)
// recovery-from-unclean-shutdown if required.
logManager.startup(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]