[ https://issues.apache.org/jira/browse/KAFKA-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-13417.
-------------------------------------
    Resolution: Fixed

> Dynamic thread pool re-configurations may not get processed
> -----------------------------------------------------------
>
>                 Key: KAFKA-13417
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13417
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>
> `DynamicBrokerConfig.updateCurrentConfig` includes the following logic to
> update the current configuration and to let each `Reconfigurable` process the
> update:
> {code}
>     val oldConfig = currentConfig
>     val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
>     if (newConfig ne currentConfig) {
>       currentConfig = newConfig
>       kafkaConfig.updateCurrentConfig(newConfig)
>       // Process BrokerReconfigurable updates after current config is updated
>       brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
>     }
> {code}
> The problem here is that `currentConfig` gets initialized as `kafkaConfig`
> which means that the first call to
> `kafkaConfig.updateCurrentConfig(newConfig)` ends up mutating `currentConfig`
> and consequently `oldConfig`. The problem with this is that some of the
> `reconfigure` implementations will only apply a new configuration if the
> value in `oldConfig` does not match the value in `newConfig`. For example,
> here is the logic to update thread pools dynamically:
> {code}
>   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
>     if (newConfig.numIoThreads != oldConfig.numIoThreads)
>       server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
>     if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
>       server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
>     if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
>       server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
>     if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
>       server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
>     if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
>       server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
>   }
> {code}
> Because of this, the dynamic update will not get applied the first time it is
> made. I believe subsequent updates would work correctly though because we
> would have lost the indirect reference to `kafkaConfig`. Other than the
> `DynamicThreadPool` configurations, it looks like the config to update
> unclean leader election may also be affected by this bug.
> NOTE: This bug only affects kraft, which is missing the call to
> `DynamicBrokerConfig.initialize()`.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
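
The aliasing described in the issue can be reproduced in isolation. The following is a minimal sketch, not Kafka's code: `SketchConfig`, `SketchDynamicConfig`, `resizeCalls`, and `AliasingSketch` are hypothetical stand-ins, and the in-place copy in `updateCurrentConfig` is only an assumed simplification of how `kafkaConfig.updateCurrentConfig` changes the values visible through the old reference. It shows the first update being skipped and the second one being applied.

```scala
// Hypothetical, simplified stand-ins; none of these classes are Kafka's real API.

// A config object whose values can be changed in place.
final class SketchConfig(initialIoThreads: Int) {
  private var ioThreads: Int = initialIoThreads
  def numIoThreads: Int = ioThreads
  // Assumed simplification of kafkaConfig.updateCurrentConfig: mutate in place.
  def updateCurrentConfig(other: SketchConfig): Unit = {
    ioThreads = other.numIoThreads
  }
}

final class SketchDynamicConfig(kafkaConfig: SketchConfig) {
  // Bug precondition: currentConfig starts out as the same object as kafkaConfig.
  private var currentConfig: SketchConfig = kafkaConfig
  // Records every thread pool resize that reconfigure decided to perform.
  var resizeCalls: List[Int] = Nil

  // Same shape as the thread pool logic: act only if old and new values differ.
  private def reconfigure(oldConfig: SketchConfig, newConfig: SketchConfig): Unit = {
    if (newConfig.numIoThreads != oldConfig.numIoThreads)
      resizeCalls = resizeCalls :+ newConfig.numIoThreads
  }

  def update(ioThreads: Int): Unit = {
    val oldConfig = currentConfig
    val newConfig = new SketchConfig(ioThreads)
    if (newConfig ne currentConfig) {
      currentConfig = newConfig
      // On the first call, oldConfig and kafkaConfig are the same object,
      // so this line also overwrites the value seen through oldConfig.
      kafkaConfig.updateCurrentConfig(newConfig)
      reconfigure(oldConfig, newConfig)
    }
  }
}

object AliasingSketch {
  def run(): List[Int] = {
    val dynamic = new SketchDynamicConfig(new SketchConfig(8))
    dynamic.update(16) // first update: old and new compare equal, no resize
    dynamic.update(32) // second update: oldConfig is a distinct object, resize happens
    dynamic.resizeCalls
  }

  def main(args: Array[String]): Unit = println(run()) // prints List(32)
}
```

In this sketch only the second update reaches the resize branch, which matches the report that the first dynamic update is dropped while later ones go through.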