[ https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergio Troiano updated KAFKA-16015:
-----------------------------------
    Affects Version/s: 3.5.1

> kafka-leader-election timeout values always overwritten by default values
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-16015
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16015
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, tools
>    Affects Versions: 3.5.1, 3.6.1
>            Reporter: Sergio Troiano
>            Assignee: Sergio Troiano
>            Priority: Minor
>
> While using *kafka-leader-election.sh* I was getting random timeouts like these:
> {code:java}
> Error completing leader election (PREFERRED) for partition: sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlModelTrainingSamples-18: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlPartitionMetricSamples-8: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> {code}
> These timeouts were raised on the client side, even though the controller always completed all the leader elections.
> One pattern I noticed was that the timeouts were always raised after about 15 seconds.
>
> I checked, and this command has an option to pass configuration:
> {code:java}
> Option                                  Description
> ------                                  -----------
> --admin.config <String: config file>    Configuration properties files to pass
>                                           to the admin client
> {code}
> I created the file to increase the values of *request.timeout.ms* and *default.api.timeout.ms*. Even after these changes, the timeouts were still raised after 15 seconds.
> So I checked the source code and came across a bug: no matter which values we pass for the timeouts, the default values ALWAYS overwrite them.
>
> This is the [3.6 branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
> {code:java}
> object LeaderElectionCommand extends Logging {
>   def main(args: Array[String]): Unit = {
>     run(args, 30.second)
>   }
>
>   def run(args: Array[String], timeout: Duration): Unit = {
>     val commandOptions = new LeaderElectionCommandOptions(args)
>     CommandLineUtils.maybePrintHelpOrVersion(
>       commandOptions,
>       "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
>     )
>
>     validate(commandOptions)
>
>     val electionType = commandOptions.options.valueOf(commandOptions.electionType)
>
>     val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
>       parseReplicaElectionData(Utils.readFileAsString(path))
>     }
>
>     val singleTopicPartition = (
>       Option(commandOptions.options.valueOf(commandOptions.topic)),
>       Option(commandOptions.options.valueOf(commandOptions.partition))
>     ) match {
>       case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
>       case _ => None
>     }
>
>     /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
>      * The validate function should be checking that this option is required if the --topic and --path-to-json-file
>      * are not specified.
>      */
>     val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)
>
>     val adminClient = {
>       val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
>         Utils.loadProps(config)
>       }.getOrElse(new Properties())
>
>       props.setProperty(
>         AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
>         commandOptions.options.valueOf(commandOptions.bootstrapServer)
>       )
>       props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
>       props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
>
>       Admin.create(props)
>     }
> {code}
> As we can see, the default timeout is 30 seconds and the request timeout is 30 / 2 = 15 seconds, which explains the timeouts after 15 seconds.
> We can also see in the code how the custom values passed in the config file are overwritten by the defaults.
>
> The proposal is simple: use the default values only when the timeouts are not defined in the config file, for example:
> {code:java}
> if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
>   props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
> }
> if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
>   props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
> }
> {code}
>
> I tested it, and I am now able to modify the timeouts and have my application catch the result of the command properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
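The 3.6 behaviour and the proposed fix differ only in precedence: one writes the defaults unconditionally, the other fills them in only when the user's config left them unset. The following is a minimal, self-contained Scala sketch of that difference. It assumes only `java.util.Properties` and has no Kafka dependency. The names `TimeoutDefaults`, `applyUnconditionally`, `applyIfAbsent` and `userConfig` are hypothetical and exist only for illustration. The literal keys are the string values behind `AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG` and `AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG`.

```scala
import java.util.Properties

// Hypothetical illustration, not Kafka code. The two keys are the values of
// AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG and REQUEST_TIMEOUT_MS_CONFIG.
object TimeoutDefaults {
  val DefaultApiTimeoutMs = "default.api.timeout.ms"
  val RequestTimeoutMs = "request.timeout.ms"

  // Mirrors the 3.6 code: defaults are written unconditionally,
  // so any timeouts loaded from --admin.config are overwritten.
  def applyUnconditionally(props: Properties, timeoutMs: Long): Properties = {
    props.setProperty(DefaultApiTimeoutMs, timeoutMs.toString)
    props.setProperty(RequestTimeoutMs, (timeoutMs / 2).toString)
    props
  }

  // Mirrors the proposed fix: a default is written only when the key is absent.
  def applyIfAbsent(props: Properties, timeoutMs: Long): Properties = {
    if (!props.containsKey(DefaultApiTimeoutMs))
      props.setProperty(DefaultApiTimeoutMs, timeoutMs.toString)
    if (!props.containsKey(RequestTimeoutMs))
      props.setProperty(RequestTimeoutMs, (timeoutMs / 2).toString)
    props
  }

  // Hypothetical stand-in for the Properties returned by Utils.loadProps(config):
  // a user config that raises both timeouts.
  def userConfig(): Properties = {
    val p = new Properties()
    p.setProperty(DefaultApiTimeoutMs, "120000")
    p.setProperty(RequestTimeoutMs, "60000")
    p
  }
}
```

With the tool's 30-second default (`timeoutMs = 30000L`), `applyUnconditionally(userConfig(), 30000L)` leaves `request.timeout.ms` at `15000`, which matches the 15-second client-side timeouts in the report. `applyIfAbsent(userConfig(), 30000L)` keeps the user's `60000`, and still produces `30000` / `15000` when the config file sets neither key.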