[jira] [Created] (KAFKA-16015) kafka-leader-election timeout values always overwritten b default values
Sergio Troiano created KAFKA-16015: -- Summary: kafka-leader-election timeout values always overwritten b default values Key: KAFKA-16015 URL: https://issues.apache.org/jira/browse/KAFKA-16015 Project: Kafka Issue Type: Bug Components: admin, tools Affects Versions: 3.6.1 Reporter: Sergio Troiano Assignee: Sergio Troiano Using the *kafka-leader-election.sh* I was getting random timeouts like these: {code:java} Error completing leader election (PREFERRED) for partition: sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The request timed out. Error completing leader election (PREFERRED) for partition: __CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The request timed out. Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlModelTrainingSamples-18: org.apache.kafka.common.errors.TimeoutException: The request timed out. Error completing leader election (PREFERRED) for partition: __KafkaCruiseControlPartitionMetricSamples-8: org.apache.kafka.common.errors.TimeoutException: The request timed out. {code} These timeouts were raised from the client side as the controller always finished with all the Kafka leader elections. One pattern I detected was always the timeouts were raised after about 15 seconds. So i checked this command has an option to pass configurations {code:java} Option Description -- --- --admin.config Configuration properties files to pass to the admin client {code} I created the file in order to increment the values of *request.timeout.ms* and *default.api.timeout.ms.* So even after these changes the timeouts were raising after 15 seconds. So I checked the source code and I came across with a bug, no matter the value we pass to the timeouts the default values were ALWAYS overwriting them. This is the[ 3.6 branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42] {code:java} object LeaderElectionCommand extends Logging { def main(args: Array[String]): Unit = { run(args, 30.second) } def run(args: Array[String], timeout: Duration): Unit = { val commandOptions = new LeaderElectionCommandOptions(args) CommandLineUtils.maybePrintHelpOrVersion( commandOptions, "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas." ) validate(commandOptions) val electionType = commandOptions.options.valueOf(commandOptions.electionType) val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path => parseReplicaElectionData(Utils.readFileAsString(path)) } val singleTopicPartition = ( Option(commandOptions.options.valueOf(commandOptions.topic)), Option(commandOptions.options.valueOf(commandOptions.partition)) ) match { case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition))) case _ => None } /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use. * The validate function should be checking that this option is required if the --topic and --path-to-json-file * are not specified. */ val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition) val adminClient = { val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config => Utils.loadProps(config) }.getOrElse(new Properties()) props.setProperty( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOptions.options.valueOf(commandOptions.bootstrapServer) ) props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString) props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString) Admin.create(props) } {code} As we can see the default timeout is 30 seconds, and the request timeout is 30/2 which validate the 15 seconds timeout. Also we can see in the code how the custom values passed by the config file are overwritten by the defaults. The proposal is easy, we need to use the defaults values only when the timeouts were not defined by the config file, for example like this: {code:java} if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) { props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString) } if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) { props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis
[jira] [Created] (KAFKA-15243) User creation mismatch
Sergio Troiano created KAFKA-15243: -- Summary: User creation mismatch Key: KAFKA-15243 URL: https://issues.apache.org/jira/browse/KAFKA-15243 Project: Kafka Issue Type: Bug Affects Versions: 3.3.2 Reporter: Sergio Troiano Assignee: Sergio Troiano We found the Kafka users were not created properly, so let's suppose we create the user [myu...@myuser.com|mailto:myu...@myuser.com] COMMAND: {code:java} /etc/new_kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=4096,password=blabla],SCRAM-SHA-256=[password=blabla]' --entity-type users --entity-name myu...@myuser.com{code} RESPONSE: {code:java} Completed updating config for user myu...@myuser.com{code} When listing the users I see the user was created as an encoded string COMMAND {code:java} kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users|grep myuser {code} RESPONSE {code:java} SCRAM credential configs for user-principal 'myuser%40myuser.com' are SCRAM-SHA-256=iterations=8192, SCRAM-SHA-512=iterations=4096 {code} So basically the user is being "sanitized" and giving a false OK to the user requester. The user requested does not exist as it should, it creates the encoded one instead. I dug deep in the code until I found this is happening in the ZkAdminManager.scala in this line {code:java} adminZkClient.changeConfigs(ConfigType.User, Sanitizer.sanitize(user), configsByPotentiallyValidUser(user)) {code} So removing the Sanitizer fix the problem, but I have a couple of doubts I checked we Sanitize because of some JMX metrics, but in this case I don't know if this is really needed, supossing this is needed I think we should forbid to create users with characters that will be encoded. Even worse after creating an user in general we create ACLs and they are created properly without encoding the characters, this creates a mismatch between the user and the ACLs. So I can work on fixing this, but I think we need to decide : A) We forbid to create users with characters that will be encoded, so we fail in the user creation step. B) We allow the user creation with special characters and remove the Sanitizer.sanitize(user) from the 2 places where it shows up in the file ZkAdminManager.scala And of course if we go for B we need to create the tests. Please let me know what you think and i can work on it -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-13687) Limit number of batches when using kafka-dump-log.sh
Sergio Troiano created KAFKA-13687: -- Summary: Limit number of batches when using kafka-dump-log.sh Key: KAFKA-13687 URL: https://issues.apache.org/jira/browse/KAFKA-13687 Project: Kafka Issue Type: Improvement Reporter: Sergio Troiano Currently the kafka-dump-log.sh reads the whole files(s) and dumps the results of the segment file(s). As we know the savings the combination of using compression and batching while producing (if the payloads are good candidates) are huge. We would like to have a way to "monitor" the way the producers produce the batches as we not always we can have access to producer metrics. We have multitenant producers so it is hard to "detect" when the usage is not the best. The problem with the current way the DumpLogs works is it reads the whole file, in an scenario of having thousands of topics with different segment sizes (default is 1 GB) we could end up affecting the cluster balance as we are removing useful page from the page cache and adding what we read from files. As we only need to take a few samples from the segments to see the pattern of the usage while producing we would like to add a new parameter called maxBatches. Based on the current script the change is quite small as it only needs a parameter and a counter. After adding this change for example we could take smaller samples and analyze the batches headers (searching for compresscodec and the batch count) Doing this we could automate a tool to read all the topics and even going further we could take the payloads of those samples when we see the client is neither using compression nor batching and simulate a compression of the payloads (using batching and compression) then with those numbers we can reach the client for the proposal of saving money. -- This message was sent by Atlassian Jira (v8.20.1#820001)