[jira] [Created] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)
Sergio Troiano created KAFKA-16015:
--

 Summary: kafka-leader-election timeout values always overwritten by default values
 Key: KAFKA-16015
 URL: https://issues.apache.org/jira/browse/KAFKA-16015
 Project: Kafka
  Issue Type: Bug
  Components: admin, tools
Affects Versions: 3.6.1
Reporter: Sergio Troiano
Assignee: Sergio Troiano


Using *kafka-leader-election.sh*, I was getting random timeouts like these:
{code:java}
Error completing leader election (PREFERRED) for partition: 
sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlModelTrainingSamples-18: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlPartitionMetricSamples-8: 
org.apache.kafka.common.errors.TimeoutException: The request timed out. {code}
These timeouts were raised on the client side, as the controller always completed all of the leader elections.

One pattern I noticed was that the timeouts were always raised after about 15 seconds.

 

So I checked that this command has an option to pass configurations:
{code:java}
Option                                  Description
------                                  -----------
--admin.config <String: config file>   Configuration properties files to pass
                                          to the admin client {code}
I created the file in order to increase the values of *request.timeout.ms* and *default.api.timeout.ms*. Even after these changes, the timeouts were still raised after 15 seconds.
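
For reference, this is the kind of properties file I passed via --admin.config (the file name and values here are illustrative):
{code:java}
# admin.properties
request.timeout.ms=60000
default.api.timeout.ms=120000 {code}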

So I checked the source code and came across a bug: no matter what values we pass for the timeouts, the default values ALWAYS overwrite them.

 

This is the [3.6 branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
{code:java}
object LeaderElectionCommand extends Logging {
  def main(args: Array[String]): Unit = {
    run(args, 30.second)
  }

  def run(args: Array[String], timeout: Duration): Unit = {
    val commandOptions = new LeaderElectionCommandOptions(args)
    CommandLineUtils.maybePrintHelpOrVersion(
      commandOptions,
      "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
    )

    validate(commandOptions)

    val electionType = commandOptions.options.valueOf(commandOptions.electionType)

    val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
      parseReplicaElectionData(Utils.readFileAsString(path))
    }

    val singleTopicPartition = (
      Option(commandOptions.options.valueOf(commandOptions.topic)),
      Option(commandOptions.options.valueOf(commandOptions.partition))
    ) match {
      case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
      case _ => None
    }

    /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
     * The validate function should be checking that this option is required if the --topic and --path-to-json-file
     * are not specified.
     */
    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)

    val adminClient = {
      val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
        Utils.loadProps(config)
      }.getOrElse(new Properties())

      props.setProperty(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        commandOptions.options.valueOf(commandOptions.bootstrapServer)
      )
      props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)

      Admin.create(props)
    } {code}
As we can see, the default timeout is 30 seconds and the request timeout is 30/2 = 15 seconds, which explains the 15-second timeouts.

We can also see in the code how the custom values passed via the config file are overwritten by the defaults.
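
To make the overwrite behaviour concrete, here is a minimal, self-contained sketch (names and values are mine, not from the tool):
{code:java}
import java.util.Properties

object OverwriteDemo extends App {
  val props = new Properties()
  // Simulates a value loaded from the --admin.config file
  props.setProperty("request.timeout.ms", "60000")
  // Simulates the unconditional setProperty done afterwards by LeaderElectionCommand
  props.setProperty("request.timeout.ms", "15000")
  // Prints 15000: the user-supplied value is gone
  println(props.getProperty("request.timeout.ms"))
} {code}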

 

 

The proposal is simple: use the default values only when the timeouts were not defined in the config file, for example like this:
{code:java}
      if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      }
      if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
      } {code}

[jira] [Created] (KAFKA-15243) User creation mismatch

2023-07-24 Thread Sergio Troiano (Jira)
Sergio Troiano created KAFKA-15243:
--

 Summary: User creation mismatch
 Key: KAFKA-15243
 URL: https://issues.apache.org/jira/browse/KAFKA-15243
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3.2
Reporter: Sergio Troiano
Assignee: Sergio Troiano


We found that Kafka users were not created properly. So let's suppose we create the user [myu...@myuser.com|mailto:myu...@myuser.com]

 

COMMAND:
{code:java}
/etc/new_kafka/bin/kafka-configs.sh  --bootstrap-server localhost:9092 --alter 
--add-config 
'SCRAM-SHA-256=[iterations=4096,password=blabla],SCRAM-SHA-256=[password=blabla]'
 --entity-type users --entity-name myu...@myuser.com{code}
RESPONSE:
{code:java}
Completed updating config for user myu...@myuser.com{code}
When listing the users, I see the user was created with an encoded name.

COMMAND
{code:java}
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type 
users|grep myuser {code}
RESPONSE
{code:java}
SCRAM credential configs for user-principal 'myuser%40myuser.com' are 
SCRAM-SHA-256=iterations=8192, SCRAM-SHA-512=iterations=4096 {code}
 

So basically the user name is being "sanitized", giving a false OK to the requester. The requested user does not exist as it should; the encoded one is created instead.

 

I dug into the code until I found that this happens in ZkAdminManager.scala, in this line:

 
{code:java}
adminZkClient.changeConfigs(ConfigType.User, Sanitizer.sanitize(user), 
configsByPotentiallyValidUser(user)) {code}
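
For illustration, the encoding above matches what the sanitizer does to the '@' character; a minimal sketch (assuming kafka-clients is on the classpath):
{code:java}
import org.apache.kafka.common.utils.Sanitizer

object SanitizeDemo extends App {
  // '@' is percent-encoded to '%40', producing the principal seen above
  println(Sanitizer.sanitize("myuser@myuser.com")) // myuser%40myuser.com
} {code}
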
Removing the Sanitizer fixes the problem, but I have a couple of doubts.

I checked that we sanitize because of some JMX metrics, but in this case I don't know if it is really needed. Supposing it is needed, I think we should forbid creating users with characters that will be encoded.

Even worse, after creating a user we generally create ACLs, and those are created properly, without encoding the characters; this creates a mismatch between the user and the ACLs.

 

 

So I can work on fixing this, but I think we need to decide:

 

A) We forbid creating users with characters that will be encoded, so we fail at the user creation step.

 

B) We allow user creation with special characters and remove the Sanitizer.sanitize(user) call from the two places where it appears in ZkAdminManager.scala.

 

 

And of course, if we go for B, we need to create the tests.

Please let me know what you think, and I can work on it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-13687) Limit number of batches when using kafka-dump-log.sh

2022-02-22 Thread Sergio Troiano (Jira)
Sergio Troiano created KAFKA-13687:
--

 Summary: Limit number of batches when using kafka-dump-log.sh
 Key: KAFKA-13687
 URL: https://issues.apache.org/jira/browse/KAFKA-13687
 Project: Kafka
  Issue Type: Improvement
Reporter: Sergio Troiano


Currently kafka-dump-log.sh reads the whole file(s) and dumps the contents of the segment file(s).

As we know, the savings from combining compression and batching while producing (if the payloads are good candidates) are huge.

 

We would like to have a way to "monitor" how the producers build their batches, as we cannot always access producer metrics.

We have multi-tenant producers, so it is hard to detect when the usage is not optimal.

 

The problem with the current way the dump-log tool works is that it reads the whole file. In a scenario with thousands of topics and different segment sizes (the default is 1 GB), we could end up affecting the cluster, as we evict useful pages from the page cache and replace them with what we read from the files.

 

As we only need to take a few samples from the segments to see the producing usage pattern, we would like to add a new parameter called maxBatches.

 

Based on the current script, the change is quite small, as it only needs a parameter and a counter.
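
A minimal, self-contained sketch of the idea (names are hypothetical; the real change would go into the dump-log tool):
{code:java}
// Stop after maxBatches record batches instead of reading the whole segment file.
object MaxBatchesSketch extends App {
  def dumpBatches[A](batches: Iterator[A], maxBatches: Int)(dump: A => Unit): Unit = {
    var count = 0 // the counter mentioned above
    while (batches.hasNext && count < maxBatches) {
      dump(batches.next())
      count += 1
    }
  }

  // Example: only the first 3 "batches" are processed.
  dumpBatches((1 to 10).iterator, maxBatches = 3)(b => println(s"batch $b"))
} {code}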

 

After adding this change, we could for example take smaller samples and analyze the batch headers (searching for compresscodec and the batch count).

 

Doing this we could automate a tool to read all the topics. Going even further, when we see a client is using neither compression nor batching, we could take the payloads of those samples and simulate compressing them (using batching and compression); with those numbers we could reach out to the client with a money-saving proposal.
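
A rough sketch of such a simulation, using JDK gzip as a stand-in for Kafka's compression codecs (the payloads and counts are made up):
{code:java}
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

object CompressionEstimate extends App {
  // Pretend these are 100 uncompressed, unbatched payloads sampled from a segment
  val payloads: Seq[Array[Byte]] = Seq.fill(100)("""{"sensor":"temp","value":23.5}""".getBytes("UTF-8"))
  val rawSize = payloads.map(_.length).sum

  // Batch them together and compress once, as a producer with batching + compression would
  val buffer = new ByteArrayOutputStream()
  val gzip = new GZIPOutputStream(buffer)
  payloads.foreach(p => gzip.write(p))
  gzip.close()

  println(s"raw: $rawSize bytes, batched+gzipped: ${buffer.size()} bytes")
} {code}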



--
This message was sent by Atlassian Jira
(v8.20.1#820001)