Hi Luke and Justine. There are a few updates to KIP-936  
https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
 which proposes throttling PIDs per user, and I would love to hear your feedback 
in the discussion thread  
https://lists.apache.org/thread/nxp395zmvc0s8r4ohg91kdb19dxsbxlt if you have 
time. 

Thanks 
Omnia

> On 6 Jun 2023, at 15:04, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> 
> Thanks, Luke for the feedback
> 
>> 1. How do we store values in the bloom filter? It's unclear from this KIP 
>> what we store inside the bloom filter, and how we throttle them.
>> My understanding is that we have a map with key = KafkaPrincipal and value = 
>> PID for each bloom filter.
>> And when new PID created for a userA, we update the map to add PID into the 
>> cache value (i.e. the bloom filter)
>> When the window passed half of the time, we created another bloom filter, 
>> and this time, when new PID comes, we check if this new PID existed in 
>> previous bloom filter, if not, we add into the new bloom filter. And in the 
>> meantime, we track the "new created" count (filtered by previous bloom 
>> filter) for throttling the users.
>> Is my understanding correct?
> 
> Not quite what I'm proposing. I'm proposing 
> a cache layer to be used only for checking whether we have encountered the PID 
> before for a given KafkaPrincipal, not as a counter.
> If the cache layer doesn't contain the PID, I'll increment the metric sensor 
> using Sensor::record (the sensor will be created during the initial 
> interaction with this KafkaPrincipal). Sensor::record fails with 
> QuotaViolationException when we reach the max of the sensor.
> If incrementing the sensor didn't fail with QuotaViolationException, then I'll 
> add the PID to the cache for the next time.
> To achieve this, I'm proposing that the cache layer be represented as 
> "cachedMap = Map<KafkaPrincipal, Map<Timestamp, SimpleBloomFilter<PIDs>>>". I 
> wrapped "Map<Timestamp, SimpleBloomFilter<PIDs>>" into 
> "TimedControlledBloomFilter", where we decide which bloom filter to write to, 
> which bloom filter to delete, etc.
> 
> 1. When we encounter the producer for the first time,
> the new quota manager will create the sensor and update its value.
> Then it will update the cache with this PID for the next time.
> The cache will create an entry for the user in the cachedMap.
> The cache will look like this:
> Map { "UserA" -> TimedBloomFilter {
>                    bloom_filter_1_create_timestamp -> bloom_filter_1
>                  }
>     }
> The PID will be added to bloom_filter_1.
> 2. If the producer tries to produce with the same PID the next time; the 
> quota manager will not update the sensor or the cache. And will not throttle 
> the user.
> 3. However, if the producer tries to produce with a new PID, it will be added 
> to bloom_filter_1 as long as we are within the first half of 
> producer.id.quota.window.size.seconds
> 4. If the producer sends any new PIDs after the first half of 
> producer.id.quota.window.size.seconds, we will create a new bloom filter to 
> store the PIDs from the next half of the window
> The cache will look like this:
> Map { "UserA" -> TimedBloomFilter {
>                    bloom_filter_1_create_timestamp -> bloom_filter_1
>                    bloom_filter_2_create_timestamp -> bloom_filter_2
>                  }
>     }
> All PIDs from this point until the end of this window will be added to 
> bloom_filter_2.
> And both bloom_filter_1 and bloom_filter_2 will be used for checking if we 
> came across PID before or not.
> 5. A scheduler will run in the background to automatically delete from 
> TimedBloomFilter any bloom filter whose age (now - create_timestamp) is >= 
> producer.id.quota.window.size.seconds.
> 6. If the user stopped producing and all its bloom filters got deleted by the 
> scheduler the user entry will be removed from the cachedMap.
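
The caching steps above can be sketched roughly as follows. This is a minimal sketch and not the KIP's implementation: a `HashSet` stands in for the bloom filter, the principal is a plain `String`, timestamps are passed in as milliseconds, and the class and method names (`PidCacheSketch`, `seenBefore`, `add`, `expire`, `filterCount`) are hypothetical. The quota check itself (Sensor::record) is left out; the caller would only call `add` after the sensor accepted the new PID.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: a HashSet stands in for a bloom filter, a String for KafkaPrincipal.
class PidCacheSketch {
    private final long windowMs; // stands in for producer.id.quota.window.size.seconds
    // principal -> (filter creation time in ms -> PIDs recorded in that filter)
    private final Map<String, TreeMap<Long, Set<Long>>> cachedMap = new HashMap<>();

    PidCacheSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // True if any live filter for this principal already contains the PID.
    boolean seenBefore(String principal, long pid) {
        TreeMap<Long, Set<Long>> filters = cachedMap.get(principal);
        if (filters == null) return false;
        for (Set<Long> filter : filters.values()) {
            if (filter.contains(pid)) return true;
        }
        return false;
    }

    // Adds the PID to the newest filter, starting a new filter once the
    // newest one is at least half a window old.
    void add(String principal, long pid, long nowMs) {
        TreeMap<Long, Set<Long>> filters =
            cachedMap.computeIfAbsent(principal, p -> new TreeMap<>());
        Map.Entry<Long, Set<Long>> newest = filters.lastEntry();
        if (newest == null || nowMs - newest.getKey() >= windowMs / 2) {
            filters.put(nowMs, new HashSet<>());
            newest = filters.lastEntry();
        }
        newest.getValue().add(pid);
    }

    // What the background scheduler would do: drop filters that are a full
    // window old, then drop principals that have no filters left.
    void expire(long nowMs) {
        Iterator<TreeMap<Long, Set<Long>>> it = cachedMap.values().iterator();
        while (it.hasNext()) {
            TreeMap<Long, Set<Long>> filters = it.next();
            filters.headMap(nowMs - windowMs, true).clear();
            if (filters.isEmpty()) it.remove();
        }
    }

    // Number of live filters for a principal (0 if the principal has no entry).
    int filterCount(String principal) {
        TreeMap<Long, Set<Long>> filters = cachedMap.get(principal);
        return filters == null ? 0 : filters.size();
    }
}
```

With a real bloom filter, `seenBefore` could return a false positive, which here would mean a new PID is occasionally not counted against the quota.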
> 
> I updated the KIP to add more clarification to this caching layer.
> 
>> 2. what will user get when they can't allocate new PID due to throttling?
> 
> 
> The client will get `QuotaViolationException` similar to  ClientQuotaManager 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936:+Throttle+number+of+active+PIDs#KIP936:ThrottlenumberofactivePIDs-ClientErrors
> 
>> 3. This config: producer.id.quota.window.num is unclear to me?
> 
> `quota.window.num` is a standard config across all Kafka quota types. I am 
> re-using the same description and default value as in the existing Kafka 
> codebase (I am not a fan of the description, as it's not clear). The "samples" 
> here refers to the sliding time window. Kafka quotas keep 10 windows in memory 
> plus the current window, so 11 in total.
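
To make the "samples" idea concrete, here is a rough sketch of a sampled sliding-window counter. It is a simplified illustration and not Kafka's actual metrics code; the class name, fields, and methods are hypothetical, and the value 11 used in the comment mirrors the "10 + current" description above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a sampled sliding window; not Kafka's implementation.
class SampledWindowSketch {
    private final int numSamples;    // e.g. 11 = 10 retained samples + the current one
    private final long sampleSizeMs; // length of one sample
    // Each element is {sampleStartMs, eventCount}, oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();

    SampledWindowSketch(int numSamples, long sampleSizeMs) {
        this.numSamples = numSamples;
        this.sampleSizeMs = sampleSizeMs;
    }

    // Counts one event, opening a new sample when the current one is full.
    void record(long nowMs) {
        purge(nowMs);
        long[] current = samples.peekLast();
        if (current == null || nowMs - current[0] >= sampleSizeMs) {
            current = new long[] {nowMs, 0L};
            samples.addLast(current);
        }
        current[1]++;
    }

    // Total events across all samples still inside the sliding window.
    long total(long nowMs) {
        purge(nowMs);
        long sum = 0;
        for (long[] sample : samples) sum += sample[1];
        return sum;
    }

    int retainedSamples() {
        return samples.size();
    }

    // Drops samples that started numSamples * sampleSizeMs or longer ago.
    private void purge(long nowMs) {
        while (!samples.isEmpty()
                && nowMs - samples.peekFirst()[0] >= numSamples * sampleSizeMs) {
            samples.removeFirst();
        }
    }
}
```

A quota check would then compare `total(now)` (or a rate derived from it) against the configured limit.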
> 
> > Finally, I think this KIP is good to have an official KIP discuss thread 
> > for community review.
> I will open the official KIP discussion today.
> 
> Thanks
> Omnia
> 
> On Tue, Jun 6, 2023 at 10:19 AM Luke Chen <show...@apache.org 
> <mailto:show...@apache.org>> wrote:
>> Hi Omnia,
>> 
>> I finally got time to check this KIP.
>> Thanks for putting all this together.
>> 
>> Questions:
>> 1. How do we store values in the bloom filter? It's unclear from this KIP 
>> what we store inside the bloom filter, and how we throttle them.
>> My understanding is that we have a map with key = KafkaPrincipal and value = 
>> PID for each bloom filter.
>> And when new PID created for a userA, we update the map to add PID into the 
>> cache value (i.e. the bloom filter)
>> When the window passed half of the time, we created another bloom filter, 
>> and this time, when new PID comes, we check if this new PID existed in 
>> previous bloom filter, if not, we add into the new bloom filter. And in the 
>> meantime, we track the "new created" count (filtered by previous bloom 
>> filter) for throttling the users.
>> Is my understanding correct?
>> 
>> 2. what will user get when they can't allocate new PID due to throttling?
>> You might need to address in the KIP.
>> 
>> 3. This config: producer.id.quota.window.num is unclear to me?
>> "The number of samples to retain in memory for alter producer id quotas"
>> What does "samples" mean here? Do you mean we only track the top 11 
>> KafkaPrincipal usages in each window?
>> 
>> Finally, I think this KIP is good to have an official KIP discuss thread for 
>> community review.
>> Thanks for the KIP!
>> 
>> Luke
>> 
>> 
>> 
>> 
>> On Mon, Jun 5, 2023 at 11:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
>> <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>> Hi Justine, Thanks for having a look
>>> > One question I have is how will we handle a scenario where potentially 
>>> > each new client has a new Kafka Principal? Is that simply not covered by 
>>> > throttling?
>>> If any new client sets up a new principal, it will be throttled based on the 
>>> quota configured for `/config/users/<default>` or 
>>> `/config/users/<the_new_principal>`.
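
The fallback described here can be illustrated with a small lookup sketch. The names are hypothetical, and the map only mimics the `/config/users/<user>` and `/config/users/<default>` entries; it is not how the broker stores quota configs. Treating "no entry at all" as "no quota" is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

// Hypothetical sketch of per-user quota resolution with a default fallback.
class UserQuotaLookupSketch {
    static final String DEFAULT_USER = "<default>";
    private final Map<String, Double> quotas = new HashMap<>();

    void setQuota(String user, double limit) {
        quotas.put(user, limit);
    }

    // A user-specific entry wins; otherwise fall back to the default entry, if any.
    OptionalDouble quotaFor(String principal) {
        Double quota = quotas.get(principal);
        if (quota == null) quota = quotas.get(DEFAULT_USER);
        return quota == null ? OptionalDouble.empty() : OptionalDouble.of(quota);
    }
}
```

So a brand-new principal with no entry of its own is still covered as long as a default user quota exists.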
>>> 
>>> 
>>> On Wed, May 31, 2023 at 6:50 PM Justine Olshan <jols...@confluent.io 
>>> <mailto:jols...@confluent.io>> wrote:
>>>> Hey Omnia,
>>>> 
>>>> I was doing a bit of snooping (I actually just get updates for the KIP 
>>>> page) and I saw this draft was in progress. I shared it with some of my 
>>>> colleagues as well who I previously discussed the issue with. 
>>>> 
>>>> The initial look was pretty promising to me. I appreciate the detailing of 
>>>> the rejected options since we had quite a few we worked through :) 
>>>> 
>>>> One question I have is how will we handle a scenario where potentially 
>>>> each new client has a new Kafka Principal? Is that simply not covered by 
>>>> throttling?
>>>> 
>>>> Thanks,
>>>> Justine
>>>> 
>>>> On Wed, May 31, 2023 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
>>>> <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>>>> Hi Justine and Luke,
>>>>> 
>>>>> I started a KIP draft here 
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
>>>>>   for a proposal. I would appreciate it if you could provide any initial 
>>>>> feedback before I open a broader discussion.
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
>>>>> <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>>>>> Hi Justine,
>>>>>> My initial thought in throttling initProducerId was to get rid 
>>>>>> of the problem at the source (a client creating too many PIDs) 
>>>>>> and fail faster, but if having this at the produce request level is 
>>>>>> easier, this should be fine. I am guessing it will go in the same direction 
>>>>>> as ClientQuotaManager for Produce throttling, with a different 
>>>>>> quota window than `quota.window.size.seconds`.
>>>>>> 
>>>>>> If this is good as an initial solution, I can start a KIP and see 
>>>>>> what the wider community feels about this.
>>>>>> 
>>>>>> Also, I noticed that at some point one of us hit "Reply" instead of 
>>>>>> "Reply to All" :)  So here are the previous conversations
>>>>>> 
>>>>>> On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io 
>>>>>> <mailto:jols...@confluent.io>> wrote:
>>>>>>> Hey Omnia,
>>>>>>> 
>>>>>>> Thanks for the response. I think I understand your explanations here 
>>>>>>> with respect to principal and clientId usage.
>>>>>>> 
>>>>>>> For the throttling -- handleInitProducerIdRequest will allocate the ID 
>>>>>>> to the producer, but we don't actually store it on the broker or 
>>>>>>> increment our metric until the first produce request for that producer 
>>>>>>> is sent (or sent again after previously expiring). Would you consider 
>>>>>>> throttling the produce request instead? It may be hard to get any 
>>>>>>> metrics from the transaction coordinator where the initProducerId 
>>>>>>> request is handled.
>>>>>>> 
>>>>>>> Justine
>>>>>> 
>>>>>>  
>>>>>> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
>>>>>> <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>>>>>> Hey Justine, 
>>>>>>> > If I understand your message correctly, there are issues with 
>>>>>>> > identifying the source of the rogue clients? So you propose to add a 
>>>>>>> > new metric for that? 
>>>>>>> > And also proposing to throttle based on clientId as a potential 
>>>>>>> > follow up?
>>>>>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId) 
>>>>>>> similarly to how we identify clients in Fetch/Produce/Request 
>>>>>>> QuotaManagers. Using KafkaPrincipal should give cluster admin the 
>>>>>>> ability to throttle later based on principal which is most likely to be 
>>>>>>> a smaller set than clientIds. My initial thought was to add a metric 
>>>>>>> that represents how many InitProducerIDRequest are sent by 
>>>>>>> KafkaPrincipal (and/or clientId) similar to Fetch/Produce QuotaManagers.
>>>>>>> Then as a follow-up, we can throttle based on either KafkaPrincipal or 
>>>>>>> clientId (maybe default as well to align this with other QuotaManagers 
>>>>>>> in Kafka).
>>>>>>> 
>>>>>>> >1. Do we rely on the client using the same ID? What if there are 
>>>>>>> >many clients that all use different client IDs?
>>>>>>> This is why I want to use the combination of KafkaPrincipal and/or clientId, 
>>>>>>> similar to some other quotas we already have in Kafka. This carries a 
>>>>>>> similar risk to the Fetch/Produce quotas in Kafka, which also rely on the 
>>>>>>> client to use the same clientId and KafkaPrincipal.
>>>>>>> 
>>>>>>> >2. Are there places where high cardinality of this metric is a 
>>>>>>> >concern? I can imagine many client IDs in the system. Would we treat 
>>>>>>> >this as a rate metric (ie, when we get an init producer ID and return 
>>>>>>> >a new producer ID we emit a count for that client id?) Or something 
>>>>>>> >else?
>>>>>>> My initial thought here was to follow the steps of ClientQuotaManager 
>>>>>>> and ClientRequestQuotaManager and use a rate metric. However, I think 
>>>>>>> we can emit it either
>>>>>>> - when we return the new PID. However, I have concerns that we may circle 
>>>>>>> back to the previous concerns with OOM due to keeping track of ACTIVE 
>>>>>>> PIDs per KafkaPrincipal (and/or clientId) in the future. Also, this would 
>>>>>>> be the first time Kafka throttles IDs for any client.
>>>>>>> - or once we receive initProducerIDRequest and throttle before even 
>>>>>>> hitting `handleInitProducerIdRequest`. Going in this direction, we may need 
>>>>>>> to throttle it within a different quota window than 
>>>>>>> `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests per 
>>>>>>> second wouldn't be efficient for most cases. I personally think this 
>>>>>>> direction is easier as it seems more consistent with the existing quota 
>>>>>>> implementation, especially since Kafka already has the concept of 
>>>>>>> throttling a subset of requests (in ControllerMutationQuotaManager) but 
>>>>>>> has never had any concept of throttling active IDs for any client.
>>>>>>> 
>>>>>>> What do you think?
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Omnia
>>>>>>  
>>>>>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io 
>>>>>> <mailto:jols...@confluent.io>> wrote:
>>>>>>> Hey Omnia, 
>>>>>>> Sorry for losing track of this. 
>>>>>>> 
>>>>>>> If I understand your message correctly, there are issues with 
>>>>>>> identifying the source of the rogue clients? So you propose to add a 
>>>>>>> new metric for that? 
>>>>>>> And also proposing to throttle based on clientId as a potential follow 
>>>>>>> up?
>>>>>>> 
>>>>>>> I think both of these make sense. The only things I can think of for 
>>>>>>> the metric are:
>>>>>>> 1. Do we rely on the client using the same ID? What if there are many 
>>>>>>> clients that all use different client IDs?
>>>>>>> 2. Are there places where high cardinality of this metric is a concern? 
>>>>>>> I can imagine many client IDs in the system. Would we treat this as a 
>>>>>>> rate metric (ie, when we get an init producer ID and return a new 
>>>>>>> producer ID we emit a count for that client id?) Or something else?
>>>>>>> 
>>>>>>> Thanks, 
>>>>>>> Justine
>>>>>> 
>>>>>> Thanks
>>>>>> Omnia
>>>>>> 
>>>>>> On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
>>>>>> <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>>>>>> Hi Luke and Justine,
>>>>>>> Are there any thoughts or updates on this? I would love to help with 
>>>>>>> this as we are hitting this more frequently now. 
>>>>>>> 
>>>>>>> best,
>>>>>>> 
>>>>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
>>>>>>> <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>>>>>>> Hi Luke and Justine,
>>>>>>>>> For (3), you said:
>>>>>>>>> > - I have some concerns about the impact of this option on the
>>>>>>>>> transactional
>>>>>>>>> producers, for example, what will happen to an ongoing transaction
>>>>>>>>> associated with an expired PID? Would this leave the transactions in a
>>>>>>>>> "hanging" state?
>>>>>>>>> 
>>>>>>>>> - How will we notify the client that the transaction can't continue 
>>>>>>>>> due to
>>>>>>>>> an expired PID?
>>>>>>>>> 
>>>>>>>>> - If PID got marked as `expired` this will mean that
>>>>>>>>> `admin.DescribeProducers` will not list them which will make
>>>>>>>>> *`kafka-transactions.sh
>>>>>>>>> --list`* a bit tricky as we can't identify if there are transactions 
>>>>>>>>> linked
>>>>>>>>> to this expired PID or not. The same concern applies to
>>>>>>>>> *`kafka-transactions.sh
>>>>>>>>> --find-hanging`*.
>>>>>>>>> 
>>>>>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>>> Currently, there's no way to notify clients about the expiration.
>>>>>>>>> Also, the ongoing transactions will be hanging. For the admin cli, 
>>>>>>>>> we've
>>>>>>>>> never thought about it. Good point.
>>>>>>>>> In summary, to adopt this solution, there are many issues needed to 
>>>>>>>>> get
>>>>>>>>> fixed.
>>>>>>>> 
>>>>>>>> Justine already clarified that if a PID is attached to a transaction it 
>>>>>>>> will not expire, so identifying the transactions shouldn't be a concern 
>>>>>>>> anymore. 
>>>>>>>> The only concern here is that this solution will not solve the 
>>>>>>>> problem if the rogue producer is a transactional producer with hanging 
>>>>>>>> transactions.
>>>>>>>> Anyone facing this situation will need to abort the hanging 
>>>>>>>> transactions manually, and then the solution to expire a PID can 
>>>>>>>> work.
>>>>>>>> 
>>>>>>>>> --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>>>>> Yes, We were thinking about throttling by KafkaPrinciple. Client Id 
>>>>>>>>> is also
>>>>>>>>> workable.
>>>>>>>>> It's just these 2 attributes are not required.
>>>>>>>>> That is, it's possible we take all clients as the same one: {default
>>>>>>>>> KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>>>>> Do you have any thoughts about it?
>>>>>>>>> Maybe skip throttling for {default KafkaPrinciple + default clientID}
>>>>>>>> 
>>>>>>>> Throttling for the default KafkaPrincipal and default clientId is useful 
>>>>>>>> when we need a hard limit on the whole cluster and whoever is 
>>>>>>>> running the cluster doesn't know the client IDs, or if a KafkaPrincipal is 
>>>>>>>> reused between different producer applications.
>>>>>>>> I usually find it helpful to have a way to apply throttling only to 
>>>>>>>> the rogue clients once I identify them, without punishing everyone 
>>>>>>>> on the cluster. However, there are two problems with this:
>>>>>>>> - There's no easy way at the moment to link PIDs to a clientId or 
>>>>>>>> KafkaPrincipal. This needs to be addressed first.
>>>>>>>> - Justine's comment on the throttling, and the fact that it will mean 
>>>>>>>> we either block all requests or have to store the request in memory, 
>>>>>>>> which in both cases has downsides for the producer experience.
>>>>>>>> 
>>>>>>>>> I recently had another discussion with my team and it does seem like 
>>>>>>>>> there
>>>>>>>>> should be a way to make it more clear to the clients what is going 
>>>>>>>>> on. A
>>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a 
>>>>>>>>> way to
>>>>>>>>> improve the story for newer clients. (Ie if we choose to expire based 
>>>>>>>>> on a
>>>>>>>>> size limit, we should include a response indicating the ID has 
>>>>>>>>> expired.) We
>>>>>>>>> also discussed ways to redefine the guarantees so that users who have
>>>>>>>>> stronger idempotency requirements can ensure them (over 
>>>>>>>>> availability/memory
>>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>> 
>>>>>>>> It may be easier to improve the experience for new clients. However, 
>>>>>>>> if we improve only the new clients, we may need a way to help teams 
>>>>>>>> who run Kafka with rogue clients on old versions by at least giving 
>>>>>>>> them an easy way to identify the clientId and/or KafkaPrincipal that 
>>>>>>>> generated these PIDs.
>>>>>>>> 
>>>>>>>> For context, it's very tricky to even identify which clientId is 
>>>>>>>> creating all these PIDs that caused OOM, which is a contributing part 
>>>>>>>> of the issue at the moment. So maybe one option here could be adding a 
>>>>>>>> new metric that tracks the number of generated PIDs per clientId. This 
>>>>>>>> will help the team who runs the Kafka cluster to 
>>>>>>>> - contact these rogue clients and ask them to fix their clients or 
>>>>>>>> upgrade to a new client if the new client version has a better 
>>>>>>>> experience.
>>>>>>>> - or, if we end up with a throttling solution, identify which 
>>>>>>>> clientId needs to be throttled.
>>>>>>>> 
>>>>>>>> Maybe we can start with a solution for identifying the rogue clients 
>>>>>>>> first and keep looking for a solution to limit them. What do you think?
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> 
>>>>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan 
>>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>>>>>> 
>>>>>>>>> Here was my response for the mailing thread:
>>>>>>>>> 
>>>>>>>>> Hey Omnia,
>>>>>>>>> Sorry to hear this is a problem for you as well. :(
>>>>>>>>> > * I have some concerns about the impact of this option on the
>>>>>>>>> transactional producers, for example, what will happen to an ongoing
>>>>>>>>> transaction associated with an expired PID? Would this leave the
>>>>>>>>> transactions in a "hanging" state?*
>>>>>>>>> We currently check if a transaction is ongoing and do not expire the
>>>>>>>>> producer ID if it has an ongoing transaction. I suspect we will 
>>>>>>>>> continue to
>>>>>>>>> do this with any solution we pick.
>>>>>>>>> 
>>>>>>>>> My team members and I looked a bit into the throttling case and it 
>>>>>>>>> can get
>>>>>>>>> a bit tricky since it means we need to throttle the produce request 
>>>>>>>>> before
>>>>>>>>> it is processed. This means we either block all requests or have to 
>>>>>>>>> store
>>>>>>>>> the request in memory (which is not great if we are trying to save 
>>>>>>>>> memory).
>>>>>>>>> 
>>>>>>>>> I recently had another discussion with my team and it does seem like 
>>>>>>>>> there
>>>>>>>>> should be a way to make it more clear to the clients what is going 
>>>>>>>>> on. A
>>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is a 
>>>>>>>>> way to
>>>>>>>>> improve the story for newer clients. (Ie if we choose to expire based 
>>>>>>>>> on a
>>>>>>>>> size limit, we should include a response indicating the ID has 
>>>>>>>>> expired.) We
>>>>>>>>> also discussed ways to redefine the guarantees so that users who have
>>>>>>>>> stronger idempotency requirements can ensure them (over 
>>>>>>>>> availability/memory
>>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>>> 
>>>>>>>>> Thanks again for commenting here, hopefully we can come to a good 
>>>>>>>>> solution.
>>>>>>>>> 
>>>>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com 
>>>>>>>>> <mailto:show...@gmail.com>> wrote:
>>>>>>>>> 
>>>>>>>>> > Hi Omnia,
>>>>>>>>> >
>>>>>>>>> > Thanks for your reply.
>>>>>>>>> >
>>>>>>>>> > For (3), you said:
>>>>>>>>> > > - I have some concerns about the impact of this option on the
>>>>>>>>> > transactional
>>>>>>>>> > producers, for example, what will happen to an ongoing transaction
>>>>>>>>> > associated with an expired PID? Would this leave the transactions 
>>>>>>>>> > in a
>>>>>>>>> > "hanging" state?
>>>>>>>>> >
>>>>>>>>> > - How will we notify the client that the transaction can't continue 
>>>>>>>>> > due to
>>>>>>>>> > an expired PID?
>>>>>>>>> >
>>>>>>>>> > - If PID got marked as `expired` this will mean that
>>>>>>>>> > `admin.DescribeProducers` will not list them which will make
>>>>>>>>> > *`kafka-transactions.sh
>>>>>>>>> > --list`* a bit tricky as we can't identify if there are 
>>>>>>>>> > transactions linked
>>>>>>>>> > to this expired PID or not. The same concern applies to
>>>>>>>>> > *`kafka-transactions.sh
>>>>>>>>> > --find-hanging`*.
>>>>>>>>> >
>>>>>>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>>> > Currently, there's no way to notify clients about the expiration.
>>>>>>>>> > Also, the ongoing transactions will be hanging. For the admin cli, 
>>>>>>>>> > we've
>>>>>>>>> > never thought about it. Good point.
>>>>>>>>> > In summary, to adopt this solution, there are many issues needed to 
>>>>>>>>> > get
>>>>>>>>> > fixed.
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > For (5), you said:
>>>>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your concern 
>>>>>>>>> > > here
>>>>>>>>> > that
>>>>>>>>> > those good clients that use the same principal as a rogue one will 
>>>>>>>>> > get
>>>>>>>>> > throttled?
>>>>>>>>> >
>>>>>>>>> > If this is the case, then I believe it should be okay as other 
>>>>>>>>> > throttling
>>>>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > What about applying limit/throttling to
>>>>>>>>> > *`/config/users/<user>/clients/<client-id>`
>>>>>>>>> > *similar to what we have with client quota? This should reduce the 
>>>>>>>>> > concern
>>>>>>>>> > about throttling good clients, right?
>>>>>>>>> >
>>>>>>>>> > --> Yes, I mean KafkaPrinciple (sorry, I didn't make it clear)
>>>>>>>>> > Yes, We were thinking about throttling by KafkaPrinciple. Client Id 
>>>>>>>>> > is
>>>>>>>>> > also workable.
>>>>>>>>> > It's just these 2 attributes are not required.
>>>>>>>>> > That is, it's possible we take all clients as the same one: {default
>>>>>>>>> > KafkaPrinciple + default clientID}, and apply throttling on it.
>>>>>>>>> > Do you have any thoughts about it?
>>>>>>>>> > Maybe skip throttling for {default KafkaPrinciple + default 
>>>>>>>>> > clientID} ?
>>>>>>>>> >
>>>>>>>>> > Luke
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim 
>>>>>>>>> > <o.g.h.ibra...@gmail.com <mailto:o.g.h.ibra...@gmail.com>>
>>>>>>>>> > wrote:
>>>>>>>>> >
>>>>>>>>> >> Hi Luke & Justine,
>>>>>>>>> >> Thanks for looking into this issue, we have been experiencing this 
>>>>>>>>> >> because
>>>>>>>>> >> of rogue clients as well.
>>>>>>>>> >>
>>>>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort of 
>>>>>>>>> >> > like an
>>>>>>>>> >> LRU
>>>>>>>>> >> >cache)
>>>>>>>>> >> >-> The idea here is that if we hit a misconfigured client, we 
>>>>>>>>> >> >will expire
>>>>>>>>> >> >the older entries. The concern here is we have risks to lose 
>>>>>>>>> >> >idempotency
>>>>>>>>> >> >guarantees, and currently, we don't have a way to notify clients 
>>>>>>>>> >> >about
>>>>>>>>> >> >losing idempotency guarantees. Besides, the least  recently used 
>>>>>>>>> >> >entries
>>>>>>>>> >> >got removed are not always from the "bad" clients.
>>>>>>>>> >>
>>>>>>>>> >> - I have some concerns about the impact of this option on the
>>>>>>>>> >> transactional
>>>>>>>>> >> producers, for example, what will happen to an ongoing transaction
>>>>>>>>> >> associated with an expired PID? Would this leave the transactions 
>>>>>>>>> >> in a
>>>>>>>>> >> "hanging" state?
>>>>>>>>> >>
>>>>>>>>> >> - How will we notify the client that the transaction can't 
>>>>>>>>> >> continue due to
>>>>>>>>> >> an expired PID?
>>>>>>>>> >>
>>>>>>>>> >> - If PID got marked as `expired` this will mean that
>>>>>>>>> >> `admin.DescribeProducers` will not list them which will make
>>>>>>>>> >> *`kafka-transactions.sh
>>>>>>>>> >> --list`* a bit tricky as we can't identify if there are 
>>>>>>>>> >> transactions
>>>>>>>>> >> linked
>>>>>>>>> >> to this expired PID or not. The same concern applies to
>>>>>>>>> >> *`kafka-transactions.sh
>>>>>>>>> >> --find-hanging`*.
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> >5. limit/throttling the producer id based on the principle
>>>>>>>>> >> >-> Although we can limit the impact to a certain principle with 
>>>>>>>>> >> >this
>>>>>>>>> >> idea,
>>>>>>>>> >> >same concern still exists as solution #1 #2.
>>>>>>>>> >>
>>>>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your concern 
>>>>>>>>> >> here
>>>>>>>>> >> that
>>>>>>>>> >> those good clients that use the same principal as a rogue one will 
>>>>>>>>> >> get
>>>>>>>>> >> throttled?
>>>>>>>>> >>
>>>>>>>>> >> If this is the case, then I believe it should be okay as other 
>>>>>>>>> >> throttling
>>>>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> What about applying limit/throttling to
>>>>>>>>> >> *`/config/users/<user>/clients/<client-id>`
>>>>>>>>> >> *similar to what we have with client quota? This should reduce the 
>>>>>>>>> >> concern
>>>>>>>>> >> about throttling good clients, right?
>>>>>>>>> >>
>>>>>>>>> >> best,
>>>>>>>>> >>
>>>>>>>>> >> Omnia
>>>>>>>>> >>
>>>>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com 
>>>>>>>>> >> <mailto:show...@gmail.com>> wrote:
>>>>>>>>> >>
>>>>>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>>>>>> >> > Thanks.
>>>>>>>>> >> >
>>>>>>>>> >> > Luke
>>>>>>>>> >> >
>>>>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com 
>>>>>>>>> >> > <mailto:show...@gmail.com>> wrote:
>>>>>>>>> >> >
>>>>>>>>> >> > > Hi devs,
>>>>>>>>> >> > >
>>>>>>>>> >> > > As stated in the motivation section in KIP-854
>>>>>>>>> >> > > <
>>>>>>>>> >> >
>>>>>>>>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
>>>>>>>>> >> > >:
>>>>>>>>> >> > >
>>>>>>>>> >> > > With idempotent producers becoming the default in Kafka, this 
>>>>>>>>> >> > > means
>>>>>>>>> >> that
>>>>>>>>> >> > > unless otherwise specified, all new producers will be given 
>>>>>>>>> >> > > producer
>>>>>>>>> >> IDs.
>>>>>>>>> >> > > Some (inefficient) applications may now create many 
>>>>>>>>> >> > > non-transactional
>>>>>>>>> >> > > idempotent producers. Each of these producers will be assigned 
>>>>>>>>> >> > > a
>>>>>>>>> >> producer
>>>>>>>>> >> > > ID and these IDs and their metadata are stored in the broker 
>>>>>>>>> >> > > memory,
>>>>>>>>> >> > which
>>>>>>>>> >> > > might cause brokers out of memory.
>>>>>>>>> >> > >
>>>>>>>>> >> > > Justine (in cc.) and I and some other team members are working 
>>>>>>>>> >> > > on the
>>>>>>>>> >> > > solutions for this issue. But none of them solves it completely
>>>>>>>>> >> without
>>>>>>>>> >> > > side effects. Among them, "availability" VS "idempotency 
>>>>>>>>> >> > > guarantees"
>>>>>>>>> >> is
>>>>>>>>> >> > > what we can't decide which to sacrifice. Some of these 
>>>>>>>>> >> > > solutions
>>>>>>>>> >> > sacrifice
>>>>>>>>> >> > > availability of produce (1,2,5) and others sacrifice 
>>>>>>>>> >> > > idempotency
>>>>>>>>> >> > guarantees
>>>>>>>>> >> > > (3). It could be useful to know if people generally have a 
>>>>>>>>> >> > > preference
>>>>>>>>> >> one
>>>>>>>>> >> > > way or the other. Or what other better solutions there might 
>>>>>>>>> >> > > be.
>>>>>>>>> >> > >
>>>>>>>>> >> > > Here are the proposals we came up with:
>>>>>>>>> >> > >
>>>>>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>>>>>> >> > > -> This is the simplest solution. But since the OOM issue is 
>>>>>>>>> >> > > usually
>>>>>>>>> >> > > caused by a rogue or misconfigured client, and this solution 
>>>>>>>>> >> > > might
>>>>>>>>> >> > "punish"
>>>>>>>>> >> > > the good client from sending messages.
>>>>>>>>> >> > >
>>>>>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>>>>>> >> > > -> Same concern as the solution #1.
>>>>>>>>> >> > >
>>>>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort 
>>>>>>>>> >> > > of like
>>>>>>>>> >> an
>>>>>>>>> >> > > LRU cache)
>>>>>>>>> >> > > -> The idea here is that if we hit a misconfigured client, we 
>>>>>>>>> >> > > will
>>>>>>>>> >> expire
>>>>>>>>> >> > > the older entries. The concern here is we have risks to lose
>>>>>>>>> >> idempotency
>>>>>>>>> >> > > guarantees, and currently, we don't have a way to notify 
>>>>>>>>> >> > > clients about
>>>>>>>>> >> > > losing idempotency guarantees. Besides, the least  recently 
>>>>>>>>> >> > > used
>>>>>>>>> >> entries
>>>>>>>>> >> > > got removed are not always from the "bad" clients.
>>>>>>>>> >> > >
>>>>>>>>> >> > > 4. allow clients to "close" the producer ID usage
>>>>>>>>> >> > > -> We can provide a way for producer to "close" producerID 
>>>>>>>>> >> > > usage.
>>>>>>>>> >> > > Currently, we only have the INIT_PRODUCER_ID request to allocate
>>>>>>>>> >> > > one. After that, we'll keep the producer ID metadata in broker 
>>>>>>>>> >> > > even if
>>>>>>>>> >> > the
>>>>>>>>> >> > > producer is "closed". Having a closed API (ex: 
>>>>>>>>> >> > > END_PRODUCER_ID), we
>>>>>>>>> >> can
>>>>>>>>> >> > > remove the entry from broker side. In client side, we can send 
>>>>>>>>> >> > > it when
>>>>>>>>> >> > > producer closing. The concern is, the old clients (including 
>>>>>>>>> >> > > non-java
>>>>>>>>> >> > > clients) will still suffer from the OOM issue.
>>>>>>>>> >> > >
>>>>>>>>> >> > > 5. limit/throttling the producer id based on the principle
>>>>>>>>> >> > > -> Although we can limit the impact to a certain principle 
>>>>>>>>> >> > > with this
>>>>>>>>> >> > idea,
>>>>>>>>> >> > > same concern still exists as solution #1 #2.
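
To make the trade-off in option 3 concrete, here is a minimal sketch of an LRU-style bound on active producer IDs, assuming an access-ordered `LinkedHashMap`. The class `BoundedProducerIdCache` and its methods are hypothetical and are not part of Kafka; the broker's real producer state handling is more involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka code: a bounded map of producer ID -> last-seen timestamp.
final class BoundedProducerIdCache {
    private final LinkedHashMap<Long, Long> entries;

    BoundedProducerIdCache(final int maxActiveProducerIds) {
        // accessOrder = true keeps the least recently used entry first in iteration order.
        this.entries = new LinkedHashMap<Long, Long>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Long> eldest) {
                // Evict the least recently used producer ID once the bound is exceeded.
                return size() > maxActiveProducerIds;
            }
        };
    }

    // Record activity for a producer ID; this may evict another producer's entry.
    void touch(long producerId, long nowMs) {
        entries.put(producerId, nowMs);
    }

    // containsKey does not count as an access, so checking does not reorder entries.
    boolean isActive(long producerId) {
        return entries.containsKey(producerId);
    }

    int size() {
        return entries.size();
    }
}
```

Note that eviction depends only on recency, not on which client owns the entry, which is exactly the concern raised above: a quiet but well-behaved producer can be evicted while a rogue client keeps allocating new IDs.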
>>>>>>>>> >> > >
>>>>>>>>> >> > > Any thoughts/feedback are welcomed.
>>>>>>>>> >> > >
>>>>>>>>> >> > > Thank you.
>>>>>>>>> >> > > Luke
>>>>>>>>> >> > >
>>>>>>>>> >> >
>>>>>>>>> >>
>>>>>>>>> >
>>>>>> 
>>>>>> On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io 
>>>>>> <mailto:jols...@confluent.io>> wrote:
>>>>>>> Hey Omnia,
>>>>>>> 
>>>>>>> Thanks for the response. I think I understand your explanations here 
>>>>>>> with respect to principal and clientId usage.
>>>>>>> 
>>>>>>> For the throttling -- handleInitProducerIdRequest will allocate the ID 
>>>>>>> to the producer, but we don't actually store it on the broker or 
>>>>>>> increment our metric until the first produce request for that producer 
>>>>>>> is sent (or sent again after previously expiring). Would you consider 
>>>>>>> throttling the produce request instead? It may be hard to get any 
>>>>>>> metrics from the transaction coordinator where the initProducerId 
>>>>>>> request is handled.
>>>>>>> 
>>>>>>> Justine
>>>>>>> 
>>>>>>> On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
>>>>>>> <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>>>>>>> Hey Justine, 
>>>>>>>> > If I understand your message correctly, there are issues with 
>>>>>>>> > identifying the source of the rogue clients? So you propose to add a 
>>>>>>>> > new metric for that? 
>>>>>>>> > And also proposing to throttle based on clientId as a potential 
>>>>>>>> > follow up?
>>>>>>>> I want to identify rogue clients by KafkaPrincipal (and/or clientId), 
>>>>>>>> similar to how we identify clients in the Fetch/Produce/Request 
>>>>>>>> QuotaManagers. Using KafkaPrincipal should give cluster admins the 
>>>>>>>> ability to throttle later based on principal, which is most likely to 
>>>>>>>> be a smaller set than clientIds. My initial thought was to add a 
>>>>>>>> metric that represents how many InitProducerIDRequests are sent per 
>>>>>>>> KafkaPrincipal (and/or clientId), similar to the Fetch/Produce 
>>>>>>>> QuotaManagers.
>>>>>>>> Then, as a follow-up, we can throttle based on either KafkaPrincipal or 
>>>>>>>> clientId (maybe a default as well, to align this with other 
>>>>>>>> QuotaManagers in Kafka).
>>>>>>>> 
>>>>>>>> >1. Do we rely on the client using the same ID? What if there are 
>>>>>>>> >many clients that all use different client IDs?
>>>>>>>> This is why I want to use the combination of KafkaPrincipal and/or 
>>>>>>>> clientId, similar to some other quotas we already have in Kafka. This 
>>>>>>>> carries a similar risk to the Fetch/Produce quotas in Kafka, which also 
>>>>>>>> rely on the client using the same clientId and KafkaPrincipal.
>>>>>>>> 
>>>>>>>> >2. Are there places where high cardinality of this metric is a 
>>>>>>>> >concern? I can imagine many client IDs in the system. Would we treat 
>>>>>>>> >this as a rate metric (ie, when we get an init producer ID and return 
>>>>>>>> >a new producer ID we emit a count for that client id?) Or something 
>>>>>>>> >else?
>>>>>>>> My initial thought here was to follow the steps of ClientQuotaManager 
>>>>>>>> and ClientRequestQuotaManager and use a rate metric. I think we can 
>>>>>>>> emit it at either of two points:
>>>>>>>> - when we return the new PID. However, I have concerns that we may 
>>>>>>>> circle back to the previous concerns with OOM due to keeping track of 
>>>>>>>> ACTIVE PIDs per KafkaPrincipal (and/or clientId) in the future. Also, 
>>>>>>>> this would be the first time Kafka throttles IDs for any client.
>>>>>>>> - or once we receive the initProducerIDRequest, throttling before even 
>>>>>>>> hitting `handleInitProducerIdRequest`. Going in this direction, we may 
>>>>>>>> need to throttle within a different quota window than 
>>>>>>>> `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests 
>>>>>>>> per second wouldn't be efficient for most cases. I personally think 
>>>>>>>> this direction is easier as it seems more consistent with the existing 
>>>>>>>> quota implementation, especially since Kafka already has the concept of 
>>>>>>>> throttling a subset of requests (in ControllerMutationQuotaManager) but 
>>>>>>>> has never had any concept of throttling active IDs for any client.
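
As a rough illustration of the second direction (throttling INIT_PRODUCER_ID requests per principal over a window longer than one second), here is a minimal fixed-window sketch. It is an assumption-laden simplification: the class `InitPidWindowQuota`, its parameters, and the use of a plain string for the principal are all hypothetical, and Kafka's existing quota managers use sampled rate metrics rather than this simple counter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka code: allow at most N requests per principal per fixed window.
final class InitPidWindowQuota {
    private final int maxRequestsPerWindow;
    private final long windowMs;
    // principal -> {windowStartMs, requestCountInWindow}
    private final Map<String, long[]> state = new HashMap<>();

    InitPidWindowQuota(int maxRequestsPerWindow, long windowMs) {
        this.maxRequestsPerWindow = maxRequestsPerWindow;
        this.windowMs = windowMs;
    }

    // Returns true if the request is within quota, false if it should be throttled.
    boolean tryAcquire(String principal, long nowMs) {
        long[] s = state.computeIfAbsent(principal, p -> new long[] {nowMs, 0L});
        if (nowMs - s[0] >= windowMs) {
            // The previous window has elapsed: start a new one.
            s[0] = nowMs;
            s[1] = 0L;
        }
        if (s[1] >= maxRequestsPerWindow) {
            return false;
        }
        s[1]++;
        return true;
    }
}
```

Because the state is keyed by principal, a principal that exhausts its budget does not affect the others, and memory grows with the number of principals rather than the number of PIDs.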
>>>>>>>> 
>>>>>>>> What do you think?
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> Omnia
>>>>>>>> 
>>>>>>>> On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io 
>>>>>>>> <mailto:jols...@confluent.io>> wrote:
>>>>>>>>> Hey Omnia, 
>>>>>>>>> Sorry for losing track of this. 
>>>>>>>>> 
>>>>>>>>> If I understand your message correctly, there are issues with 
>>>>>>>>> identifying the source of the rogue clients? So you propose to add a 
>>>>>>>>> new metric for that? 
>>>>>>>>> And also proposing to throttle based on clientId as a potential 
>>>>>>>>> follow up?
>>>>>>>>> 
>>>>>>>>> I think both of these make sense. The only things I can think of for 
>>>>>>>>> the metric are:
>>>>>>>>> 1. Do we rely on the client using the same ID? What if there are 
>>>>>>>>> many clients that all use different client IDs?
>>>>>>>>> 2. Are there places where high cardinality of this metric is a 
>>>>>>>>> concern? I can imagine many client IDs in the system. Would we treat 
>>>>>>>>> this as a rate metric (ie, when we get an init producer ID and return 
>>>>>>>>> a new producer ID we emit a count for that client id?) Or something 
>>>>>>>>> else?
>>>>>>>>> 
>>>>>>>>> Thanks, 
>>>>>>>>> Justine
>>>>>>>>> 
>>>>>>>>> On Thu, Feb 2, 2023 at 8:44 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com 
>>>>>>>>> <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>>>>>>>>> Hi Luke and Justine,
>>>>>>>>>> Are there any thoughts or updates on this? I would love to help with 
>>>>>>>>>> this as we are hitting this more frequently now. 
>>>>>>>>>> 
>>>>>>>>>> best,
>>>>>>>>>> 
>>>>>>>>>> On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim 
>>>>>>>>>> <o.g.h.ibra...@gmail.com <mailto:o.g.h.ibra...@gmail.com>> wrote:
>>>>>>>>>>> Hi Luke and Justine,
>>>>>>>>>>>> For (3), you said:
>>>>>>>>>>>> > - I have some concerns about the impact of this option on the
>>>>>>>>>>>> transactional
>>>>>>>>>>>> producers, for example, what will happen to an ongoing transaction
>>>>>>>>>>>> associated with an expired PID? Would this leave the transactions 
>>>>>>>>>>>> in a
>>>>>>>>>>>> "hanging" state?
>>>>>>>>>>>> 
>>>>>>>>>>>> - How will we notify the client that the transaction can't 
>>>>>>>>>>>> continue due to
>>>>>>>>>>>> an expired PID?
>>>>>>>>>>>> 
>>>>>>>>>>>> - If PID got marked as `expired` this will mean that
>>>>>>>>>>>> `admin.DescribeProducers` will not list them which will make
>>>>>>>>>>>> *`kafka-transactions.sh
>>>>>>>>>>>> --list`* a bit tricky as we can't identify if there are 
>>>>>>>>>>>> transactions linked
>>>>>>>>>>>> to this expired PID or not. The same concern applies to
>>>>>>>>>>>> *`kafka-transactions.sh
>>>>>>>>>>>> --find-hanging`*.
>>>>>>>>>>>> 
>>>>>>>>>>>> --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>>>>>> Currently, there's no way to notify clients about the expiration.
>>>>>>>>>>>> Also, the ongoing transactions will be hanging. For the admin cli, 
>>>>>>>>>>>> we've
>>>>>>>>>>>> never thought about it. Good point.
>>>>>>>>>>>> In summary, to adopt this solution, many issues need to be
>>>>>>>>>>>> fixed first.
>>>>>>>>>>> 
>>>>>>>>>>> Justine already clarified that if a PID is attached to a transaction 
>>>>>>>>>>> it will not expire, so identifying the transactions shouldn't be a 
>>>>>>>>>>> concern anymore. 
>>>>>>>>>>> The only concern here is that this solution will not solve the 
>>>>>>>>>>> problem if the rogue producer is a transactional producer with 
>>>>>>>>>>> hanging transactions.
>>>>>>>>>>> Anyone facing this situation will need to abort the hanging 
>>>>>>>>>>> transactions manually, and then the solution to expire a PID can 
>>>>>>>>>>> work.
>>>>>>>>>>> 
>>>>>>>>>>>> --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear).
>>>>>>>>>>>> Yes, we were thinking about throttling by KafkaPrincipal. Client ID
>>>>>>>>>>>> is also workable.
>>>>>>>>>>>> It's just that these 2 attributes are not required.
>>>>>>>>>>>> That is, it's possible that we treat all clients as the same one:
>>>>>>>>>>>> {default KafkaPrincipal + default clientID}, and apply throttling to it.
>>>>>>>>>>>> Do you have any thoughts about it?
>>>>>>>>>>>> Maybe skip throttling for {default KafkaPrincipal + default clientID}?
>>>>>>>>>>> 
>>>>>>>>>>> Throttling for the default KafkaPrincipal and default clientId is 
>>>>>>>>>>> useful when we need a hard limit on the whole cluster and 
>>>>>>>>>>> whoever is running the cluster doesn't know the client IDs, or if a 
>>>>>>>>>>> KafkaPrincipal is reused between different producer applications.
>>>>>>>>>>> I usually find it helpful to have a way to apply throttling only to 
>>>>>>>>>>> the rogue clients once I identify them, without punishing 
>>>>>>>>>>> everyone on the cluster. However, there are two problems with this:
>>>>>>>>>>> - There's no easy way at the moment to link PIDs to a clientId or 
>>>>>>>>>>> KafkaPrincipal. This needs to be addressed first.
>>>>>>>>>>> - Justine's comment on the throttling: it will mean we either 
>>>>>>>>>>> block all requests or have to store the request in memory, which 
>>>>>>>>>>> in both cases has downsides for the producer experience.
>>>>>>>>>>> 
>>>>>>>>>>>> I recently had another discussion with my team and it does seem 
>>>>>>>>>>>> like there
>>>>>>>>>>>> should be a way to make it more clear to the clients what is going 
>>>>>>>>>>>> on. A
>>>>>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is 
>>>>>>>>>>>> a way to
>>>>>>>>>>>> improve the story for newer clients. (Ie if we choose to expire 
>>>>>>>>>>>> based on a
>>>>>>>>>>>> size limit, we should include a response indicating the ID has 
>>>>>>>>>>>> expired.) We
>>>>>>>>>>>> also discussed ways to redefine the guarantees so that users who 
>>>>>>>>>>>> have
>>>>>>>>>>>> stronger idempotency requirements can ensure them (over 
>>>>>>>>>>>> availability/memory
>>>>>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>>>>> 
>>>>>>>>>>> It may be easier to improve the experience for new clients. 
>>>>>>>>>>> However, if we improve only the new clients, we may need a way to 
>>>>>>>>>>> help teams who run Kafka with rogue clients on old versions by at 
>>>>>>>>>>> least giving them an easy way to identify the clientId or 
>>>>>>>>>>> KafkaPrincipal that generated these PIDs.
>>>>>>>>>>> 
>>>>>>>>>>> For context, it's very tricky to even identify which clientId is 
>>>>>>>>>>> creating all the PIDs that caused the OOM, which is a contributing 
>>>>>>>>>>> part of the issue at the moment. So maybe one option here could be 
>>>>>>>>>>> adding a new metric that tracks the number of generated PIDs per 
>>>>>>>>>>> clientId. This will help the team who runs the Kafka cluster to 
>>>>>>>>>>> - contact the owners of these rogue clients and ask them to fix their 
>>>>>>>>>>> clients, or upgrade to a new client if the new client version has a 
>>>>>>>>>>> better experience.
>>>>>>>>>>> - or, if we end up with a throttling solution, identify 
>>>>>>>>>>> which clientId needs to be throttled.
>>>>>>>>>>> 
>>>>>>>>>>> Maybe we can start with a solution for identifying the rogue 
>>>>>>>>>>> clients first and keep looking for a solution to limit them. What 
>>>>>>>>>>> do you think?
>>>>>>>>>>> 
>>>>>>>>>>> Thanks
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan 
>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>>>>>>> Oops.  I realized I just replied to Omnia 🤦‍♀️
>>>>>>>>>>>> 
>>>>>>>>>>>> Here was my response for the mailing thread:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hey Omnia,
>>>>>>>>>>>> Sorry to hear this is a problem for you as well. :(
>>>>>>>>>>>> > * I have some concerns about the impact of this option on the
>>>>>>>>>>>> transactional producers, for example, what will happen to an 
>>>>>>>>>>>> ongoing
>>>>>>>>>>>> transaction associated with an expired PID? Would this leave the
>>>>>>>>>>>> transactions in a "hanging" state?*
>>>>>>>>>>>> We currently check if a transaction is ongoing and do not expire 
>>>>>>>>>>>> the
>>>>>>>>>>>> producer ID if it has an ongoing transaction. I suspect we will 
>>>>>>>>>>>> continue to
>>>>>>>>>>>> do this with any solution we pick.
>>>>>>>>>>>> 
>>>>>>>>>>>> My team members and I looked a bit into the throttling case and it 
>>>>>>>>>>>> can get
>>>>>>>>>>>> a bit tricky since it means we need to throttle the produce 
>>>>>>>>>>>> request before
>>>>>>>>>>>> it is processed. This means we either block all requests or have 
>>>>>>>>>>>> to store
>>>>>>>>>>>> the request in memory (which is not great if we are trying to save 
>>>>>>>>>>>> memory).
>>>>>>>>>>>> 
>>>>>>>>>>>> I recently had another discussion with my team and it does seem 
>>>>>>>>>>>> like there
>>>>>>>>>>>> should be a way to make it more clear to the clients what is going 
>>>>>>>>>>>> on. A
>>>>>>>>>>>> lot of this protocol is implicit. I'm wondering if maybe there is 
>>>>>>>>>>>> a way to
>>>>>>>>>>>> improve the story for newer clients. (Ie if we choose to expire 
>>>>>>>>>>>> based on a
>>>>>>>>>>>> size limit, we should include a response indicating the ID has 
>>>>>>>>>>>> expired.) We
>>>>>>>>>>>> also discussed ways to redefine the guarantees so that users who 
>>>>>>>>>>>> have
>>>>>>>>>>>> stronger idempotency requirements can ensure them (over 
>>>>>>>>>>>> availability/memory
>>>>>>>>>>>> concerns). Let me know if you have any ideas here.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks again for commenting here, hopefully we can come to a good 
>>>>>>>>>>>> solution.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com 
>>>>>>>>>>>> <mailto:show...@gmail.com>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> > Hi Omnia,
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks for your reply.
>>>>>>>>>>>> >
>>>>>>>>>>>> > For (3), you said:
>>>>>>>>>>>> > > - I have some concerns about the impact of this option on the
>>>>>>>>>>>> > transactional
>>>>>>>>>>>> > producers, for example, what will happen to an ongoing 
>>>>>>>>>>>> > transaction
>>>>>>>>>>>> > associated with an expired PID? Would this leave the 
>>>>>>>>>>>> > transactions in a
>>>>>>>>>>>> > "hanging" state?
>>>>>>>>>>>> >
>>>>>>>>>>>> > - How will we notify the client that the transaction can't 
>>>>>>>>>>>> > continue due to
>>>>>>>>>>>> > an expired PID?
>>>>>>>>>>>> >
>>>>>>>>>>>> > - If PID got marked as `expired` this will mean that
>>>>>>>>>>>> > `admin.DescribeProducers` will not list them which will make
>>>>>>>>>>>> > *`kafka-transactions.sh
>>>>>>>>>>>> > --list`* a bit tricky as we can't identify if there are 
>>>>>>>>>>>> > transactions linked
>>>>>>>>>>>> > to this expired PID or not. The same concern applies to
>>>>>>>>>>>> > *`kafka-transactions.sh
>>>>>>>>>>>> > --find-hanging`*.
>>>>>>>>>>>> >
>>>>>>>>>>>> > --> Yes, you're right. Those are also concerns for this solution.
>>>>>>>>>>>> > Currently, there's no way to notify clients about the expiration.
>>>>>>>>>>>> > Also, the ongoing transactions will be hanging. For the admin 
>>>>>>>>>>>> > cli, we've
>>>>>>>>>>>> > never thought about it. Good point.
>>>>>>>>>>>> > In summary, to adopt this solution, many issues need to be
>>>>>>>>>>>> > fixed first.
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > For (5), you said:
>>>>>>>>>>>> > > I am assuming you mean KafkaPrincipal here! If so is your 
>>>>>>>>>>>> > > concern here
>>>>>>>>>>>> > that
>>>>>>>>>>>> > those good clients that use the same principal as a rogue one 
>>>>>>>>>>>> > will get
>>>>>>>>>>>> > throttled?
>>>>>>>>>>>> >
>>>>>>>>>>>> > If this is the case, then I believe it should be okay as other 
>>>>>>>>>>>> > throttling
>>>>>>>>>>>> > in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > What about applying limit/throttling to
>>>>>>>>>>>> > *`/config/users/<user>/clients/<client-id>`
>>>>>>>>>>>> > *similar to what we have with client quota? This should reduce 
>>>>>>>>>>>> > the concern
>>>>>>>>>>>> > about throttling good clients, right?
>>>>>>>>>>>> >
>>>>>>>>>>>> > --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear).
>>>>>>>>>>>> > Yes, we were thinking about throttling by KafkaPrincipal. Client ID
>>>>>>>>>>>> > is also workable.
>>>>>>>>>>>> > It's just that these 2 attributes are not required.
>>>>>>>>>>>> > That is, it's possible that we treat all clients as the same one:
>>>>>>>>>>>> > {default KafkaPrincipal + default clientID}, and apply throttling to it.
>>>>>>>>>>>> > Do you have any thoughts about it?
>>>>>>>>>>>> > Maybe skip throttling for {default KafkaPrincipal + default clientID}?
>>>>>>>>>>>> >
>>>>>>>>>>>> > Luke
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim 
>>>>>>>>>>>> > <o.g.h.ibra...@gmail.com <mailto:o.g.h.ibra...@gmail.com>>
>>>>>>>>>>>> > wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> >> Hi Luke & Justine,
>>>>>>>>>>>> >> Thanks for looking into this issue. We have been experiencing
>>>>>>>>>>>> >> this because of rogue clients as well.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> > 3. Having a limit to the number of active producer IDs (sort of
>>>>>>>>>>>> >> > like an LRU cache)
>>>>>>>>>>>> >> > -> The idea here is that if we hit a misconfigured client, we will
>>>>>>>>>>>> >> > expire the older entries. The concern here is that we risk losing
>>>>>>>>>>>> >> > idempotency guarantees, and currently we don't have a way to notify
>>>>>>>>>>>> >> > clients about losing them. Besides, the least recently used entries
>>>>>>>>>>>> >> > that get removed are not always from the "bad" clients.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> - I have some concerns about the impact of this option on the
>>>>>>>>>>>> >> transactional
>>>>>>>>>>>> >> producers, for example, what will happen to an ongoing 
>>>>>>>>>>>> >> transaction
>>>>>>>>>>>> >> associated with an expired PID? Would this leave the 
>>>>>>>>>>>> >> transactions in a
>>>>>>>>>>>> >> "hanging" state?
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> - How will we notify the client that the transaction can't 
>>>>>>>>>>>> >> continue due to
>>>>>>>>>>>> >> an expired PID?
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> - If PID got marked as `expired` this will mean that
>>>>>>>>>>>> >> `admin.DescribeProducers` will not list them which will make
>>>>>>>>>>>> >> *`kafka-transactions.sh
>>>>>>>>>>>> >> --list`* a bit tricky as we can't identify if there are 
>>>>>>>>>>>> >> transactions
>>>>>>>>>>>> >> linked
>>>>>>>>>>>> >> to this expired PID or not. The same concern applies to
>>>>>>>>>>>> >> *`kafka-transactions.sh
>>>>>>>>>>>> >> --find-hanging`*.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> >5. Limit/throttle the producer ID based on the principal
>>>>>>>>>>>> >> >-> Although we can limit the impact to a certain principal with this
>>>>>>>>>>>> >> >idea, the same concern still exists as in solutions #1 and #2.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> I am assuming you mean KafkaPrincipal here! If so is your 
>>>>>>>>>>>> >> concern here
>>>>>>>>>>>> >> that
>>>>>>>>>>>> >> those good clients that use the same principal as a rogue one 
>>>>>>>>>>>> >> will get
>>>>>>>>>>>> >> throttled?
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> If this is the case, then I believe it should be okay as other 
>>>>>>>>>>>> >> throttling
>>>>>>>>>>>> >> in Kafka on *`/config/users/<user>`* has the same behaviour.
>>>>>>>>>>>> >>
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> What about applying limit/throttling to
>>>>>>>>>>>> >> *`/config/users/<user>/clients/<client-id>`
>>>>>>>>>>>> >> *similar to what we have with client quota? This should reduce 
>>>>>>>>>>>> >> the concern
>>>>>>>>>>>> >> about throttling good clients, right?
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> best,
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> Omnia
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com 
>>>>>>>>>>>> >> <mailto:show...@gmail.com>> wrote:
>>>>>>>>>>>> >>
>>>>>>>>>>>> >> > Bump this thread to see if there are any comments/thoughts.
>>>>>>>>>>>> >> > Thanks.
>>>>>>>>>>>> >> >
>>>>>>>>>>>> >> > Luke
>>>>>>>>>>>> >> >
>>>>>>>>>>>> >> > On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com 
>>>>>>>>>>>> >> > <mailto:show...@gmail.com>> wrote:
>>>>>>>>>>>> >> >
>>>>>>>>>>>> >> > > Hi devs,
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > As stated in the motivation section in KIP-854
>>>>>>>>>>>> >> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry>:
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > With idempotent producers becoming the default in Kafka, 
>>>>>>>>>>>> >> > > this means
>>>>>>>>>>>> >> that
>>>>>>>>>>>> >> > > unless otherwise specified, all new producers will be given 
>>>>>>>>>>>> >> > > producer
>>>>>>>>>>>> >> IDs.
>>>>>>>>>>>> >> > > Some (inefficient) applications may now create many 
>>>>>>>>>>>> >> > > non-transactional
>>>>>>>>>>>> >> > > idempotent producers. Each of these producers will be 
>>>>>>>>>>>> >> > > assigned a
>>>>>>>>>>>> >> producer
>>>>>>>>>>>> >> > > ID and these IDs and their metadata are stored in the 
>>>>>>>>>>>> >> > > broker memory,
>>>>>>>>>>>> >> > which
>>>>>>>>>>>> >> > > might cause brokers out of memory.
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > Justine (in cc.) and I and some other team members are 
>>>>>>>>>>>> >> > > working on the
>>>>>>>>>>>> >> > > solutions for this issue. But none of them solves it 
>>>>>>>>>>>> >> > > completely
>>>>>>>>>>>> >> without
>>>>>>>>>>>> >> > > side effects. The trade-off we can't settle is whether to sacrifice
>>>>>>>>>>>> >> > > "availability" or "idempotency guarantees". Some of these solutions
>>>>>>>>>>>> >> > > sacrifice availability of produce (1, 2, 5) and others sacrifice
>>>>>>>>>>>> >> > > idempotency guarantees (3). It would be useful to know whether
>>>>>>>>>>>> >> > > people generally have a preference one way or the other, or whether
>>>>>>>>>>>> >> > > there are better solutions.
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > Here are the proposals we came up with:
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > 1. Limit the total active producer ID allocation number.
>>>>>>>>>>>> >> > > -> This is the simplest solution. But the OOM issue is usually
>>>>>>>>>>>> >> > > caused by a rogue or misconfigured client, and this solution might
>>>>>>>>>>>> >> > > "punish" good clients by stopping them from sending messages.
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > 2. Throttling the producer ID allocation rate
>>>>>>>>>>>> >> > > -> Same concern as the solution #1.
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > 3. Having a limit to the number of active producer IDs (sort of
>>>>>>>>>>>> >> > > like an LRU cache)
>>>>>>>>>>>> >> > > -> The idea here is that if we hit a misconfigured client, we will
>>>>>>>>>>>> >> > > expire the older entries. The concern here is that we risk losing
>>>>>>>>>>>> >> > > idempotency guarantees, and currently we don't have a way to notify
>>>>>>>>>>>> >> > > clients about losing them. Besides, the least recently used entries
>>>>>>>>>>>> >> > > that get removed are not always from the "bad" clients.
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > 4. Allow clients to "close" the producer ID usage
>>>>>>>>>>>> >> > > -> We can provide a way for a producer to "close" its producer ID
>>>>>>>>>>>> >> > > usage. Currently, we only have the INIT_PRODUCER_ID request to
>>>>>>>>>>>> >> > > allocate one. After that, we keep the producer ID metadata in the
>>>>>>>>>>>> >> > > broker even if the producer is "closed". With a close API (e.g.
>>>>>>>>>>>> >> > > END_PRODUCER_ID), we can remove the entry on the broker side. On
>>>>>>>>>>>> >> > > the client side, we can send it when the producer is closing. The
>>>>>>>>>>>> >> > > concern is that old clients (including non-Java clients) will still
>>>>>>>>>>>> >> > > suffer from the OOM issue.
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > 5. Limit/throttle the producer ID based on the principal
>>>>>>>>>>>> >> > > -> Although we can limit the impact to a certain principal with
>>>>>>>>>>>> >> > > this idea, the same concern still exists as in solutions #1 and #2.
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > Any thoughts/feedback are welcomed.
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> > > Thank you.
>>>>>>>>>>>> >> > > Luke
>>>>>>>>>>>> >> > >
>>>>>>>>>>>> >> >
>>>>>>>>>>>> >>
>>>>>>>>>>>> >
