[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-29 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16015:
-
Fix Version/s: 3.7.0
   3.8.0

> kafka-leader-election timeout values always overwritten by default values 
> --
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
> Issue Type: Bug
> Components: admin, tools
> Affects Versions: 3.5.1, 3.6.1
> Reporter: Sergio Troiano
> Assignee: Sergio Troiano
> Priority: Minor
> Fix For: 3.7.0, 3.8.0
>
>
> Using *kafka-leader-election.sh*, I was getting random timeouts like these:
> {code:java}
> Error completing leader election (PREFERRED) for partition: sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlModelTrainingSamples-18: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlPartitionMetricSamples-8: org.apache.kafka.common.errors.TimeoutException: The request timed out.
> {code}
> These timeouts were raised on the client side; the controller always 
> completed all of the leader elections.
> One pattern I noticed was that the timeouts were always raised after about 15 
> seconds.
>  
> So I checked, and this command has an option for passing configuration:
> {code:java}
> Option                                  Description
> ------                                  -----------
> --admin.config                          Configuration properties files to pass
>                                           to the admin client
> {code}
> I created the file in order to increase the values of *request.timeout.ms* 
> and *default.api.timeout.ms*. Even after increasing these values I got the 
> same result: the timeouts still occurred, as if the new values had no effect.
> So I checked the source code and came across a bug: no matter what values we 
> pass for the timeouts, the default values ALWAYS overwrite them.
>  
> This is the [3.6 branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
> {code:java}
> object LeaderElectionCommand extends Logging {
>   def main(args: Array[String]): Unit = {
>     run(args, 30.second)
>   }
>
>   def run(args: Array[String], timeout: Duration): Unit = {
>     val commandOptions = new LeaderElectionCommandOptions(args)
>     CommandLineUtils.maybePrintHelpOrVersion(
>       commandOptions,
>       "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
>     )
>
>     validate(commandOptions)
>
>     val electionType = commandOptions.options.valueOf(commandOptions.electionType)
>
>     val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
>       parseReplicaElectionData(Utils.readFileAsString(path))
>     }
>
>     val singleTopicPartition = (
>       Option(commandOptions.options.valueOf(commandOptions.topic)),
>       Option(commandOptions.options.valueOf(commandOptions.partition))
>     ) match {
>       case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
>       case _ => None
>     }
>
>     /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
>      * The validate function should be checking that this option is required if the --topic and --path-to-json-file
>      * are not specified.
>      */
>     val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)
>
>     val adminClient = {
>       val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
>         Utils.loadProps(config)
>       }.getOrElse(new Properties())
>
>       props.setProperty(
>         AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
>         commandOptions.options.valueOf(commandOptions.bootstrapServer)
>       )
>       props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
>       props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
>
>       Admin.create(props)
>     }
> {code}
> As we can see, the default timeout is 30 seconds and the request timeout is 
> 30/2, which explains the 15-second timeouts.
> Also, we can see in the code how the custom values passed by the config file 
> are overwritten 

[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Troiano updated KAFKA-16015:
---
Description: 
Using *kafka-leader-election.sh*, I was getting random timeouts like these:
{code:java}
Error completing leader election (PREFERRED) for partition: sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: __CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlModelTrainingSamples-18: org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlPartitionMetricSamples-8: org.apache.kafka.common.errors.TimeoutException: The request timed out.
{code}
These timeouts were raised on the client side; the controller always 
completed all of the leader elections.

One pattern I noticed was that the timeouts were always raised after about 15 
seconds.

So I checked, and this command has an option for passing configuration:
{code:java}
Option                                  Description
------                                  -----------
--admin.config                          Configuration properties files to pass
                                          to the admin client
{code}
I created the file in order to increase the values of *request.timeout.ms* 
and *default.api.timeout.ms*. Even after increasing these values I got the 
same result: the timeouts still occurred, as if the new values had no effect.
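
As a rough illustration of what such a file could contain, the sketch below embeds two entries as text and parses them with java.util.Properties. The values 60000 and 120000 and the class name are assumptions made for this example; the report does not state which values were actually used.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public final class AdminConfigFileSketch {
    // Hypothetical file contents; the values are illustrative only.
    static final String SAMPLE_FILE =
        "request.timeout.ms=60000\n" +
        "default.api.timeout.ms=120000\n";

    // Parses properties-file text into key/value pairs.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader should not fail, but load() declares IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(SAMPLE_FILE);
        System.out.println(props.getProperty("request.timeout.ms"));     // 60000
        System.out.println(props.getProperty("default.api.timeout.ms")); // 120000
    }
}
```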

So I checked the source code and came across a bug: no matter what values we 
pass for the timeouts, the default values ALWAYS overwrite them.

 

This is the [3.6 branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
{code:java}
object LeaderElectionCommand extends Logging {
  def main(args: Array[String]): Unit = {
    run(args, 30.second)
  }

  def run(args: Array[String], timeout: Duration): Unit = {
    val commandOptions = new LeaderElectionCommandOptions(args)
    CommandLineUtils.maybePrintHelpOrVersion(
      commandOptions,
      "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
    )

    validate(commandOptions)

    val electionType = commandOptions.options.valueOf(commandOptions.electionType)

    val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
      parseReplicaElectionData(Utils.readFileAsString(path))
    }

    val singleTopicPartition = (
      Option(commandOptions.options.valueOf(commandOptions.topic)),
      Option(commandOptions.options.valueOf(commandOptions.partition))
    ) match {
      case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
      case _ => None
    }

    /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
     * The validate function should be checking that this option is required if the --topic and --path-to-json-file
     * are not specified.
     */
    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)

    val adminClient = {
      val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
        Utils.loadProps(config)
      }.getOrElse(new Properties())

      props.setProperty(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        commandOptions.options.valueOf(commandOptions.bootstrapServer)
      )
      props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)

      Admin.create(props)
    }
{code}
As we can see, the default timeout is 30 seconds and the request timeout is 
30/2, which explains the 15-second timeouts.

Also, we can see in the code how the custom values passed by the config file 
are overwritten by the defaults.
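
The overwrite can be reproduced outside Kafka with plain java.util.Properties. This is only a sketch: the class and method names are hypothetical, the user values are made up, and the literal keys stand in for the AdminClientConfig constants used in the excerpt.

```java
import java.time.Duration;
import java.util.Properties;

public final class TimeoutOverwriteSketch {
    static final String DEFAULT_API_TIMEOUT = "default.api.timeout.ms";
    static final String REQUEST_TIMEOUT = "request.timeout.ms";

    // Mirrors the unconditional setProperty calls in the excerpt: whatever
    // the user supplied is replaced by values derived from the fixed timeout.
    static Properties applyUnconditionally(Properties userProps, Duration timeout) {
        Properties props = new Properties();
        props.putAll(userProps);
        props.setProperty(DEFAULT_API_TIMEOUT, Long.toString(timeout.toMillis()));
        props.setProperty(REQUEST_TIMEOUT, Long.toString(timeout.toMillis() / 2));
        return props;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty(DEFAULT_API_TIMEOUT, "120000"); // hypothetical user value
        user.setProperty(REQUEST_TIMEOUT, "60000");      // hypothetical user value

        Properties effective = applyUnconditionally(user, Duration.ofSeconds(30));
        System.out.println(effective.getProperty(DEFAULT_API_TIMEOUT)); // 30000
        System.out.println(effective.getProperty(REQUEST_TIMEOUT));     // 15000
    }
}
```

With a 30-second timeout the effective values are 30000 ms and 15000 ms regardless of the input, which matches the roughly 15-second client-side timeouts described above.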

 

 

The proposal is simple: use the default values only when the timeouts are not 
defined in the config file, for example:
{code:java}
      if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      }
      if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
      }
{code}
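
The same conditional logic can be exercised in isolation with a small stdlib-only sketch. It is not the patch that was merged; the class and method names are hypothetical, and the literal keys stand in for the AdminClientConfig constants.

```java
import java.time.Duration;
import java.util.Properties;

public final class TimeoutDefaultsSketch {
    static final String DEFAULT_API_TIMEOUT = "default.api.timeout.ms";
    static final String REQUEST_TIMEOUT = "request.timeout.ms";

    // Fills in a timeout only when the user-supplied properties lack that key.
    static Properties applyDefaultsIfAbsent(Properties userProps, Duration timeout) {
        Properties props = new Properties();
        props.putAll(userProps);
        if (!props.containsKey(DEFAULT_API_TIMEOUT)) {
            props.setProperty(DEFAULT_API_TIMEOUT, Long.toString(timeout.toMillis()));
        }
        if (!props.containsKey(REQUEST_TIMEOUT)) {
            props.setProperty(REQUEST_TIMEOUT, Long.toString(timeout.toMillis() / 2));
        }
        return props;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty(REQUEST_TIMEOUT, "60000"); // hypothetical user value

        Properties effective = applyDefaultsIfAbsent(user, Duration.ofSeconds(30));
        System.out.println(effective.getProperty(DEFAULT_API_TIMEOUT)); // 30000 (default applied)
        System.out.println(effective.getProperty(REQUEST_TIMEOUT));     // 60000 (user value kept)
    }
}
```

Note that a user who sets only one of the two keys still gets the default for the other, so the two values may no longer keep the 2:1 ratio of the defaults.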
 

I tested it, and I can now change the timeouts so that my application 
captures the result of the command properly.

 

  was:
Using the 

[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Troiano updated KAFKA-16015:
---
Affects Version/s: 3.5.1

> kafka-leader-election timeout values always overwritten by default values 
> --
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
> Issue Type: Bug
> Components: admin, tools
> Affects Versions: 3.5.1, 3.6.1
> Reporter: Sergio Troiano
> Assignee: Sergio Troiano
> Priority: Minor
>

[jira] [Updated] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergio Troiano updated KAFKA-16015:
---
Summary: kafka-leader-election timeout values always overwritten by default 
values   (was: kafka-leader-election timeout values always overwritten b 
default values )

> kafka-leader-election timeout values always overwritten by default values 
> --
>
> Key: KAFKA-16015
> URL: https://issues.apache.org/jira/browse/KAFKA-16015
> Project: Kafka
> Issue Type: Bug
> Components: admin, tools
> Affects Versions: 3.6.1
> Reporter: Sergio Troiano
> Assignee: Sergio Troiano
> Priority: Minor
>
