Jason Gustafson created KAFKA-13417:
---------------------------------------

             Summary: Dynamic thread pool re-configurations may not get 
processed
                 Key: KAFKA-13417
                 URL: https://issues.apache.org/jira/browse/KAFKA-13417
             Project: Kafka
          Issue Type: Bug
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson


`DynamicBrokerConfig.updateCurrentConfig` includes the following logic to 
update the current configuration and to let each `Reconfigurable` process the 
update:
{code}
    val oldConfig = currentConfig
    val (newConfig, brokerReconfigurablesToUpdate) = 
processReconfiguration(newProps, validateOnly = false)
    if (newConfig ne currentConfig) {
      currentConfig = newConfig
      kafkaConfig.updateCurrentConfig(newConfig)

      // Process BrokerReconfigurable updates after current config is updated
      brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
    }
{code}

The problem here is that `currentConfig` gets initialized as `kafkaConfig` 
which means that the first call to `kafkaConfig.updateCurrentConfig(newConfig)` 
ends up mutating `currentConfig` and consequently `oldConfig`. The problem with 
this is that some of the `reconfigure` implementations will only apply a new 
configuration if the value in `oldConfig` does not match the value in 
`newConfig`. For example, here is the logic to update thread pools dynamically:

{code}
  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
    if (newConfig.numIoThreads != oldConfig.numIoThreads)
      
server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
    if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
      server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, 
newConfig.numNetworkThreads)
    if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
      
server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
    if (newConfig.numRecoveryThreadsPerDataDir != 
oldConfig.numRecoveryThreadsPerDataDir)
      
server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
    if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
      server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
  }
{code}

Because of this, the dynamic update will not get applied the first time it is 
made. I believe subsequent updates would work correctly though because we would 
have lost the indirect reference to `kafkaConfig`. Other than the 
`DynamicThreadPool` configurations, it looks like the config to update unclean 
leader election may also be affected by this bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to