Jason Gustafson created KAFKA-13417:
---------------------------------------
Summary: Dynamic thread pool re-configurations may not get processed
Key: KAFKA-13417
URL: https://issues.apache.org/jira/browse/KAFKA-13417
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson

`DynamicBrokerConfig.updateCurrentConfig` includes the following logic to update the current configuration and to let each `Reconfigurable` process the update:

{code}
val oldConfig = currentConfig
val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
if (newConfig ne currentConfig) {
  currentConfig = newConfig
  kafkaConfig.updateCurrentConfig(newConfig)

  // Process BrokerReconfigurable updates after current config is updated
  brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
}
{code}

The problem here is that `currentConfig` gets initialized as `kafkaConfig`, which means that the first call to `kafkaConfig.updateCurrentConfig(newConfig)` ends up mutating `currentConfig` and consequently `oldConfig`. The problem with this is that some of the `reconfigure` implementations will only apply a new configuration if the value in `oldConfig` does not match the value in `newConfig`.
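The aliasing described above can be reproduced with a small self-contained model. This is only a sketch under stated assumptions: `ToyConfig`, `ToyDynamicConfig`, `appliedResizes` and `run` are hypothetical names invented for illustration, and `ToyConfig.updateCurrentConfig` is assumed to change the instance in place, which is a simplification of how the real `KafkaConfig` behaves.

```scala
object AliasingDemo {
  // Hypothetical stand-in for KafkaConfig. Assumption: updateCurrentConfig
  // changes the values seen through this instance, in place.
  final class ToyConfig(private var ioThreads: Int) {
    def numIoThreads: Int = ioThreads
    def updateCurrentConfig(newConfig: ToyConfig): Unit =
      ioThreads = newConfig.numIoThreads
  }

  // Hypothetical stand-in for DynamicBrokerConfig, reduced to one setting.
  final class ToyDynamicConfig(kafkaConfig: ToyConfig) {
    // currentConfig starts out as the very same object as kafkaConfig.
    private var currentConfig: ToyConfig = kafkaConfig
    private var resizes: Vector[Int] = Vector.empty

    // Thread pool sizes that the reconfigure step actually applied.
    def appliedResizes: Vector[Int] = resizes

    def update(newIoThreads: Int): Unit = {
      val oldConfig = currentConfig // on the first call this aliases kafkaConfig
      val newConfig = new ToyConfig(newIoThreads)
      currentConfig = newConfig
      kafkaConfig.updateCurrentConfig(newConfig) // also changes oldConfig on the first call
      // Same shape as the reconfigure check: act only if the value differs.
      if (newConfig.numIoThreads != oldConfig.numIoThreads)
        resizes :+= newConfig.numIoThreads
    }
  }

  // Applies the given updates in order and reports which resizes took effect.
  def run(initialIoThreads: Int, updates: Seq[Int]): Vector[Int] = {
    val dynamicConfig = new ToyDynamicConfig(new ToyConfig(initialIoThreads))
    updates.foreach(dynamicConfig.update)
    dynamicConfig.appliedResizes
  }

  def main(args: Array[String]): Unit =
    println(run(8, Seq(16, 32))) // prints Vector(32): the change to 16 was skipped
}
```

In this model the first update is dropped because `oldConfig` and `kafkaConfig` are the same object, so the comparison sees equal values. From the second update on, `oldConfig` refers to the previous `newConfig`, a separate object, and the comparison behaves as intended.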

For example, here is the logic to update thread pools dynamically:

{code}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
  if (newConfig.numIoThreads != oldConfig.numIoThreads)
    server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
  if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
    server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
  if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
    server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
  if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
    server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
  if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
    server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
}
{code}

Because of this, the dynamic update will not get applied the first time it is made. I believe subsequent updates would work correctly though because we would have lost the indirect reference to `kafkaConfig`. Other than the `DynamicThreadPool` configurations, it looks like the config to update unclean leader election may also be affected by this bug.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)