Thanks, Luke, for the feedback.

> 1. how do we store value in bloom filter? It's unclear from this KIP that
> what we store inside bloom filter, and how we throttle them.
> My understanding is, we have a map with key = kafkaPrinciple, and value =
> PID for each bloom filter.
> And when new PID created for a userA, we update the map to add PID into
> the cache value (i.e. the bloom filter)
> When the window passed half of the time, we created another bloom filter,
> and this time, when new PID comes, we check if this new PID existed in
> previous bloom filter, if not, we add into the new bloom filter. And in the
> meantime, we track the "new created" count (filtered by previous bloom
> filter) for throttling the users.
> Is my understanding correct?

Not quite what I am proposing. I'm proposing:

   - a cache layer to be used only for checking whether we have encountered
   the PID before for a given KafkaPrincipal, not as a counter.
   - If the cache layer doesn't contain the PID, I'll increment the metric
   sensor using Sensor::record (the sensor will be created during the initial
   interaction with this KafkaPrincipal). Sensor::record fails with
   QuotaViolationException when we reach the max of the sensor.
   - If incrementing the sensor didn't fail with QuotaViolationException,
   then I'll add the PID to the cache for the next time.
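
To make the three steps above concrete, here is a rough sketch in Python. It is not the KIP's code: `PidQuotaCheck`, `CountingSensor` and `QuotaViolation` are hypothetical stand-ins, the cache is a plain set rather than a bloom filter, and the toy sensor is a simple counter with an upper bound, whereas a real Kafka sensor measures a rate over the quota window.

```python
class QuotaViolation(Exception):
    """Hypothetical stand-in for Kafka's QuotaViolationException."""


class CountingSensor:
    """Toy sensor: record() raises once the bound would be exceeded."""

    def __init__(self, max_new_pids):
        self.max_new_pids = max_new_pids
        self.count = 0

    def record(self):
        if self.count + 1 > self.max_new_pids:
            raise QuotaViolation("too many new PIDs for this principal")
        self.count += 1


class PidQuotaCheck:
    def __init__(self, max_new_pids):
        self.max_new_pids = max_new_pids
        self.sensors = {}  # principal -> CountingSensor
        self.seen = {}     # principal -> set of PIDs (stand-in for the cache)

    def on_produce(self, principal, pid):
        seen = self.seen.setdefault(principal, set())
        if pid in seen:
            return  # known PID: neither the sensor nor the cache is touched
        # The sensor is created on the first interaction with this principal.
        sensor = self.sensors.setdefault(
            principal, CountingSensor(self.max_new_pids))
        sensor.record()  # raises QuotaViolation when the bound is reached
        seen.add(pid)    # only cached if recording did not fail
```

The point of the ordering is that a PID that triggers a violation is never cached, so it is checked against the sensor again on its next attempt.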

To achieve this, I'm proposing that the cache layer be represented as a
"cachedMap = Map<KafkaPrincipal, Map<Timestamp, SimpleBloomFilter<PIDs>>>".
I wrapped "Map<Timestamp, SimpleBloomFilter<PIDs>>" into
"TimedControlledBloomFilter", where we decide which bloom filter to write
to, which bloom filter to delete, etc.
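
For anyone less familiar with bloom filters, the nested map could look roughly like the Python sketch below. This is only an illustration under my own assumptions: the bit-array size, the number of hashes and the SHA-256 based hashing are arbitrary choices, not what the KIP specifies. A bloom filter can report false positives but never false negatives, which is why it works as a "have we seen this PID?" check but not as a counter.

```python
import hashlib


class SimpleBloomFilter:
    """Minimal bloom filter; sizes and hashing are illustrative assumptions."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # an int used as a bit array

    def _positions(self, pid):
        # Derive num_hashes bit positions from seeded SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{pid}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, pid):
        for pos in self._positions(pid):
            self.bits |= 1 << pos

    def might_contain(self, pid):
        # True may be a false positive; False is always correct.
        return all((self.bits >> pos) & 1 for pos in self._positions(pid))


# Shape of the cache: principal -> {filter creation timestamp -> bloom filter}
cached_map = {"UserA": {0: SimpleBloomFilter()}}
cached_map["UserA"][0].add(1001)
```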

   1. When we encounter the producer for the first time,
      - the new quota manager will create the sensor and update its value.
      - Then it will update the cache with this PID for the next time:
         1. The cache will create an entry for the user in the cachedMap.
            The cache will be like this:
            Map { "UserA" -> TimedBloomFilter {
               -> bloom_filter_1
         2. The PID will be added to bloom_filter_1.
2. If the producer tries to produce with the same PID the next time, the
quota manager will not update the sensor or the cache, and will not
throttle the user.
3. However, if the producer tries to produce with a new PID, it will be
added to bloom_filter_1 as long as we are within the first half of the
window.
4. If the producer sends any new PIDs after the first half of the window,
we will create a new bloom filter to store the PIDs from the next half of
the window.

   - The cache will be like this
   - Map { "UserA" -> TimedBloomFilter {
                                     bloom_filter_1_create_timestamp ->
                                     bloom_filter_2_create_timestamp ->
   - All PIDs from this point until the end of this window will be added to
   the new bloom filter.
   - Both bloom_filter_1 and bloom_filter_2 will be used for checking
   whether we have come across the PID before.

5. a scheduler will run in the background to delete any bloom filter with
create_timestamp >= from
TimedBloomFilter automatically
6. If the user stops producing and all its bloom filters get deleted by
the scheduler, the user entry will be removed from the cachedMap.
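
The lifecycle in steps 1 to 6 can be sketched as below. Again this is a hedged Python illustration, not the KIP's implementation: `TimedFilters` and `run_scheduler` are hypothetical names, plain sets stand in for the bloom filters, and I am assuming a filter expires once its age reaches the full window size.

```python
class TimedFilters:
    """Per-principal filters keyed by creation time (sets stand in for
    bloom filters)."""

    def __init__(self, window_ms):
        self.window_ms = window_ms
        self.filters = {}  # create_timestamp_ms -> set of PIDs

    def contains(self, pid):
        # Every live filter is consulted when checking for a known PID.
        return any(pid in pids for pids in self.filters.values())

    def add(self, pid, now_ms):
        newest = max(self.filters, default=None)
        # Start a new filter on first use, or once the newest filter is
        # at least half a window old.
        if newest is None or now_ms - newest >= self.window_ms / 2:
            newest = now_ms
            self.filters[newest] = set()
        self.filters[newest].add(pid)

    def expire(self, now_ms):
        # Assumption: a filter is dropped when its age reaches the window.
        for created in [t for t in self.filters
                        if now_ms - t >= self.window_ms]:
            del self.filters[created]
        return not self.filters  # True when nothing is left for this user


def run_scheduler(cached_map, now_ms):
    """Background cleanup: expire old filters, then drop empty users."""
    for principal in list(cached_map):
        if cached_map[principal].expire(now_ms):
            del cached_map[principal]
```

With a 1000 ms window, PIDs seen at 0 ms and 100 ms share the first filter, a PID at 600 ms opens the second one, the first filter is dropped by the scheduler at 1000 ms, and the user entry disappears once the second filter expires as well.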

I updated the KIP to add more clarification to this caching layer.

> 2. what will user get when they can't allocate new PID due to throttling?

The client will get `QuotaViolationException`, similar to ClientQuotaManager.

> 3. This config: is unclear to me?

`quota.window.num` is a standard config shared by all Kafka quota types. I am
re-using the same description and default value as in the existing Kafka
codebase (I am not a fan of the description as it's not clear). The "sample"
here refers to the sliding time window. Kafka quotas keep 10 samples in
memory plus the current window, so 11 in total.
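
To illustrate what "samples" means, here is a toy sliding-window counter in Python. It is my own simplification rather than Kafka's actual sampled-stat code: `SampledCount` is a hypothetical name, the 1000 ms window size is assumed for the example, and old samples are only dropped when a new window starts.

```python
from collections import deque


class SampledCount:
    """Toy sliding-window counter that retains a fixed number of samples."""

    def __init__(self, num_samples=11, window_ms=1000):
        self.window_ms = window_ms
        # Each sample is [window_start_ms, count]; the deque silently drops
        # the oldest sample once num_samples are retained.
        self.samples = deque(maxlen=num_samples)

    def record(self, now_ms, value=1):
        start = now_ms - now_ms % self.window_ms
        if self.samples and self.samples[-1][0] == start:
            self.samples[-1][1] += value  # still inside the current window
        else:
            self.samples.append([start, value])  # roll over to a new window

    def total(self):
        return sum(count for _, count in self.samples)


counter = SampledCount()
for second in range(12):  # 12 one-second windows, one new PID in each
    counter.record(second * 1000)
```

After the loop only the 11 most recent windows are retained, so the very first window no longer contributes to the total that the quota is checked against.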

> Finally, I think this KIP is good to have an official KIP discuss thread
> for community review.

I will open the official KIP discussion today.


On Tue, Jun 6, 2023 at 10:19 AM Luke Chen <> wrote:

> Hi Omnia,
> I finally got time to check this KIP.
> Thanks for putting all this together.
> Questions:
> 1. how do we store value in bloom filter? It's unclear from this KIP that
> what we store inside bloom filter, and how we throttle them.
> My understanding is, we have a map with key = kafkaPrinciple, and value =
> PID for each bloom filter.
> And when new PID created for a userA, we update the map to add PID into
> the cache value (i.e. the bloom filter)
> When the window passed half of the time, we created another bloom filter,
> and this time, when new PID comes, we check if this new PID existed in
> previous bloom filter, if not, we add into the new bloom filter. And in the
> meantime, we track the "new created" count (filtered by previous bloom
> filter) for throttling the users.
> Is my understanding correct?
> 2. what will user get when they can't allocate new PID due to throttling?
> You might need to address in the KIP.
> 3. This config: is unclear to me?
> "The number of samples to retain in memory for alter producer id quotas"
> What's the "samples" mean here? Do you mean we only track the top 11
> kafkaPrinciple usage each window?
> Finally, I think this KIP is good to have an official KIP discuss thread
> for community review.
> Thanks for the KIP!
> Luke
> On Mon, Jun 5, 2023 at 11:44 PM Omnia Ibrahim <>
> wrote:
>> Hi Justine, Thanks for having a look
>> > One question I have is how will we handle a scenario where potentially
>> each new client has a new Kafka Principal? Is that simply not covered by
>> throttling?
>> if any new client setup a new principal they will be throttled based on
>> the throttling for `/config/users/<default>` or
>> /config/users/<the_new_principal>
>> On Wed, May 31, 2023 at 6:50 PM Justine Olshan <>
>> wrote:
>>> Hey Omnia,
>>> I was doing a bit of snooping (I actually just get updates for the KIP
>>> page) and I saw this draft was in progress. I shared it with some of my
>>> colleagues as well who I previously discussed the issue with.
>>> The initial look was pretty promising to me. I appreciate the detailing
>>> of the rejected options since we had quite a few we worked through :)
>>> One question I have is how will we handle a scenario where potentially
>>> each new client has a new Kafka Principal? Is that simply not covered by
>>> throttling?
>>> Thanks,
>>> Justine
>>> On Wed, May 31, 2023 at 10:08 AM Omnia Ibrahim <>
>>> wrote:
>>>> Hi Justine and Luke,
>>>> I started a KIP draft here
>>>> for a proposal would appreciate it if you could provide any initial
>>>> feedback before opening a broader discussion.
>>>> Thanks
>>>> On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim <>
>>>> wrote:
>>>>> *Hi Justine, *
>>>>> *My initial thought of throttling the initProducerId was to get rid
>>>>> of the problem at the source (which creates too many PIDs per client) and
>>>>> fail faster, but if having this on the produce request level is easier this
>>>>> should be fine. I am guessing it will be the same direction, as we may use
>>>>> ClientQuotaManager for Produce throttling with a different quota window
>>>>> than `quota.window.size.seconds`. *
>>>>> *If this is good as an initial solution I can start a KIP and see
>>>>> what the wider community feels about this. *
>>>>> *Also, I noticed that at some point one of us hit "Reply" instead of
>>>>> "Reply to All" :)  So here are the previous conversations*
>>>>> *On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <
>>>>> <>> wrote:*
>>>>>> Hey Omnia,
>>>>>> Thanks for the response. I think I understand your explanations here
>>>>>> with respect to principal and clientId usage.
>>>>>> For the throttling -- handleInitProducerIdRequest will allocate the
>>>>>> ID to the producer, but we don't actually store it on the broker or
>>>>>> increment our metric until the first produce request for that producer is
>>>>>> sent (or sent again after previously expiring). Would you consider
>>>>>> throttling the produce request instead? It may be hard to get any metrics
>>>>>> from the transaction coordinator where the initProducerId request is
>>>>>> handled.
>>>>>> Justine
>>>>> *On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim
>>>>> < <>> wrote:*
>>>>>> Hey Justine,
>>>>>> > If I understand your message correctly, there are issues with
>>>>>> identifying the source of the rogue clients? So you propose to add a new
>>>>>> metric for that?
>>>>>> > And also proposing to throttle based on clientId as a potential
>>>>>> follow up?
>>>>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>>>>>> similarly to how we identify clients in Fetch/Produce/Request
>>>>>> QuotaManagers. Using KafkaPrincipal should give cluster admin the ability
>>>>>> to throttle later based on principal which is most likely to be a smaller
>>>>>> set than clientIds. My initial thought was to add a metric that
>>>>>> represents how many InitProducerIDRequest are sent by KafkaPrincipal
>>>>>> (and/or clientId), similar to Fetch/Produce QuotaManagers.
>>>>>> Then as a follow-up, we can throttle based on either KafkaPrincipal
>>>>>> or clientId (maybe default as well to align this with other QuotaManagers
>>>>>> in Kafka).
>>>>>> >1. Does we rely on the client using the same ID? What if there are
>>>>>> many clients that all use different client IDs?
>>>>>> This is why I want to use the combination of KafkaPrincipal or
>>>>>> clientId similar to some other quotas we have in Kafka already. This will
>>>>>> be a similar risk to Fetch/Produce quota in Kafka, which also relies on the
>>>>>> client to use the same clientId and KafkaPrincipal.
>>>>>> >2. Are there places where high cardinality of this metric is a
>>>>>> concern? I can imagine many client IDs in the system. Would we treat this
>>>>>> as a rate metric (ie, when we get an init producer ID and return a new
>>>>>> producer ID we emit a count for that client id?) Or something else?
>>>>>> My initial thought here was to follow the steps of ClientQuotaManager
>>>>>> and ClientRequestQuotaManager and use a rate metric. However, I think we
>>>>>> can emit it either
>>>>>>    1. when we return the new PID. However, I have concerns that we
>>>>>>    may circle back to the previous concerns with OOM due to keeping
>>>>>>    track of ACTIVE PIDs per KafkaPrincipal (and/or clientId) in the
>>>>>>    future. Also, this would be the first time Kafka throttles IDs for
>>>>>>    any client.
>>>>>>    2. or once we receive initProducerIDRequest and throttle before
>>>>>>    even hitting `handleInitProducerIdRequest`. Going in this direction, we
>>>>>>    may need to throttle it within a different quota window than
>>>>>>    `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests
>>>>>>    per second wouldn't be efficient for most cases. I personally think
>>>>>>    this direction is easier as it seems more consistent with the existing
>>>>>>    quota implementation, especially since Kafka already has the concept of
>>>>>>    throttling a subset of requests (in ControllerMutationQuotaManager) but
>>>>>>    never had any concept of throttling active IDs for any client.
>>>>>> What do you think?
>>>>>> Thanks
>>>>>> Omnia
>>>>> *On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <
>>>>> <>> wrote:*
>>>>>> Hey Omnia,
>>>>>> Sorry for losing track of this.
>>>>>> If I understand your message correctly, there are issues with
>>>>>> identifying the source of the rogue clients? So you propose to add a new
>>>>>> metric for that?
>>>>>> And also proposing to throttle based on clientId as a potential
>>>>>> follow up?
>>>>>> I think both of these make sense. The only things I can think of for
>>>>>> the metric are:
>>>>>> 1. Does we rely on the client using the same ID? What if there are
>>>>>> many clients that all use different client IDs?
>>>>>> 2. Are there places where high cardinality of this metric is a
>>>>>> concern? I can imagine many client IDs in the system. Would we treat this
>>>>>> as a rate metric (ie, when we get an init producer ID and return a new
>>>>>> producer ID we emit a count for that client id?) Or something else?
>>>>>> Thanks,
>>>>>> Justine
>>>>> Thanks
>>>>> Omnia
>>>>> On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <>
>>>>> wrote:
>>>>>> Hi Luke and Justine,
>>>>>> Are there any thoughts or updates on this? I would love to help with
>>>>>> this as we are hitting this more frequently now.
>>>>>> best,
>>>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <
>>>>>>> wrote:
>>>>>>> Hi Luke and Justine,
>>>>>>>> For (3), you said:
>>>>>>>> > - I have some concerns about the impact of this option on the
>>>>>>>> transactional
>>>>>>>> producers, for example, what will happen to an ongoing transaction
>>>>>>>> associated with an expired PID? Would this leave the transactions
>>>>>>>> in a
>>>>>>>> "hanging" state?
>>>>>>>> - How will we notify the client that the transaction can't continue
>>>>>>>> due to
>>>>>>>> an expired PID?
>>>>>>>> - If PID got marked as `expired` this will mean that
>>>>>>>> `admin.DescribeProducers` will not list them which will make
>>>>>>>> *`
>>>>>>>> --list`* a bit tricky as we can't identify if there are
>>>>>>>> transactions linked
>>>>>>>> to this expired PID or not. The same concern applies to
>>>>>>>> *`
>>>>>>>> --find-hanging`*.
>>>>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>> Currently, there's no way to notify clients about the expiration.
>>>>>>>> Also, the ongoing transactions will be hanging. For the admin cli,
>>>>>>>> we've
>>>>>>>> never thought about it. Good point.
>>>>>>>> In summary, to adopt this solution, there are many issues needed to
>>>>>>>> get
>>>>>>>> fixed.
>>>>>>> Justine already clarified that if a PID is attached to a transaction it
>>>>>>> will not expire, so identifying the transactions shouldn't be a concern
>>>>>>> anymore.
>>>>>>> The only concern here will be that this solution will not solve the
>>>>>>> problem if the rogue producer is a transactional producer with hanging
>>>>>>> transactions.
>>>>>>> If anyone faces this situation they will need to abort the hanging
>>>>>>> transactions manually, and then the solution to expire a PID can work.
>>>>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id
>>>>>>>> is also
>>>>>>>> workable.
>>>>>>>> It's just these 2 attributes are not required.
>>>>>>>> That is, it's possible we take all clients as the same one: {default
>>>>>>>> KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>>>> Do you have any thoughts about it?
>>>>>>>> Maybe skip throttling for {default KafkaPrinciple + default
>>>>>>>> clientID}
>>>>>>> Throttling for default KafkaPrincipal and default ClientID is useful
>>>>>>> when we need to have a hard limit on the whole cluster and whoever is
>>>>>>> running the cluster doesn't know the client IDs, or if a KafkaPrincipal is
>>>>>>> reused between different producer applications.
>>>>>>> I usually find it helpful to have a way to apply throttling only on
>>>>>>> the rogue clients once I identify them, without punishing everyone on
>>>>>>> the cluster. However, there are two problems with this:
>>>>>>> - There's no easy way at the moment to link PIDs to clientId or
>>>>>>> KafkaPrincipal. This needs to be addressed first.
>>>>>>> - Justine's comment on the throttling, and the fact that it will mean
>>>>>>> we either block all requests or have to store the request in memory,
>>>>>>> which in both cases has downsides for the producer experience.
>>>>>>> I recently had another discussion with my team and it does seem like
>>>>>>>> there
>>>>>>>> should be a way to make it more clear to the clients what is going
>>>>>>>> on. A
>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>>>>> way to
>>>>>>>> improve the story for newer clients. (Ie if we choose to expire
>>>>>>>> based on a
>>>>>>>> size limit, we should include a response indicating the ID has
>>>>>>>> expired.) We
>>>>>>>> also discussed ways to redefine the guarantees so that users who
>>>>>>>> have
>>>>>>>> stronger idempotency requirements can ensure them (over
>>>>>>>> availability/memory
>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>> It may be easier to improve the experience for new clients. However,
>>>>>>> if we improved only the new clients we may need a way to help teams who
>>>>>>> run Kafka with rogue clients on old versions by at least giving them an
>>>>>>> easy way to identify the clientId or KafkaPrincipal that generated these
>>>>>>> PIDs.
>>>>>>> For context, it's very tricky to even identify which clientId is
>>>>>>> creating all these PIDs that caused OOM, which is a contributing part of
>>>>>>> the issue at the moment. So maybe one option here could be adding a new
>>>>>>> metric that tracks the number of generated PIDs per clientId. This will
>>>>>>> help the team who runs the Kafka cluster to
>>>>>>> - contact these rogue clients and ask them to fix their clients or
>>>>>>> upgrade to a new client if the new client version has a better
>>>>>>> experience.
>>>>>>> - or, if we end up with a throttling solution, this may help identify
>>>>>>> which clientId needs to be throttled.
>>>>>>> Maybe we can start with a solution for identifying the rogue clients
>>>>>>> first and keep looking for a solution to limit them, what do you think?
>>>>>>> Thanks
>>>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan
>>>>>>> <> wrote:
>>>>>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>>>>> Here was my response for the mailing thread:
>>>>>>>> Hey Omnia,
>>>>>>>> Sorry to hear this is a problem for you as well. :(
>>>>>>>> > * I have some concerns about the impact of this option on the
>>>>>>>> transactional producers, for example, what will happen to an ongoing
>>>>>>>> transaction associated with an expired PID? Would this leave the
>>>>>>>> transactions in a "hanging" state?*
>>>>>>>> We currently check if a transaction is ongoing and do not expire the
>>>>>>>> producer ID if it has an ongoing transaction. I suspect we will
>>>>>>>> continue to
>>>>>>>> do this with any solution we pick.
>>>>>>>> My team members and I looked a bit into the throttling case and it
>>>>>>>> can get
>>>>>>>> a bit tricky since it means we need to throttle the produce request
>>>>>>>> before
>>>>>>>> it is processed. This means we either block all requests or have to
>>>>>>>> store
>>>>>>>> the request in memory (which is not great if we are trying to save
>>>>>>>> memory).
>>>>>>>> I recently had another discussion with my team and it does seem
>>>>>>>> like there
>>>>>>>> should be a way to make it more clear to the clients what is going
>>>>>>>> on. A
>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a
>>>>>>>> way to
>>>>>>>> improve the story for newer clients. (Ie if we choose to expire
>>>>>>>> based on a
>>>>>>>> size limit, we should include a response indicating the ID has
>>>>>>>> expired.) We
>>>>>>>> also discussed ways to redefine the guarantees so that users who
>>>>>>>> have
>>>>>>>> stronger idempotency requirements can ensure them (over
>>>>>>>> availability/memory
>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>> Thanks again for commenting here, hopefully we can come to a good
>>>>>>>> solution.
>>>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <>
>>>>>>>> wrote:
>>>>>>>> > Hi Omnia,
>>>>>>>> >
>>>>>>>> > Thanks for your reply.
>>>>>>>> >
>>>>>>>> > For (3), you said:
>>>>>>>> > > - I have some concerns about the impact of this option on the
>>>>>>>> > transactional
>>>>>>>> > producers, for example, what will happen to an ongoing transaction
>>>>>>>> > associated with an expired PID? Would this leave the transactions
>>>>>>>> in a
>>>>>>>> > "hanging" state?
>>>>>>>> >
>>>>>>>> > - How will we notify the client that the transaction can't
>>>>>>>> continue due to
>>>>>>>> > an expired PID?
>>>>>>>> >
>>>>>>>> > - If PID got marked as `expired` this will mean that
>>>>>>>> > `admin.DescribeProducers` will not list them which will make
>>>>>>>> > *`
>>>>>>>> > --list`* a bit tricky as we can't identify if there are
>>>>>>>> transactions linked
>>>>>>>> > to this expired PID or not. The same concern applies to
>>>>>>>> > *`
>>>>>>>> > --find-hanging`*.
>>>>>>>> >
>>>>>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>> > Currently, there's no way to notify clients about the expiration.
>>>>>>>> > Also, the ongoing transactions will be hanging. For the admin
>>>>>>>> cli, we've
>>>>>>>> > never thought about it. Good point.
>>>>>>>> > In summary, to adopt this solution, there are many issues needed
>>>>>>>> to get
>>>>>>>> > fixed.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > For (5), you said:
>>>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your
>>>>>>>> concern here
>>>>>>>> > that
>>>>>>>> > those good clients that use the same principal as a rogue one
>>>>>>>> will get
>>>>>>>> > throttled?
>>>>>>>> >
>>>>>>>> > If this is the case, then I believe it should be okay as other
>>>>>>>> throttling
>>>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > What about applying limit/throttling to
>>>>>>>> > *`/config/users/<user>/clients/<client-id>`
>>>>>>>> > *similar to what we have with client quota? This should reduce
>>>>>>>> the concern
>>>>>>>> > about throttling good clients, right?
>>>>>>>> >
>>>>>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client
>>>>>>>> Id is
>>>>>>>> > also workable.
>>>>>>>> > It's just these 2 attributes are not required.
>>>>>>>> > That is, it's possible we take all clients as the same one:
>>>>>>>> {default
>>>>>>>> > KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>>>> > Do you have any thoughts about it?
>>>>>>>> > Maybe skip throttling for {default KafkaPrinciple + default
>>>>>>>> clientID} ?
>>>>>>>> >
>>>>>>>> > Luke
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <
>>>>>>>> > wrote:
>>>>>>>> >
>>>>>>>> >> Hi Luke & Justine,
>>>>>>>> >> Thanks for looking into this issue, we have been experiencing
>>>>>>>> this because
>>>>>>>> >> of rogue clients as well.
>>>>>>>> >>
>>>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort
>>>>>>>> of like an
>>>>>>>> >> LRU
>>>>>>>> >> >cache)
>>>>>>>> >> >-> The idea here is that if we hit a misconfigured client, we
>>>>>>>> will expire
>>>>>>>> >> >the older entries. The concern here is we have risks to lose
>>>>>>>> idempotency
>>>>>>>> >> >guarantees, and currently, we don't have a way to notify
>>>>>>>> clients about
>>>>>>>> >> >losing idempotency guarantees. Besides, the least  recently
>>>>>>>> used entries
>>>>>>>> >> >got removed are not always from the "bad" clients.
>>>>>>>> >>
>>>>>>>> >> - I have some concerns about the impact of this option on the
>>>>>>>> >> transactional
>>>>>>>> >> producers, for example, what will happen to an ongoing
>>>>>>>> transaction
>>>>>>>> >> associated with an expired PID? Would this leave the
>>>>>>>> transactions in a
>>>>>>>> >> "hanging" state?
>>>>>>>> >>
>>>>>>>> >> - How will we notify the client that the transaction can't
>>>>>>>> continue due to
>>>>>>>> >> an expired PID?
>>>>>>>> >>
>>>>>>>> >> - If PID got marked as `expired` this will mean that
>>>>>>>> >> `admin.DescribeProducers` will not list them which will make
>>>>>>>> >> *`
>>>>>>>> >> --list`* a bit tricky as we can't identify if there are
>>>>>>>> transactions
>>>>>>>> >> linked
>>>>>>>> >> to this expired PID or not. The same concern applies to
>>>>>>>> >> *`
>>>>>>>> >> --find-hanging`*.
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> >5. limit/throttling the producer id based on the principle
>>>>>>>> >> >-> Although we can limit the impact to a certain principle with
>>>>>>>> this
>>>>>>>> >> idea,
>>>>>>>> >> >same concern still exists as solution #1 #2.
>>>>>>>> >>
>>>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your
>>>>>>>> concern here
>>>>>>>> >> that
>>>>>>>> >> those good clients that use the same principal as a rogue one
>>>>>>>> will get
>>>>>>>> >> throttled?
>>>>>>>> >>
>>>>>>>> >> If this is the case, then I believe it should be okay as other
>>>>>>>> throttling
>>>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> What about applying limit/throttling to
>>>>>>>> >> *`/config/users/<user>/clients/<client-id>`
>>>>>>>> >> *similar to what we have with client quota? This should reduce
>>>>>>>> the concern
>>>>>>>> >> about throttling good clients, right?
>>>>>>>> >>
>>>>>>>> >> best,
>>>>>>>> >>
>>>>>>>> >> Omnia
>>>>>>>> >>
>>>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <>
>>>>>>>> wrote:
>>>>>>>> >>
>>>>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>>>>> >> > Thanks.
>>>>>>>> >> >
>>>>>>>> >> > Luke
>>>>>>>> >> >
>>>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <>
>>>>>>>> wrote:
>>>>>>>> >> >
>>>>>>>> >> > > Hi devs,
>>>>>>>> >> > >
>>>>>>>> >> > > As stated in the motivation section in KIP-854
>>>>>>>> >> > > <
>>>>>>>> >> >
>>>>>>>> >>
>>>>>>>> >> > >:
>>>>>>>> >> > >
>>>>>>>> >> > > With idempotent producers becoming the default in Kafka,
>>>>>>>> this means
>>>>>>>> >> that
>>>>>>>> >> > > unless otherwise specified, all new producers will be given
>>>>>>>> producer
>>>>>>>> >> IDs.
>>>>>>>> >> > > Some (inefficient) applications may now create many
>>>>>>>> non-transactional
>>>>>>>> >> > > idempotent producers. Each of these producers will be
>>>>>>>> assigned a
>>>>>>>> >> producer
>>>>>>>> >> > > ID and these IDs and their metadata are stored in the broker
>>>>>>>> memory,
>>>>>>>> >> > which
>>>>>>>> >> > > might cause brokers out of memory.
>>>>>>>> >> > >
>>>>>>>> >> > > Justine (in cc.) and I and some other team members are
>>>>>>>> working on the
>>>>>>>> >> > > solutions for this issue. But none of them solves it
>>>>>>>> completely
>>>>>>>> >> without
>>>>>>>> >> > > side effects. Among them, "availability" VS "idempotency
>>>>>>>> guarantees"
>>>>>>>> >> is
>>>>>>>> >> > > what we can't decide which to sacrifice. Some of these
>>>>>>>> solutions
>>>>>>>> >> > sacrifice
>>>>>>>> >> > > availability of produce (1,2,5) and others sacrifice
>>>>>>>> idempotency
>>>>>>>> >> > guarantees
>>>>>>>> >> > > (3). It could be useful to know if people generally have a
>>>>>>>> preference
>>>>>>>> >> one
>>>>>>>> >> > > way or the other. Or what other better solutions there might
>>>>>>>> be.
>>>>>>>> >> > >
>>>>>>>> >> > > Here are the proposals we came up with:
>>>>>>>> >> > >
>>>>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>>>>> >> > > -> This is the simplest solution. But since the OOM issue is
>>>>>>>> usually
>>>>>>>> >> > > caused by a rogue or misconfigured client, and this solution
>>>>>>>> might
>>>>>>>> >> > "punish"
>>>>>>>> >> > > the good client from sending messages.
>>>>>>>> >> > >
>>>>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>>>>> >> > > -> Same concern as the solution #1.
>>>>>>>> >> > >
>>>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort
>>>>>>>> of like
>>>>>>>> >> an
>>>>>>>> >> > > LRU cache)
>>>>>>>> >> > > -> The idea here is that if we hit a misconfigured client,
>>>>>>>> we will
>>>>>>>> >> expire
>>>>>>>> >> > > the older entries. The concern here is we have risks to lose
>>>>>>>> >> idempotency
>>>>>>>> >> > > guarantees, and currently, we don't have a way to notify
>>>>>>>> clients about
>>>>>>>> >> > > losing idempotency guarantees. Besides, the least  recently
>>>>>>>> used
>>>>>>>> >> entries
>>>>>>>> >> > > got removed are not always from the "bad" clients.
>>>>>>>> >> > >
>>>>>>>> >> > > 4. allow clients to "close" the producer ID usage
>>>>>>>> >> > > -> We can provide a way for producer to "close" producerID
>>>>>>>> usage.
>>>>>>>> >> > > Currently, we only have a way to INIT_PRODUCER_ID requested
>>>>>>>> to
>>>>>>>> >> allocate
>>>>>>>> >> > > one. After that, we'll keep the producer ID metadata in
>>>>>>>> broker even if
>>>>>>>> >> > the
>>>>>>>> >> > > producer is "closed". Having a closed API (ex:
>>>>>>>> END_PRODUCER_ID), we
>>>>>>>> >> can
>>>>>>>> >> > > remove the entry from broker side. In client side, we can
>>>>>>>> send it when
>>>>>>>> >> > > producer closing. The concern is, the old clients (including
>>>>>>>> non-java
>>>>>>>> >> > > clients) will still suffer from the OOM issue.
>>>>>>>> >> > >
>>>>>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>>>>>> >> > > -> Although we can limit the impact to a certain principle
>>>>>>>> with this
>>>>>>>> >> > idea,
>>>>>>>> >> > > same concern still exists as solution #1 #2.
>>>>>>>> >> > >
>>>>>>>> >> > > Any thoughts/feedback are welcomed.
>>>>>>>> >> > >
>>>>>>>> >> > > Thank you.
>>>>>>>> >> > > Luke
>>>>>>>> >> > >
>>>>>>>> >> >
>>>>>>>> >>
>>>>>>>> >
>>>>> On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <>
>>>>> wrote:
>>>>>> Hey Omnia,
>>>>>> Thanks for the response. I think I understand your explanations here
>>>>>> with respect to principal and clientId usage.
>>>>>> For the throttling -- handleInitProducerIdRequest will allocate the
>>>>>> ID to the producer, but we don't actually store it on the broker or
>>>>>> increment our metric until the first produce request for that producer is
>>>>>> sent (or sent again after previously expiring). Would you consider
>>>>>> throttling the produce request instead? It may be hard to get any metrics
>>>>>> from the transaction coordinator where the initProducerId request is
>>>>>> handled.
>>>>>> Justine
>>>>>> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <
>>>>>>> wrote:
>>>>>>> Hey Justine,
>>>>>>> > If I understand your message correctly, there are issues with
>>>>>>> identifying the source of the rogue clients? So you propose to add a new
>>>>>>> metric for that?
>>>>>>> > And also proposing to throttle based on clientId as a potential
>>>>>>> follow up?
>>>>>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId)
>>>>>>> similarly to how we identify clients in Fetch/Produce/Request
>>>>>>> QuotaManagers. Using KafkaPrincipal should give cluster admin the
>>>>>>> ability to throttle later based on principal which is most likely to be
>>>>>>> a smaller set than clientIds. My initial thought was to add a metric that
>>>>>>> represents how many InitProducerIDRequest are sent by KafkaPrincipal
>>>>>>> (and/or clientId), similar to Fetch/Produce QuotaManagers.
>>>>>>> Then as a follow-up, we can throttle based on either KafkaPrincipal
>>>>>>> or clientId (maybe default as well to align this with other
>>>>>>> QuotaManagers in Kafka).
>>>>>>> >1. Does we rely on the client using the same ID? What if there are
>>>>>>> many clients that all use different client IDs?
>>>>>>> This is why I want to use the combination of KafkaPrincipal or
>>>>>>> clientId similar to some other quotas we have in Kafka already. This
>>>>>>> will be a similar risk to Fetch/Produce quota in Kafka, which also relies
>>>>>>> on the client to use the same clientId and KafkaPrincipal.
>>>>>>> >2. Are there places where high cardinality of this metric is a
>>>>>>> concern? I can imagine many client IDs in the system. Would we treat 
>>>>>>> this
>>>>>>> as a rate metric (ie, when we get an init producer ID and return a new
>>>>>>> producer ID we emit a count for that client id?) Or something else?
>>>>>>> My initial thought here was to follow the steps of
>>>>>>> ClientQuotaManager and ClientRequestQuotaManager and use a rate metric.
>>>>>>> However, I think we can emit it either
>>>>>>>    1. when we return the new PID. However, I have concerns that we
>>>>>>>    may circle back to the previous concerns with OOM due to keeping
>>>>>>> track of
>>>>>>>    ACTIVE PIDs per KafkaPrincipal (and/or clientId) in the future. Also
>>>>>>> this
>>>>>>>    would be the first time Kafka throttles IDs for any client.
>>>>>>>    2. or once we receive initProducerIDRequest and throttle before
>>>>>>>    even hitting `handleInitProducerIdRequest`. Going this direction we 
>>>>>>> may
>>>>>>>    need to throttle it within a different quota window than `
>>>>>>>    quota.window.size.seconds ` as throttling INIT_PRODUCER_ID
>>>>>>>    request per second wouldn't be efficient for most cases. I 
>>>>>>> personally think
>>>>>>>    this direction is easier as it seems more consistent with the 
>>>>>>> existing
>>>>>>>    quota implementation, especially since Kafka already has the concept of
>>>>>>>    throttling a subset of requests (in ControllerMutationQuotaManager)
>>>>>>> but has never
>>>>>>>    had any concept of throttling active IDs for any client.
>>>>>>> What do you think?
>>>>>>> Thanks
>>>>>>> Omnia
>>>>>>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <>
>>>>>>> wrote:
>>>>>>>> Hey Omnia,
>>>>>>>> Sorry for losing track of this.
>>>>>>>> If I understand your message correctly, there are issues with
>>>>>>>> identifying the source of the rogue clients? So you propose to add a 
>>>>>>>> new
>>>>>>>> metric for that?
>>>>>>>> And also proposing to throttle based on clientId as a potential
>>>>>>>> follow up?
>>>>>>>> I think both of these make sense. The only things I can think of
>>>>>>>> for the metric are:
>>>>>>>> 1. Do we rely on the client using the same ID? What if there are
>>>>>>>> many clients that all use different client IDs?
>>>>>>>> 2. Are there places where high cardinality of this metric is a
>>>>>>>> concern? I can imagine many client IDs in the system. Would we treat 
>>>>>>>> this
>>>>>>>> as a rate metric (ie, when we get an init producer ID and return a new
>>>>>>>> producer ID we emit a count for that client id?) Or something else?
>>>>>>>> Thanks,
>>>>>>>> Justine
>>>>>>>> On Thu, Feb 2, 2023 at 8:44 AM Omnia Ibrahim <
>>>>>>>>> wrote:
>>>>>>>>> Hi Luke and Justine,
>>>>>>>>> Are there any thoughts or updates on this? I would love to help
>>>>>>>>> with this as we are hitting this more frequently now.
>>>>>>>>> best,
>>>>>>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <
>>>>>>>>>> wrote:
>>>>>>>>>> Hi Luke and Justine,
>>>>>>>>>>> For (3), you said:
>>>>>>>>>>> > - I have some concerns about the impact of this option on the
>>>>>>>>>>> transactional
>>>>>>>>>>> producers, for example, what will happen to an ongoing
>>>>>>>>>>> transaction
>>>>>>>>>>> associated with an expired PID? Would this leave the
>>>>>>>>>>> transactions in a
>>>>>>>>>>> "hanging" state?
>>>>>>>>>>> - How will we notify the client that the transaction can't
>>>>>>>>>>> continue due to
>>>>>>>>>>> an expired PID?
>>>>>>>>>>> - If PID got marked as `expired` this will mean that
>>>>>>>>>>> `admin.DescribeProducers` will not list them which will make
>>>>>>>>>>> *`
>>>>>>>>>>> --list`* a bit tricky as we can't identify if there are
>>>>>>>>>>> transactions linked
>>>>>>>>>>> to this expired PID or not. The same concern applies to
>>>>>>>>>>> *`
>>>>>>>>>>> --find-hanging`*.
>>>>>>>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>>>>> Currently, there's no way to notify clients about the expiration.
>>>>>>>>>>> Also, the ongoing transactions will be hanging. For the admin
>>>>>>>>>>> cli, we've
>>>>>>>>>>> never thought about it. Good point.
>>>>>>>>>>> In summary, to adopt this solution, there are many issues needed
>>>>>>>>>>> to get
>>>>>>>>>>> fixed.
>>>>>>>>>> Justine already clarified that if a PID is attached to a transaction
>>>>>>>>>> it will not expire, so identifying the transactions shouldn't be a
>>>>>>>>>> concern anymore.
>>>>>>>>>> The only concern here will be that this solution will not solve
>>>>>>>>>> the problem if the rogue producer is a transactional producer with
>>>>>>>>>> hanging transactions.
>>>>>>>>>> If anyone faced this situation they will need to abort the
>>>>>>>>>> hanging transactions manually and then the solution to expire a PID 
>>>>>>>>>> can
>>>>>>>>>> then work.
>>>>>>>>>>> --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear)
>>>>>>>>>>> Yes, we were thinking about throttling by KafkaPrincipal. Client
>>>>>>>>>>> Id is also
>>>>>>>>>>> workable.
>>>>>>>>>>> It's just these 2 attributes are not required.
>>>>>>>>>>> That is, it's possible we take all clients as the same one:
>>>>>>>>>>> {default
>>>>>>>>>>> KafkaPrincipal + default clientID}, and apply throttling on it.
>>>>>>>>>>> Do you have any thoughts about it?
>>>>>>>>>>> Maybe skip throttling for {default KafkaPrincipal + default
>>>>>>>>>>> clientID}
>>>>>>>>>> Throttling for the default KafkaPrincipal and default ClientID is
>>>>>>>>>> useful when we need a hard limit on the whole cluster and whoever
>>>>>>>>>> is running the cluster doesn't know the client IDs, or if a
>>>>>>>>>> KafkaPrincipal is reused between different producer applications.
>>>>>>>>>> I usually find it helpful to have a way to apply throttling only
>>>>>>>>>> on the rogue clients once I identify them, without punishing
>>>>>>>>>> everyone on the cluster. However, there are two problems with this:
>>>>>>>>>> - There's no easy way at the moment to link PIDs to clientId or
>>>>>>>>>> KafkaPrincipal. This needs to be addressed first.
>>>>>>>>>> - Justine's comment on the throttling: it would mean we either
>>>>>>>>>> block all requests or have to store the request in memory, and
>>>>>>>>>> both options have downsides for the producer experience.
>>>>>>>>>> I recently had another discussion with my team and it does seem
>>>>>>>>>>> like there
>>>>>>>>>>> should be a way to make it more clear to the clients what is
>>>>>>>>>>> going on. A
>>>>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there
>>>>>>>>>>> is a way to
>>>>>>>>>>> improve the story for newer clients. (Ie if we choose to expire
>>>>>>>>>>> based on a
>>>>>>>>>>> size limit, we should include a response indicating the ID has
>>>>>>>>>>> expired.) We
>>>>>>>>>>> also discussed ways to redefine the guarantees so that users who
>>>>>>>>>>> have
>>>>>>>>>>> stronger idempotency requirements can ensure them (over
>>>>>>>>>>> availability/memory
>>>>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>>>> It may be easier to improve the experience for new clients.
>>>>>>>>>> However, if we improved only the new clients we may need a way to
>>>>>>>>>> help teams who run Kafka with rogue clients on old versions by at
>>>>>>>>>> least giving them an easy way to identify the clientId or
>>>>>>>>>> KafkaPrincipal that generated these PIDs.
>>>>>>>>>> For context, it's very tricky to even identify which clientId is
>>>>>>>>>> creating all these PIDs that caused OOM, which is a contributing 
>>>>>>>>>> part of
>>>>>>>>>> the issue at the moment. So maybe one option here could be adding a 
>>>>>>>>>> new
>>>>>>>>>> metric that tracks the number of generated PIDs per clientId. This 
>>>>>>>>>> will
>>>>>>>>>> help the team who runs the Kafka cluster to
>>>>>>>>>> - contact these rogue clients and ask them to fix their clients
>>>>>>>>>> or upgrade to a new client if the new client version has a better
>>>>>>>>>> experience.
>>>>>>>>>> - or, if we end up with a throttling solution, this may help identify
>>>>>>>>>> which clientId needs to be throttled.
>>>>>>>>>> Maybe we can start with a solution for identifying the rogue
>>>>>>>>>> clients first and keep looking for a solution to limit them. What do
>>>>>>>>>> you think?
>>>>>>>>>> Thanks
>>>>>>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan
>>>>>>>>>> <> wrote:
>>>>>>>>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>>>>>>>> Here was my response for the mailing thread:
>>>>>>>>>>> Hey Omnia,
>>>>>>>>>>> Sorry to hear this is a problem for you as well. :(
>>>>>>>>>>> > * I have some concerns about the impact of this option on the
>>>>>>>>>>> transactional producers, for example, what will happen to an
>>>>>>>>>>> ongoing
>>>>>>>>>>> transaction associated with an expired PID? Would this leave the
>>>>>>>>>>> transactions in a "hanging" state?*
>>>>>>>>>>> We currently check if a transaction is ongoing and do not expire
>>>>>>>>>>> the
>>>>>>>>>>> producer ID if it has an ongoing transaction. I suspect we will
>>>>>>>>>>> continue to
>>>>>>>>>>> do this with any solution we pick.
>>>>>>>>>>> My team members and I looked a bit into the throttling case and
>>>>>>>>>>> it can get
>>>>>>>>>>> a bit tricky since it means we need to throttle the produce
>>>>>>>>>>> request before
>>>>>>>>>>> it is processed. This means we either block all requests or have
>>>>>>>>>>> to store
>>>>>>>>>>> the request in memory (which is not great if we are trying to
>>>>>>>>>>> save memory).
>>>>>>>>>>> I recently had another discussion with my team and it does seem
>>>>>>>>>>> like there
>>>>>>>>>>> should be a way to make it more clear to the clients what is
>>>>>>>>>>> going on. A
>>>>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there
>>>>>>>>>>> is a way to
>>>>>>>>>>> improve the story for newer clients. (Ie if we choose to expire
>>>>>>>>>>> based on a
>>>>>>>>>>> size limit, we should include a response indicating the ID has
>>>>>>>>>>> expired.) We
>>>>>>>>>>> also discussed ways to redefine the guarantees so that users who
>>>>>>>>>>> have
>>>>>>>>>>> stronger idempotency requirements can ensure them (over
>>>>>>>>>>> availability/memory
>>>>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>>>>> Thanks again for commenting here, hopefully we can come to a
>>>>>>>>>>> good solution.
>>>>>>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <>
>>>>>>>>>>> wrote:
>>>>>>>>>>> > Hi Omnia,
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks for your reply.
>>>>>>>>>>> >
>>>>>>>>>>> > For (3), you said:
>>>>>>>>>>> > > - I have some concerns about the impact of this option on the
>>>>>>>>>>> > transactional
>>>>>>>>>>> > producers, for example, what will happen to an ongoing
>>>>>>>>>>> transaction
>>>>>>>>>>> > associated with an expired PID? Would this leave the
>>>>>>>>>>> transactions in a
>>>>>>>>>>> > "hanging" state?
>>>>>>>>>>> >
>>>>>>>>>>> > - How will we notify the client that the transaction can't
>>>>>>>>>>> continue due to
>>>>>>>>>>> > an expired PID?
>>>>>>>>>>> >
>>>>>>>>>>> > - If PID got marked as `expired` this will mean that
>>>>>>>>>>> > `admin.DescribeProducers` will not list them which will make
>>>>>>>>>>> > *`
>>>>>>>>>>> > --list`* a bit tricky as we can't identify if there are
>>>>>>>>>>> transactions linked
>>>>>>>>>>> > to this expired PID or not. The same concern applies to
>>>>>>>>>>> > *`
>>>>>>>>>>> > --find-hanging`*.
>>>>>>>>>>> >
>>>>>>>>>>> > --> Yes, you're right. Those are also concerns for this
>>>>>>>>>>> solution.
>>>>>>>>>>> > Currently, there's no way to notify clients about the
>>>>>>>>>>> expiration.
>>>>>>>>>>> > Also, the ongoing transactions will be hanging. For the admin
>>>>>>>>>>> cli, we've
>>>>>>>>>>> > never thought about it. Good point.
>>>>>>>>>>> > In summary, to adopt this solution, there are many issues
>>>>>>>>>>> needed to get
>>>>>>>>>>> > fixed.
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > For (5), you said:
>>>>>>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your
>>>>>>>>>>> concern here
>>>>>>>>>>> > that
>>>>>>>>>>> > those good clients that use the same principal as a rogue one
>>>>>>>>>>> will get
>>>>>>>>>>> > throttled?
>>>>>>>>>>> >
>>>>>>>>>>> > If this is the case, then I believe it should be okay as other
>>>>>>>>>>> throttling
>>>>>>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > What about applying limit/throttling to
>>>>>>>>>>> > *`/config/users/<user>/clients/<client-id>`
>>>>>>>>>>> > *similar to what we have with client quota? This should reduce
>>>>>>>>>>> the concern
>>>>>>>>>>> > about throttling good clients, right?
>>>>>>>>>>> >
>>>>>>>>>>> > --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear)
>>>>>>>>>>> > Yes, we were thinking about throttling by KafkaPrincipal.
>>>>>>>>>>> Client Id is
>>>>>>>>>>> > also workable.
>>>>>>>>>>> > It's just these 2 attributes are not required.
>>>>>>>>>>> > That is, it's possible we take all clients as the same one:
>>>>>>>>>>> {default
>>>>>>>>>>> > KafkaPrincipal + default clientID}, and apply throttling on it.
>>>>>>>>>>> > Do you have any thoughts about it?
>>>>>>>>>>> > Maybe skip throttling for {default KafkaPrincipal + default
>>>>>>>>>>> clientID} ?
>>>>>>>>>>> >
>>>>>>>>>>> > Luke
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <
>>>>>>>>>>> > wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >> Hi Luke & Justine,
>>>>>>>>>>> >> Thanks for looking into this issue, we have been experiencing
>>>>>>>>>>> this because
>>>>>>>>>>> >> of rogue clients as well.
>>>>>>>>>>> >>
>>>>>>>>>>> >> > 3. Having a limit to the number of active producer IDs
>>>>>>>>>>> (sort of like an
>>>>>>>>>>> >> LRU
>>>>>>>>>>> >> >cache)
>>>>>>>>>>> >> >-> The idea here is that if we hit a misconfigured client,
>>>>>>>>>>> we will expire
>>>>>>>>>>> >> >the older entries. The concern here is we have risks to lose
>>>>>>>>>>> idempotency
>>>>>>>>>>> >> >guarantees, and currently, we don't have a way to notify
>>>>>>>>>>> clients about
>>>>>>>>>>> >> >losing idempotency guarantees. Besides, the least  recently
>>>>>>>>>>> used entries
>>>>>>>>>>> >> >got removed are not always from the "bad" clients.
>>>>>>>>>>> >>
>>>>>>>>>>> >> - I have some concerns about the impact of this option on the
>>>>>>>>>>> >> transactional
>>>>>>>>>>> >> producers, for example, what will happen to an ongoing
>>>>>>>>>>> transaction
>>>>>>>>>>> >> associated with an expired PID? Would this leave the
>>>>>>>>>>> transactions in a
>>>>>>>>>>> >> "hanging" state?
>>>>>>>>>>> >>
>>>>>>>>>>> >> - How will we notify the client that the transaction can't
>>>>>>>>>>> continue due to
>>>>>>>>>>> >> an expired PID?
>>>>>>>>>>> >>
>>>>>>>>>>> >> - If PID got marked as `expired` this will mean that
>>>>>>>>>>> >> `admin.DescribeProducers` will not list them which will make
>>>>>>>>>>> >> *`
>>>>>>>>>>> >> --list`* a bit tricky as we can't identify if there are
>>>>>>>>>>> transactions
>>>>>>>>>>> >> linked
>>>>>>>>>>> >> to this expired PID or not. The same concern applies to
>>>>>>>>>>> >> *`
>>>>>>>>>>> >> --find-hanging`*.
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> >5. limit/throttling the producer id based on the principle
>>>>>>>>>>> >> >-> Although we can limit the impact to a certain principle
>>>>>>>>>>> with this
>>>>>>>>>>> >> idea,
>>>>>>>>>>> >> >same concern still exists as solution #1 #2.
>>>>>>>>>>> >>
>>>>>>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your
>>>>>>>>>>> concern here
>>>>>>>>>>> >> that
>>>>>>>>>>> >> those good clients that use the same principal as a rogue one
>>>>>>>>>>> will get
>>>>>>>>>>> >> throttled?
>>>>>>>>>>> >>
>>>>>>>>>>> >> If this is the case, then I believe it should be okay as
>>>>>>>>>>> other throttling
>>>>>>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>>>>> >>
>>>>>>>>>>> >>
>>>>>>>>>>> >> What about applying limit/throttling to
>>>>>>>>>>> >> *`/config/users/<user>/clients/<client-id>`
>>>>>>>>>>> >> *similar to what we have with client quota? This should
>>>>>>>>>>> reduce the concern
>>>>>>>>>>> >> about throttling good clients, right?
>>>>>>>>>>> >>
>>>>>>>>>>> >> best,
>>>>>>>>>>> >>
>>>>>>>>>>> >> Omnia
>>>>>>>>>>> >>
>>>>>>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>>>>>>>> >> > Thanks.
>>>>>>>>>>> >> >
>>>>>>>>>>> >> > Luke
>>>>>>>>>>> >> >
>>>>>>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <
>>>>>>>>>>>> wrote:
>>>>>>>>>>> >> >
>>>>>>>>>>> >> > > Hi devs,
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > As stated in the motivation section in KIP-854
>>>>>>>>>>> >> > > <
>>>>>>>>>>> >> >
>>>>>>>>>>> >>
>>>>>>>>>>> >> > >:
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > With idempotent producers becoming the default in Kafka,
>>>>>>>>>>> this means
>>>>>>>>>>> >> that
>>>>>>>>>>> >> > > unless otherwise specified, all new producers will be
>>>>>>>>>>> given producer
>>>>>>>>>>> >> IDs.
>>>>>>>>>>> >> > > Some (inefficient) applications may now create many
>>>>>>>>>>> non-transactional
>>>>>>>>>>> >> > > idempotent producers. Each of these producers will be
>>>>>>>>>>> assigned a
>>>>>>>>>>> >> producer
>>>>>>>>>>> >> > > ID and these IDs and their metadata are stored in the
>>>>>>>>>>> broker memory,
>>>>>>>>>>> >> > which
>>>>>>>>>>> >> > > might cause brokers to run out of memory.
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > Justine (in cc.) and I and some other team members are
>>>>>>>>>>> working on the
>>>>>>>>>>> >> > > solutions for this issue. But none of them solves it
>>>>>>>>>>> completely
>>>>>>>>>>> >> without
>>>>>>>>>>> >> > > side effects. Among them, "availability" VS "idempotency
>>>>>>>>>>> guarantees"
>>>>>>>>>>> >> is
>>>>>>>>>>> >> > > what we can't decide which to sacrifice. Some of these
>>>>>>>>>>> solutions
>>>>>>>>>>> >> > sacrifice
>>>>>>>>>>> >> > > availability of produce (1,2,5) and others sacrifice
>>>>>>>>>>> idempotency
>>>>>>>>>>> >> > guarantees
>>>>>>>>>>> >> > > (3). It could be useful to know if people generally have
>>>>>>>>>>> a preference
>>>>>>>>>>> >> one
>>>>>>>>>>> >> > > way or the other. Or what other better solutions there
>>>>>>>>>>> might be.
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > Here are the proposals we came up with:
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>>>>>>>> >> > > -> This is the simplest solution. But the OOM issue is
>>>>>>>>>>> >> > > usually caused by a rogue or misconfigured client, and
>>>>>>>>>>> >> > > this solution might "punish" the good clients by blocking
>>>>>>>>>>> >> > > them from sending messages.
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>>>>>>>> >> > > -> Same concern as the solution #1.
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > 3. Having a limit to the number of active producer IDs
>>>>>>>>>>> (sort of like
>>>>>>>>>>> >> an
>>>>>>>>>>> >> > > LRU cache)
>>>>>>>>>>> >> > > -> The idea here is that if we hit a misconfigured client,
>>>>>>>>>>> >> > > we will expire the older entries. The concern here is that
>>>>>>>>>>> >> > > we risk losing idempotency guarantees, and currently we
>>>>>>>>>>> >> > > don't have a way to notify clients about losing them.
>>>>>>>>>>> >> > > Besides, the least recently used entries that get removed
>>>>>>>>>>> >> > > are not always from the "bad" clients.
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > 4. allow clients to "close" the producer ID usage
>>>>>>>>>>> >> > > -> We can provide a way for a producer to "close" its
>>>>>>>>>>> >> > > producer ID usage. Currently, we only have the
>>>>>>>>>>> >> > > INIT_PRODUCER_ID request to allocate one. After that, we
>>>>>>>>>>> >> > > keep the producer ID metadata in the broker even if the
>>>>>>>>>>> >> > > producer is "closed". With a close API (e.g.
>>>>>>>>>>> >> > > END_PRODUCER_ID), we can remove the entry on the broker
>>>>>>>>>>> >> > > side. On the client side, we can send it when the producer
>>>>>>>>>>> >> > > is closing. The concern is that old clients (including
>>>>>>>>>>> >> > > non-Java clients) will still suffer from the OOM issue.
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>>>>>>>>> >> > > -> Although we can limit the impact to a certain
>>>>>>>>>>> principle with this
>>>>>>>>>>> >> > idea,
>>>>>>>>>>> >> > > same concern still exists as solution #1 #2.
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > Any thoughts/feedback are welcomed.
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> > > Thank you.
>>>>>>>>>>> >> > > Luke
>>>>>>>>>>> >> > >
>>>>>>>>>>> >> >
>>>>>>>>>>> >>
>>>>>>>>>>> >
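
For readers skimming the quoted thread, here is a rough sketch of the
per-principal throttling idea discussed above (proposal 5 and the follow-up
about counting InitProducerId requests per KafkaPrincipal). It is only an
illustration in Python, not Kafka code: the class name InitProducerIdQuota,
the QuotaViolation exception, the sliding-window bookkeeping and the
"User:..." principal strings are all assumptions made up for this example.
Kafka's real quota managers use rate sensors rather than a timestamp log.

```python
from collections import defaultdict, deque


class QuotaViolation(Exception):
    """Hypothetical error raised when a principal exceeds its budget."""


class InitProducerIdQuota:
    """Hypothetical per-principal limit on new producer IDs per time window."""

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # principal -> timestamps of requests still inside the window
        self._requests = defaultdict(deque)

    def record(self, principal, now):
        timestamps = self._requests[principal]
        # Drop requests that have aged out of the sliding window.
        while timestamps and now - timestamps[0] >= self.window_seconds:
            timestamps.popleft()
        if len(timestamps) >= self.max_requests:
            # Only this principal is throttled; others are unaffected.
            raise QuotaViolation(principal)
        timestamps.append(now)
        return len(timestamps)


quota = InitProducerIdQuota(max_requests=2, window_seconds=60)
quota.record("User:rogue", now=0)
quota.record("User:rogue", now=1)
try:
    quota.record("User:rogue", now=2)
    throttled = False
except QuotaViolation:
    throttled = True
```

Keying the state by principal is what keeps a well-behaved client from being
"punished" for a rogue one, as long as the two do not share a principal.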
