[jira] [Created] (KAFKA-16015) kafka-leader-election timeout values always overwritten by default values

2023-12-15 Thread Sergio Troiano (Jira)
Sergio Troiano created KAFKA-16015:
--

 Summary: kafka-leader-election timeout values always overwritten by 
default values 
 Key: KAFKA-16015
 URL: https://issues.apache.org/jira/browse/KAFKA-16015
 Project: Kafka
  Issue Type: Bug
  Components: admin, tools
Affects Versions: 3.6.1
Reporter: Sergio Troiano
Assignee: Sergio Troiano


Using the *kafka-leader-election.sh* tool, I was getting random timeouts like 
these:
{code:java}
Error completing leader election (PREFERRED) for partition: 
sebatestemptytopic-4: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__CruiseControlMetrics-3: org.apache.kafka.common.errors.TimeoutException: The 
request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlModelTrainingSamples-18: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
Error completing leader election (PREFERRED) for partition: 
__KafkaCruiseControlPartitionMetricSamples-8: 
org.apache.kafka.common.errors.TimeoutException: The request timed out. {code}
These timeouts were raised on the client side, as the controller always 
completed all of the leader elections.

One pattern I noticed was that the timeouts were always raised after about 15 
seconds.

 

So I checked and found the command has an option to pass configurations:
{code:java}
Option                                  Description
------                                  -----------
--admin.config <String: config file>   Configuration properties files to pass
                                          to the admin client {code}
I created a config file to increase the values of *request.timeout.ms* and 
*default.api.timeout.ms*. Even after these changes, the timeouts were still 
raised after 15 seconds.
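For reference, the file passed via --admin.config looked something like this 
(the values are illustrative, not a recommendation):
{code:java}
# admin.config
request.timeout.ms=60000
default.api.timeout.ms=120000 {code}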

So I checked the source code and came across a bug: no matter what values we 
pass for the timeouts, the default values ALWAYS overwrite them.

 

This is the [3.6 
branch|https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala#L42]:
{code:java}
object LeaderElectionCommand extends Logging {
  def main(args: Array[String]): Unit = {
    run(args, 30.second)
  }

  def run(args: Array[String], timeout: Duration): Unit = {
    val commandOptions = new LeaderElectionCommandOptions(args)
    CommandLineUtils.maybePrintHelpOrVersion(
      commandOptions,
      "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
    )

    validate(commandOptions)

    val electionType = commandOptions.options.valueOf(commandOptions.electionType)

    val jsonFileTopicPartitions = Option(commandOptions.options.valueOf(commandOptions.pathToJsonFile)).map { path =>
      parseReplicaElectionData(Utils.readFileAsString(path))
    }

    val singleTopicPartition = (
      Option(commandOptions.options.valueOf(commandOptions.topic)),
      Option(commandOptions.options.valueOf(commandOptions.partition))
    ) match {
      case (Some(topic), Some(partition)) => Some(Set(new TopicPartition(topic, partition)))
      case _ => None
    }

    /* Note: No need to look at --all-topic-partitions as we want this to be None if it is use.
     * The validate function should be checking that this option is required if the --topic and --path-to-json-file
     * are not specified.
     */
    val topicPartitions = jsonFileTopicPartitions.orElse(singleTopicPartition)

    val adminClient = {
      val props = Option(commandOptions.options.valueOf(commandOptions.adminClientConfig)).map { config =>
        Utils.loadProps(config)
      }.getOrElse(new Properties())

      props.setProperty(
        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
        commandOptions.options.valueOf(commandOptions.bootstrapServer)
      )
      props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)

      Admin.create(props)
    } {code}
As we can see, the default timeout is 30 seconds and the request timeout is 
30/2 = 15 seconds, which explains the 15-second timeouts I observed.

We can also see in the code how the custom values loaded from the config file 
are overwritten by these defaults.
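A minimal sketch of the difference, using plain java.util.Properties (the 
object name and values here are illustrative):
{code:java}
import java.util.Properties

object OverwriteDemo extends App {
  // Simulate props loaded from --admin.config with a custom timeout.
  val props = new Properties()
  props.setProperty("request.timeout.ms", "60000")

  // What the tool does today: setProperty unconditionally replaces the value.
  props.setProperty("request.timeout.ms", "15000")
  println(props.getProperty("request.timeout.ms")) // prints 15000

  // What the fix does: apply the default only when the key is absent.
  val fixed = new Properties()
  fixed.setProperty("request.timeout.ms", "60000")
  if (!fixed.containsKey("request.timeout.ms"))
    fixed.setProperty("request.timeout.ms", "15000")
  println(fixed.getProperty("request.timeout.ms")) // prints 60000
} {code}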

The fix is straightforward: apply the default values only when the timeouts 
were not already defined in the config file, for example like this:
{code:java}
      if (!props.containsKey(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, timeout.toMillis.toString)
      }
      if (!props.containsKey(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)) {
          props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, (timeout.toMillis / 2).toString)
      } {code}

[jira] [Created] (KAFKA-15243) User creation mismatch

2023-07-24 Thread Sergio Troiano (Jira)
Sergio Troiano created KAFKA-15243:
--

 Summary: User creation mismatch
 Key: KAFKA-15243
 URL: https://issues.apache.org/jira/browse/KAFKA-15243
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3.2
Reporter: Sergio Troiano
Assignee: Sergio Troiano


We found that Kafka users were not created properly. Suppose we create the 
user myuser@myuser.com:

 

COMMAND:
{code:java}
/etc/new_kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--add-config 
'SCRAM-SHA-256=[iterations=4096,password=blabla],SCRAM-SHA-256=[password=blabla]' 
--entity-type users --entity-name myuser@myuser.com{code}
RESPONSE:
{code:java}
Completed updating config for user myuser@myuser.com{code}
When listing the users, I see the user was created as an encoded string.

COMMAND
{code:java}
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type 
users|grep myuser {code}
RESPONSE
{code:java}
SCRAM credential configs for user-principal 'myuser%40myuser.com' are 
SCRAM-SHA-256=iterations=8192, SCRAM-SHA-512=iterations=4096 {code}
 

So the user name is being "sanitized", giving a false OK to the requester: 
the requested user does not exist as such; the encoded one is created instead.

 

I dug into the code until I found this happens in ZkAdminManager.scala, on 
this line:

 
{code:java}
adminZkClient.changeConfigs(ConfigType.User, Sanitizer.sanitize(user), 
configsByPotentiallyValidUser(user)) {code}
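For context, the Sanitizer percent-encodes principal names (it is used for 
JMX metric names and ZooKeeper paths). A quick illustration of the effect, 
using java.net.URLEncoder as a stand-in, which encodes '@' the same way:
{code:java}
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object SanitizeDemo extends App {
  // '@' becomes '%40', matching the describe output above.
  println(URLEncoder.encode("myuser@myuser.com", StandardCharsets.UTF_8.name()))
  // prints: myuser%40myuser.com
} {code}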
Removing the Sanitizer fixes the problem, but I have a couple of doubts.

I checked that we sanitize because of some JMX metrics, but in this case I 
don't know if it is really needed. Supposing it is needed, I think we should 
forbid creating users with characters that will be encoded.

Even worse, after creating a user we generally create ACLs, and those are 
created properly, without encoding the characters; this creates a mismatch 
between the user and its ACLs.

So I can work on fixing this, but I think we need to decide:

 

A) We forbid creating users with characters that will be encoded, so we fail 
at the user creation step (a hypothetical sketch of such a check is below).
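A hypothetical validation sketch for option A (the method and its placement 
are assumptions, not an actual patch):
{code:java}
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.utils.Sanitizer

// Reject any user name that sanitization would alter, so the stored
// principal always matches the requested one.
def validateUserName(user: String): Unit = {
  if (Sanitizer.sanitize(user) != user)
    throw new InvalidRequestException(
      s"User name '$user' contains characters that would be encoded")
} {code}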

 

B) We allow user creation with special characters and remove the 
Sanitizer.sanitize(user) call from the two places where it appears in 
ZkAdminManager.scala.

And of course, if we go for B, we need to add the corresponding tests.

Please let me know what you think, and I can work on it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-838 Simulate batching and compression

2022-05-18 Thread Sergio Troiano
I'll start on the code if you consider this feature useful. What do you think?

Regarding the code, I'll use the same classes kafka-dump-log uses for reading 
records and add the logic on top of it.


Best regards
Sergio Troiano

Sent from my iPhone

> On 16 May 2022, at 20:48, Sergio Daniel Troiano  
> wrote:
> 
> 
> Hi Divij,
> 
> First of all thanks for your time and dedication.
> 
> About point one:
> You are right, the idea is to have "real time" visibility into the way the 
> clients are using the service, as that translates into a lot of money saved.
> I agree with the broader vision, although I think we are still far away from 
> it :)
> 
> About resource usage, my idea is to be non-invasive: taking a few MB of 
> samples once every few hours will be more than enough to understand the 
> producing pattern, so in this case the CPU usage is only a cost for the 
> producer and consumer.
> Worth mentioning that the additional ~3% CPU usage while producing is 
> negligible compared to the gains from batching and compression, but maybe 
> that discussion is not related to this KIP; that is a decision between the 
> cluster admin and the clients.
> 
> About the "auto tuning" that is a great idea, again I think it is very 
> ambitious for the scope of this KIP but if the core of this is properly done 
> then this can be used in the future.
> 
> 
> About point two:
> Below are the benefits of batching and compression:
> - Reduction of network bandwidth while data is produced.
> - Reduction of disk usage to store the data, and less IO to read and write 
> the segments (assuming the message format does not have to be converted).
> - Reduction of network traffic while data is replicated.
> - Reduction of network traffic while data is consumed.
> 
> The script I propose will output the percentage of network traffic reduction 
> plus the disk space saved.
> - Batching will be recommended based on the parameters $batching-window-time 
> (ms) and $min-records-for-batching; the idea is to check the CreateTime of 
> each batch. Let's suppose we use:
> 
> batching-window-time = 300
> min-records-for-batching = 30
> 
> * This means we check whether at least 30 records could have been batched 
> together within 300 ms; today those records might span 2 batches or 30 (one 
> record per batch).
> * If the batching is achievable, we jump to the next check and simulate the 
> compression, even if compression is already applied, as batching more data 
> will improve the compression ratio.
> * Finally, the payload (a few MB brought into memory) is measured for its 
> current size, then compressed, and the difference is calculated (a rough 
> sketch of the batching check follows below).
> 
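> Here is that sketch (the BatchInfo type and the grouping logic are only an 
> illustration, not the final implementation):
> 
>     final case class BatchInfo(createTimeMs: Long, recordCount: Int)
> 
>     def batchingRecommended(batches: Seq[BatchInfo],
>                             batchingWindowTimeMs: Long = 300,
>                             minRecordsForBatching: Int = 30): Boolean = {
>       // Slide a window over batches sorted by CreateTime, summing records.
>       val sorted = batches.sortBy(_.createTimeMs)
>       sorted.indices.exists { i =>
>         val windowEnd = sorted(i).createTimeMs + batchingWindowTimeMs
>         val inWindow = sorted.drop(i).takeWhile(_.createTimeMs <= windowEnd)
>         inWindow.map(_.recordCount).sum >= minRecordsForBatching
>       }
>     }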
> 
> As a side note, I think if the classes are properly designed this can be 
> reused in the future for a more "automagic" usage. Again, I really like the 
> idea of allowing the cluster to configure the producers (maybe the producer 
> could have a parameter to allow this).
> 
> I did not go into details about the code as I would first like to know 
> whether the idea is worth it. I use this "solution" at the company I work 
> for and it has saved us a lot of money; for now we just parse the output of 
> the kafka-dump-log.sh script to see the CreateTime and the compression type. 
> This is a first step, but we can't yet simulate the compression.
> So for now we reach out to our clients saying "there is a potential benefit 
> of cost reduction if you apply these changes in the producer".
> 
> 
> I hope this helps; please feel free to add more feedback.
> 
> Best regards.
> Sergio Troiano
> 
> 
> On Mon, 16 May 2022 at 10:35, Divij Vaidya  wrote:
>> Thank you for the KIP Sergio.
>> 
>> High level thoughts:
>> 1\ I understand that the idea here is to provide better visibility to the
>> admins about potential improvements using compression and modifying batch
>> size. I would take it a step further and say that we should be providing
>> this visibility in a programmatic, push-based manner and make this system
>> generic enough so that adding new "optimization rules" in the future is
>> seamless. Perhaps, have a "diagnostic" mode in the cluster, which can be
>> dynamically enabled. In such a mode, the cluster would run a set of
>> "optimization" rules (at the cost of additional CPU cycles). One of such
>> rules would be the compression rule you mentioned in your KIP. At the end
>> of the diagnostic run, the generated report would contain a set of
>> recommendations. To begin with, we can introduce this "diagnostic" as a
>> one-time run by admin and later

Re: [DISCUSS] KIP-824 Allowing dumping segment logs limiting the batches in the output

2022-03-24 Thread Sergio Troiano
Hey guys,


What is the next step? Who decides when it is time for voting?


Thanks!

Sent from my iPhone

> On 8 Mar 2022, at 19:57, Sergio Daniel Troiano  
> wrote:
> 
> 
> Hi Michael,
> 
> Yes, it's a good idea and I considered it. The main problem is that the 
> FileRecords class does not accept a number of batches as a parameter; it 
> accepts bytes instead. So if we want to do that, we either redesign a core 
> class or create a new one.
> One of the nice things (I think) about this change, which will bring a huge 
> benefit, is that the code change is quite small and lives in the final 
> script; it does not require any deep change in a core library.
> 
> An alternative, which also requires a big change but without touching the 
> FileRecords class, would be to accept a number of batches as a parameter, 
> call FileRecords.slice() with a small byte value, count the batches, and 
> check whether we can satisfy the requested number of batches; if not, we 
> call it again and again until we reach that amount. This would require a 
> lot of code as well (a rough sketch follows below).
> 
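> Here is that sketch (FileRecords.slice(), batches() and sizeInBytes() are 
> real methods on org.apache.kafka.common.record.FileRecords; the widening 
> loop itself is only an illustration):
> 
>     import java.io.File
>     import org.apache.kafka.common.record.FileRecords
>     import scala.jdk.CollectionConverters._
> 
>     // Grow a byte window until the slice holds enough batches (or EOF).
>     def countUpTo(segment: File, maxBatches: Int, step: Int = 64 * 1024): Int = {
>       val records = FileRecords.open(segment)
>       try {
>         var size = step
>         var count = records.slice(0, size).batches().asScala.size
>         while (count < maxBatches && size < records.sizeInBytes()) {
>           size += step
>           count = records.slice(0, size).batches().asScala.size
>         }
>         math.min(count, maxBatches)
>       } finally records.close()
>     }
> 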
> So, long story short, the proposed change is quite small, uses the current 
> classes, and has a big benefit.
> 
> Maybe in the future we could extend the FileRecords class to accept a 
> number of batches as a parameter, encapsulating this logic in the proper 
> class (FileRecords).
> What do you think?
> 
> Thanks
> Sergio 
> 
>> On Tue, 8 Mar 2022 at 18:32, Mickael Maison  wrote:
>> Hi Sergio,
>> 
>> Thanks for the KIP. Instead of specifying the size in bytes, have you
>> considered specifying it in terms of number of batches? I think it's a
>> bit more user friendly than a size in raw bytes.
>> For example: --num-batches: The number of batches to read from the log 
>> segment.
>> 
>> Thanks,
>> Mickael
>> 
>> 
>> On Tue, Mar 8, 2022 at 5:27 AM Sergio Daniel Troiano
>>  wrote:
>> >
>> > Hi Luke,
>> >
>> > Make sense, done!
>> >
>> > Thank you.
>> > Sergio Troiano
>> >
>> > On Tue, 8 Mar 2022 at 03:02, Luke Chen  wrote:
>> >
>> > > Hi Sergio,
>> > >
>> > > > I don't want this to minimize the main feature I want to deploy as I
>> > > think the
>> > > message size limit is not as important as the limiting the amount of
>> > > batches.
>> > >
>> > > Agree! Let's focus on the feature of limiting the batch amounts.
>> > >
>> > > One more comment to the KIP:
>> > > 1. Could you put the new parameter description into the KIP's proposed
>> > > change section? That would make it much clearer.
>> > >
>> > >
>> > > Thank you.
>> > > Luke
>> > >
>> > > On Mon, Mar 7, 2022 at 8:44 PM Sergio Daniel Troiano
>> > >  wrote:
>> > >
>> > > > hey Luke,
>> > > >
>> > > > I am interested in expanding the KIP scope, but I am a bit concerned
>> > > > this could create a lot of noise and confusion, as they look like
>> > > > very similar parameters. I agree this is a small change, so I think
>> > > > if I do it properly it should not be a problem at all; I just need a
>> > > > couple more days, as I want to create the proper tests as well.
>> > > >
>> > > > I have a doubt about editing the KIP: should I add this as a new
>> > > > feature as well, or should I describe it as a side-effect finding? I
>> > > > don't want this to overshadow the main feature I want to deliver, as
>> > > > I think the message size limit is not as important as limiting the
>> > > > number of batches.
>> > > >
>> > > > It is up to you; if you guys consider we must add this in this KIP,
>> > > > then I will be happy to do it.
>> > > >
>> > > > Best regards.
>> > > > Sergio Troiano
>> > > >
>> > > > On Mon, 7 Mar 2022 at 02:01, Luke Chen  wrote:
>> > > >
>> > > > > Hi Sergio,
>> > > > >
>> > > > > Thanks for your explanation.
>> > > > > Make sense to me.
>> > > > >
>> > > > > > Only interesting thing that I have just found is *max-message-size

[jira] [Created] (KAFKA-13687) Limit number of batches when using kafka-dump-log.sh

2022-02-22 Thread Sergio Troiano (Jira)
Sergio Troiano created KAFKA-13687:
--

 Summary: Limit number of batches when using kafka-dump-log.sh
 Key: KAFKA-13687
 URL: https://issues.apache.org/jira/browse/KAFKA-13687
 Project: Kafka
  Issue Type: Improvement
Reporter: Sergio Troiano


Currently kafka-dump-log.sh reads the whole file(s) and dumps the contents of 
the segment file(s).

As we know, the savings from combining compression and batching while 
producing (if the payloads are good candidates) are huge.

 

We would like a way to "monitor" how the producers produce batches, as we do 
not always have access to producer metrics.

We have multi-tenant producers, so it is hard to detect when the usage is not 
optimal.

 

The problem with the way DumpLogSegments currently works is that it reads the 
whole file. In a scenario with thousands of topics and different segment 
sizes (the default is 1 GB), we could end up affecting the cluster, as we 
evict useful pages from the page cache and replace them with what we read 
from the files.

 

Since we only need to take a few samples from the segments to see the pattern 
of usage while producing, we would like to add a new parameter called 
maxBatches.

 

Based on the current script, the change is quite small, as it only needs a 
new parameter and a counter (a rough sketch is below).
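A rough sketch of the idea (the loop shape is illustrative, not the actual 
DumpLogSegments code; FileRecords and its batches are the real classes from 
org.apache.kafka.common.record):
{code:java}
import java.io.File
import org.apache.kafka.common.record.FileRecords

// Hypothetical maxBatches handling: stop after printing maxBatches batch
// headers instead of reading the whole segment file.
def dumpBatches(segment: File, maxBatches: Int): Unit = {
  val records = FileRecords.open(segment)
  try {
    val it = records.batches().iterator()
    var dumped = 0
    while (it.hasNext && dumped < maxBatches) {
      val batch = it.next()
      println(s"baseOffset: ${batch.baseOffset} count: ${batch.countOrNull} " +
        s"compresscodec: ${batch.compressionType}")
      dumped += 1
    }
  } finally records.close()
} {code}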

 

After adding this change we could, for example, take smaller samples and 
analyze the batch headers (looking at compresscodec and the batch record 
count).
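For instance (maxBatches is the proposed parameter, rendered here as a 
hypothetical --max-batches flag; the output line is abridged and 
illustrative):
{code:java}
bin/kafka-dump-log.sh --files /var/kafka-logs/mytopic-0/00000000000000000000.log --max-batches 10
# baseOffset: 0 lastOffset: 29 count: 30 ... CreateTime: ... compresscodec: none {code}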

 

Doing this we could automate a tool to read all the topics. Going even 
further, when we see a client is neither using compression nor batching, we 
could take the payloads of those samples and simulate compression of the 
payloads (using batching and compression); with those numbers we can reach 
out to the client with a money-saving proposal.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)