Hi Luke and Justine. There are a few updates on KIP-936 https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs to introduce throttling of PIDs per user, and I would love to hear your feedback in the discussion thread https://lists.apache.org/thread/nxp395zmvc0s8r4ohg91kdb19dxsbxlt if you have time.

Thanks,
Omnia
On 6 Jun 2023, at 15:04, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Thanks, Luke, for the feedback.

> 1. How do we store values in the bloom filter? It's unclear from this KIP what we store inside the bloom filter and how we throttle them. My understanding is: we have a map with key = KafkaPrincipal and value = PID for each bloom filter. When a new PID is created for userA, we update the map to add the PID into the cached value (i.e. the bloom filter). When the window passes half of its time, we create another bloom filter, and this time, when a new PID comes, we check whether this PID existed in the previous bloom filter; if not, we add it into the new bloom filter. In the meantime, we track the "newly created" count (filtered by the previous bloom filter) for throttling the users. Is my understanding correct?

Not quite what I am proposing. I'm proposing a cache layer that is used only for checking whether we have encountered a PID before for a given KafkaPrincipal, not as a counter. If the cache layer doesn't contain the PID, I'll increment the metric sensor using Sensor::record (the sensor will be created during the initial interaction with this KafkaPrincipal). Sensor::record fails with QuotaViolationException when we reach the max of the sensor. If incrementing the sensor didn't fail with QuotaViolationException, then I'll add the PID to the cache for the next time. To achieve this, I'm proposing that the cache layer be represented as "cachedMap = Map<KafkaPrincipal, Map<Timestamp, SimpleBloomFilter<PIDs>>>". I wrapped "Map<Timestamp, SimpleBloomFilter<PIDs>>" into "TimedControlledBloomFilter", where we decide which bloom filter to write to, which bloom filter to delete, etc.

1. When we encounter a producer for the first time, the new quota manager will create the sensor and update its value. Then it will update the cache with this PID for the next time, creating an entry for the user in the cachedMap. The cache will look like this:

Map { "UserA" -> TimedControlledBloomFilter {
        bloom_filter_1_create_timestamp -> bloom_filter_1
      }
}

and the PID will be added to bloom_filter_1.
2. If the producer tries to produce with the same PID the next time, the quota manager will not update the sensor or the cache, and will not throttle the user.
3. However, if the producer tries to produce with a new PID, it will be added to bloom_filter_1 as long as we are within the first half of producer.id.quota.window.size.seconds.
4. If the producer sends any new PIDs after the first half of producer.id.quota.window.size.seconds, we will create a new bloom filter to store the PIDs from the second half of the window. The cache will look like this:

Map { "UserA" -> TimedControlledBloomFilter {
        bloom_filter_1_create_timestamp -> bloom_filter_1
        bloom_filter_2_create_timestamp -> bloom_filter_2
      }
}

All PIDs from this point until the end of the window will be added to bloom_filter_2, and both bloom_filter_1 and bloom_filter_2 will be used to check whether we have come across a PID before.
5. A scheduler will run in the background to automatically delete from the TimedControlledBloomFilter any bloom filter whose create_timestamp is older than producer.id.quota.window.size.seconds ago.
6. If the user stops producing and all of its bloom filters get deleted by the scheduler, the user's entry is removed from the cachedMap.

I updated the KIP to add more clarification to this caching layer.
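To make the shape of this concrete, here is a minimal Scala sketch of the timed filter, assuming Guava's BloomFilter stands in for SimpleBloomFilter; the class and method names here are illustrative, not the final API in the KIP:

import java.util.concurrent.ConcurrentSkipListMap
import com.google.common.hash.{BloomFilter, Funnels}
import scala.jdk.CollectionConverters._

// One instance per KafkaPrincipal. At most two filters are live at a time, each
// covering one half of producer.id.quota.window.size.seconds, so a PID stays
// "remembered" for a full window after it was last written.
class TimedControlledBloomFilter(windowSizeMs: Long, expectedPids: Int = 10000) {
  // create-timestamp (start of a half-window) -> bloom filter for that half-window
  private val filters = new ConcurrentSkipListMap[Long, BloomFilter[java.lang.Long]]()

  /** True if any live filter has (probably) seen this PID before. */
  def mightContain(pid: Long): Boolean =
    filters.values.asScala.exists(_.mightContain(pid))

  /** Record a PID in the filter owning the current half-window, creating it if needed. */
  def put(pid: Long, nowMs: Long): Unit = {
    val halfWindowMs = windowSizeMs / 2
    val slotStart = nowMs - (nowMs % halfWindowMs) // start of the current half-window
    filters.computeIfAbsent(slotStart,
      _ => BloomFilter.create[java.lang.Long](Funnels.longFunnel(), expectedPids)
    ).put(pid)
  }

  /** Run by the background scheduler: drop filters created more than a window ago. */
  def expireOldFilters(nowMs: Long): Unit =
    filters.headMap(nowMs - windowSizeMs).clear()

  /** When this is true, the scheduler can drop the user's entry from the cachedMap. */
  def isEmpty: Boolean = filters.isEmpty
}

The quota manager would keep one of these per KafkaPrincipal inside the cachedMap, consult mightContain before charging the sensor, and remove the user's entry once isEmpty returns true.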
> 2. What will the user get when they can't allocate a new PID due to throttling?

The client will get `QuotaViolationException`, similar to ClientQuotaManager: https://cwiki.apache.org/confluence/display/KAFKA/KIP-936:+Throttle+number+of+active+PIDs#KIP936:ThrottlenumberofactivePIDs-ClientErrors

> 3. This config producer.id.quota.window.num is unclear to me?

`quota.window.num` is a standard config across all Kafka quota types. I am re-using the same description and default value as the existing Kafka codebase (I am not a fan of the description, as it's not clear). The "samples" here refer to the sliding time window: Kafka quotas keep 10 samples in memory plus the current window, so 11 in total.

> Finally, I think this KIP is good to have an official KIP discuss thread for community review.

I will open the official KIP discussion today.
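For reference, here is roughly how the sensor side could be wired together with the cache from the sketch above, using the existing org.apache.kafka.common.metrics API; the sensor/metric names and the numbers are examples, not what the KIP specifies:

import java.util.concurrent.TimeUnit
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, QuotaViolationException}
import org.apache.kafka.common.metrics.stats.Rate

val metrics = new Metrics()
val config = new MetricConfig()
  .samples(11)                        // producer.id.quota.window.num: 10 retained samples + the current one
  .timeWindow(3600, TimeUnit.SECONDS) // producer.id.quota.window.size.seconds (example value)
  .quota(Quota.upperBound(100))       // example bound for this user's PID sensor

val pidSensor = metrics.sensor("ProducerIdRate:UserA")
pidSensor.add(
  metrics.metricName("producer-id-rate", "ProducerIds", "rate of new PIDs seen for UserA"),
  new Rate(), config)

// The check described above: only PIDs the cache has never seen are charged to the quota.
def checkNewPid(cache: TimedControlledBloomFilter, pid: Long, nowMs: Long): Boolean =
  if (cache.mightContain(pid)) true // encountered before: no sensor update, no throttling
  else try {
    pidSensor.record(1.0, nowMs)    // throws QuotaViolationException once the quota is reached
    cache.put(pid, nowMs)           // remember the PID only if the user wasn't throttled
    true
  } catch {
    case _: QuotaViolationException => false // surfaced to the client as a quota violation
  }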
Thanks,
Omnia

On Tue, Jun 6, 2023 at 10:19 AM Luke Chen <show...@apache.org> wrote:

Hi Omnia,

I finally got time to check this KIP. Thanks for putting all this together.

Questions:
1. How do we store values in the bloom filter? It's unclear from this KIP what we store inside the bloom filter and how we throttle them. My understanding is: we have a map with key = KafkaPrincipal and value = PID for each bloom filter. When a new PID is created for userA, we update the map to add the PID into the cached value (i.e. the bloom filter). When the window passes half of its time, we create another bloom filter, and this time, when a new PID comes, we check whether this PID existed in the previous bloom filter; if not, we add it into the new bloom filter. In the meantime, we track the "newly created" count (filtered by the previous bloom filter) for throttling the users. Is my understanding correct?

2. What will the user get when they can't allocate a new PID due to throttling? You might need to address this in the KIP.

3. This config producer.id.quota.window.num is unclear to me: "The number of samples to retain in memory for alter producer id quotas". What do the "samples" mean here? Do you mean we only track the top 11 KafkaPrincipal usages each window?

Finally, I think this KIP is good to have an official KIP discuss thread for community review. Thanks for the KIP!

Luke

On Mon, Jun 5, 2023 at 11:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Justine, thanks for having a look.

> One question I have is how will we handle a scenario where potentially each new client has a new Kafka Principal? Is that simply not covered by throttling?

If any new client sets up a new principal, it will be throttled based on the throttling for `/config/users/<default>` or `/config/users/<the_new_principal>`.

On Wed, May 31, 2023 at 6:50 PM Justine Olshan <jols...@confluent.io> wrote:

Hey Omnia,

I was doing a bit of snooping (I actually just get updates for the KIP page) and I saw this draft was in progress. I shared it with some of my colleagues as well, who I had previously discussed the issue with.

The initial look was pretty promising to me. I appreciate the detailing of the rejected options, since we had quite a few we worked through :)

One question I have is how will we handle a scenario where potentially each new client has a new Kafka Principal? Is that simply not covered by throttling?

Thanks,
Justine

On Wed, May 31, 2023 at 10:08 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Justine and Luke,

I started a KIP draft here https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs for a proposal, and would appreciate it if you could provide any initial feedback before opening a broader discussion.

Thanks

On Wed, Feb 22, 2023 at 4:35 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Justine,
My initial thought with throttling initProducerId was to rip the problem off at the source (the thing that creates too many PIDs per client) and fail faster, but if having this at the produce-request level is easier, that should be fine. I am guessing it will be the same direction as ClientQuotaManager for produce throttling, with a different quota window than `quota.window.size.seconds`.

If this is good as an initial solution, I can start a KIP and see what the wider community feels about this.

Also, I noticed that at some point one of us hit "Reply" instead of "Reply to All" :) So here are the previous conversations:

On Wed, Feb 15, 2023 at 12:20 AM Justine Olshan <jols...@confluent.io> wrote:

Hey Omnia,

Thanks for the response. I think I understand your explanations here with respect to principal and clientId usage.

For the throttling -- handleInitProducerIdRequest will allocate the ID to the producer, but we don't actually store it on the broker or increment our metric until the first produce request for that producer is sent (or sent again after previously expiring). Would you consider throttling the produce request instead? It may be hard to get any metrics from the transaction coordinator, where the initProducerId request is handled.

Justine
On Tue, Feb 14, 2023 at 9:29 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hey Justine,

> If I understand your message correctly, there are issues with identifying the source of the rogue clients? So you propose to add a new metric for that? And also proposing to throttle based on clientId as a potential follow-up?

I want to identify rogue clients by KafkaPrincipal (and/or clientId), similarly to how we identify clients in the Fetch/Produce/Request quota managers. Using KafkaPrincipal should give cluster admins the ability to throttle later based on principal, which is most likely a smaller set than clientIds. My initial thought was to add a metric that represents how many InitProducerIdRequests are sent by each KafkaPrincipal (and/or clientId), similar to the Fetch/Produce quota managers. Then, as a follow-up, we can throttle based on either KafkaPrincipal or clientId (and maybe default as well, to align this with the other quota managers in Kafka).

> 1. Do we rely on the client using the same ID? What if there are many clients that all use different client IDs?

This is why I want to use the combination of KafkaPrincipal and clientId, similar to some other quotas we already have in Kafka. This carries a similar risk to the Fetch/Produce quotas in Kafka, which also rely on the client using the same clientId and KafkaPrincipal.

> 2. Are there places where high cardinality of this metric is a concern? I can imagine many client IDs in the system. Would we treat this as a rate metric (i.e., when we get an init producer ID and return a new producer ID, we emit a count for that client id)? Or something else?

My initial thought here was to follow the steps of ClientQuotaManager and ClientRequestQuotaManager and use a rate metric. We could emit it either:
- when we return the new PID. However, I have concerns that we may circle back to the previous OOM concerns, due to keeping track of ACTIVE PIDs per KafkaPrincipal (and/or clientId) in the future. Also, this would be the first time Kafka throttles IDs for any client.
- or once we receive the InitProducerIdRequest, throttling before even hitting `handleInitProducerIdRequest`. Going in this direction, we may need to throttle within a different quota window than `quota.window.size.seconds`, as throttling INIT_PRODUCER_ID requests per second wouldn't be efficient for most cases. I personally think this direction is easier, as it seems more consistent with the existing quota implementation, especially since Kafka already has the concept of throttling a subset of requests (in ControllerMutationQuotaManager) but has never had any concept of throttling active IDs for any client.

What do you think?
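As a rough sketch of that second option (assumed names; the backoff formula just mirrors the shape existing quota managers use to turn a violation into a throttle time):

import org.apache.kafka.common.metrics.{QuotaViolationException, Sensor}

// Throttle at INIT_PRODUCER_ID time, before handleInitProducerIdRequest runs.
// Returns 0 when the request may proceed, otherwise a throttle time in ms.
def maybeThrottlePidAllocation(sensor: Sensor, wideWindowMs: Long): Long =
  try {
    sensor.record(1.0) // one InitProducerIdRequest for this principal/clientId
    0L                 // under quota: handle the request immediately
  } catch {
    case e: QuotaViolationException =>
      // back off long enough for the measured rate to fall back to the bound
      math.round(((e.value - e.bound) / e.bound) * wideWindowMs)
  }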
Thanks,
Omnia

On Thu, Feb 2, 2023 at 5:39 PM Justine Olshan <jols...@confluent.io> wrote:

Hey Omnia,
Sorry for losing track of this.

If I understand your message correctly, there are issues with identifying the source of the rogue clients? So you propose to add a new metric for that? And also proposing to throttle based on clientId as a potential follow-up?

I think both of these make sense. The only things I can think of for the metric are:
1. Do we rely on the client using the same ID? What if there are many clients that all use different client IDs?
2. Are there places where high cardinality of this metric is a concern? I can imagine many client IDs in the system. Would we treat this as a rate metric (i.e., when we get an init producer ID and return a new producer ID, we emit a count for that client id)? Or something else?

Thanks,
Justine

On Thu, Feb 2, 2023 at 4:44 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Luke and Justine,
Are there any thoughts or updates on this? I would love to help with this, as we are hitting it more frequently now.

best,

On Mon, Oct 31, 2022 at 6:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Luke and Justine,

> For (3), you said:
> > - I have some concerns about the impact of this option on the transactional producers; for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
> > - How will we notify the client that the transaction can't continue due to an expired PID?
> > - If a PID got marked as `expired`, this will mean that `admin.DescribeProducers` will not list it, which will make `kafka-transactions.sh --list` a bit tricky, as we can't identify whether there are transactions linked to this expired PID or not. The same concern applies to `kafka-transactions.sh --find-hanging`.
> --> Yes, you're right. Those are also concerns for this solution. Currently, there's no way to notify clients about the expiration. Also, the ongoing transactions will be hanging. For the admin cli, we've never thought about it. Good point. In summary, to adopt this solution, there are many issues that need to get fixed.

Justine already clarified that if a PID is attached to a transaction it will not expire, so identifying the transactions shouldn't be a concern anymore. The only concern here is that this solution will not solve the problem if the rogue producer is a transactional producer with hanging transactions. Anyone facing this situation will need to abort the hanging transactions manually, and then the solution of expiring a PID can work.

> --> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear). Yes, we were thinking about throttling by KafkaPrincipal. ClientId is also workable. It's just that these 2 attributes are not required. That is, it's possible we take all clients as the same one: {default KafkaPrincipal + default clientID}, and apply throttling on it. Do you have any thoughts about it? Maybe skip throttling for {default KafkaPrincipal + default clientID}?

Throttling for the default KafkaPrincipal and default clientID is useful when we need a hard limit on the whole cluster and whoever runs the cluster doesn't know the clientIDs, or when a KafkaPrincipal is reused between different producer applications. I usually find it helpful to have a way to apply throttling only to the rogue clients once I identify them, without punishing everyone on the cluster. However, there are two problems with this:
- There's no easy way at the moment to link PIDs to a clientId or KafkaPrincipal. This needs to be addressed first.
- Justine's comment on the throttling: it would mean we either block all requests or have to store the request in memory, which in both cases has downsides for the producer experience.
> I recently had another discussion with my team and it does seem like there should be a way to make it more clear to the clients what is going on. A lot of this protocol is implicit. I'm wondering if maybe there is a way to improve the story for newer clients. (i.e., if we choose to expire based on a size limit, we should include a response indicating the ID has expired.) We also discussed ways to redefine the guarantees so that users who have stronger idempotency requirements can ensure them (over availability/memory concerns). Let me know if you have any ideas here.

It may be easier to improve the experience for new clients. However, if we improve only the new clients, we may need a way to help teams who run Kafka with rogue clients on old versions, at least by giving them an easy way to identify the clientId or KafkaPrincipal that generated these PIDs.

For context, it's very tricky to even identify which clientId is creating all the PIDs that caused OOM, which is a contributing part of the issue at the moment. So maybe one option here could be adding a new metric that tracks the number of generated PIDs per clientId. This will help the team that runs the Kafka cluster to:
- contact these rogue clients and ask them to fix their clients, or to upgrade to a new client if the new client version has a better experience;
- or, if we end up with a throttling solution, identify which clientId needs to be throttled.

Maybe we can start with a solution for identifying the rogue clients first and keep looking for a solution to limit them. What do you think?
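Something as small as this would be enough for the identification metric (a sketch only; the group/name are placeholders, and the client-id tag keeps cardinality bounded by the number of distinct clientIds):

import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.CumulativeCount
import scala.jdk.CollectionConverters._

// Count PIDs generated per clientId so operators can find the client behind PID growth.
def recordPidCreated(metrics: Metrics, clientId: String): Unit = {
  val sensorName = s"GeneratedPids:$clientId"
  val sensor = Option(metrics.getSensor(sensorName)).getOrElse {
    val s = metrics.sensor(sensorName)
    s.add(
      metrics.metricName("generated-pid-count", "ProducerIds",
        "total number of PIDs generated for this client", Map("client-id" -> clientId).asJava),
      new CumulativeCount())
    s
  }
  sensor.record() // one newly generated PID
}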
Thanks

On Tue, Oct 18, 2022 at 5:24 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Oops. I realized I just replied to Omnia 🤦♀️

Here was my response for the mailing thread:

Hey Omnia,
Sorry to hear this is a problem for you as well. :(

> I have some concerns about the impact of this option on the transactional producers; for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?

We currently check if a transaction is ongoing and do not expire the producer ID if it has an ongoing transaction. I suspect we will continue to do this with any solution we pick.

My team members and I looked a bit into the throttling case, and it can get a bit tricky, since it means we need to throttle the produce request before it is processed. This means we either block all requests or have to store the request in memory (which is not great if we are trying to save memory).

I recently had another discussion with my team and it does seem like there should be a way to make it more clear to the clients what is going on. A lot of this protocol is implicit. I'm wondering if maybe there is a way to improve the story for newer clients. (i.e., if we choose to expire based on a size limit, we should include a response indicating the ID has expired.) We also discussed ways to redefine the guarantees so that users who have stronger idempotency requirements can ensure them (over availability/memory concerns). Let me know if you have any ideas here.

Thanks again for commenting here; hopefully we can come to a good solution.

On Tue, Oct 18, 2022 at 1:11 AM Luke Chen <show...@gmail.com> wrote:

Hi Omnia,

Thanks for your reply.

For (3), you said:
> - I have some concerns about the impact of this option on the transactional producers; for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?
> - How will we notify the client that the transaction can't continue due to an expired PID?
> - If a PID got marked as `expired`, this will mean that `admin.DescribeProducers` will not list it, which will make `kafka-transactions.sh --list` a bit tricky, as we can't identify whether there are transactions linked to this expired PID or not. The same concern applies to `kafka-transactions.sh --find-hanging`.

--> Yes, you're right. Those are also concerns for this solution. Currently, there's no way to notify clients about the expiration. Also, the ongoing transactions will be hanging. For the admin cli, we've never thought about it. Good point. In summary, to adopt this solution, there are many issues that need to get fixed.

For (5), you said:
> I am assuming you mean KafkaPrincipal here! If so, is your concern that good clients that use the same principal as a rogue one will get throttled? If this is the case, then I believe it should be okay, as other throttling in Kafka on `/config/users/<user>` has the same behaviour. What about applying the limit/throttling to `/config/users/<user>/clients/<client-id>`, similar to what we have with client quotas? This should reduce the concern about throttling good clients, right?

--> Yes, I mean KafkaPrincipal (sorry, I didn't make it clear). Yes, we were thinking about throttling by KafkaPrincipal. ClientId is also workable. It's just that these 2 attributes are not required. That is, it's possible we take all clients as the same one: {default KafkaPrincipal + default clientID}, and apply throttling on it. Do you have any thoughts about it? Maybe skip throttling for {default KafkaPrincipal + default clientID}?

Luke
On Sat, Oct 15, 2022 at 2:33 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Luke & Justine,
Thanks for looking into this issue; we have been experiencing it because of rogue clients as well.

> 3. Having a limit to the number of active producer IDs (sort of like an LRU cache)
> -> The idea here is that if we hit a misconfigured client, we will expire the older entries. The concern here is we have risks to lose idempotency guarantees, and currently, we don't have a way to notify clients about losing idempotency guarantees. Besides, the least recently used entries that get removed are not always from the "bad" clients.

- I have some concerns about the impact of this option on the transactional producers; for example, what will happen to an ongoing transaction associated with an expired PID? Would this leave the transactions in a "hanging" state?

- How will we notify the client that the transaction can't continue due to an expired PID?

- If a PID got marked as `expired`, this will mean that `admin.DescribeProducers` will not list it, which will make `kafka-transactions.sh --list` a bit tricky, as we can't identify whether there are transactions linked to this expired PID or not. The same concern applies to `kafka-transactions.sh --find-hanging`.

> 5. limit/throttling the producer id based on the principle
> -> Although we can limit the impact to a certain principle with this idea, the same concern still exists as for solutions #1 and #2.

I am assuming you mean KafkaPrincipal here! If so, is your concern that good clients that use the same principal as a rogue one will get throttled?

If this is the case, then I believe it should be okay, as other throttling in Kafka on `/config/users/<user>` has the same behaviour.

What about applying the limit/throttling to `/config/users/<user>/clients/<client-id>`, similar to what we have with client quotas? This should reduce the concern about throttling good clients, right?
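To spell out what I mean, the lookup could fall back through the same entity hierarchy client quotas use today (a sketch with an assumed loadOverride helper that reads one /config path; the real client-quota resolution order has more levels than shown here):

// Resolve the PID limit for a (user, client-id) pair, most specific entity first.
def resolvePidQuota(user: String, clientId: String,
                    loadOverride: String => Option[Double]): Option[Double] =
  loadOverride(s"/config/users/$user/clients/$clientId")
    .orElse(loadOverride(s"/config/users/$user/clients/<default>"))
    .orElse(loadOverride(s"/config/users/$user"))
    .orElse(loadOverride("/config/users/<default>"))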
best,

Omnia

On Tue, Oct 11, 2022 at 4:18 AM Luke Chen <show...@gmail.com> wrote:

Bump this thread to see if there are any comments/thoughts.
Thanks.

Luke

On Mon, Sep 26, 2022 at 11:06 AM Luke Chen <show...@gmail.com> wrote:

Hi devs,

As stated in the motivation section of KIP-854 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry>:

With idempotent producers becoming the default in Kafka, this means that, unless otherwise specified, all new producers will be given producer IDs. Some (inefficient) applications may now create many non-transactional idempotent producers. Each of these producers will be assigned a producer ID, and these IDs and their metadata are stored in broker memory, which might cause brokers to run out of memory.

Justine (in cc.) and I, and some other team members, are working on solutions for this issue, but none of them solves it completely without side effects. Among them, "availability" vs. "idempotency guarantees" is the trade-off we can't decide which side of to sacrifice: some of these solutions sacrifice availability of produce (1, 2, 5) and others sacrifice idempotency guarantees (3). It could be useful to know if people generally have a preference one way or the other, or what other better solutions there might be.

Here are the proposals we came up with:

1. Limit the total active producer ID allocation number.
-> This is the simplest solution, but since the OOM issue is usually caused by a rogue or misconfigured client, this solution might "punish" the good clients and stop them from sending messages.

2. Throttle the producer ID allocation rate.
-> Same concern as solution #1.

3. Have a limit on the number of active producer IDs (sort of like an LRU cache; a sketch follows at the end of this message).
-> The idea here is that if we hit a misconfigured client, we will expire the older entries. The concern here is that we risk losing idempotency guarantees, and currently we don't have a way to notify clients about losing idempotency guarantees. Besides, the least recently used entries that get removed are not always from the "bad" clients.
4. Allow clients to "close" the producer ID usage.
-> We can provide a way for a producer to "close" producer ID usage. Currently, we only have the INIT_PRODUCER_ID request to allocate one; after that, we keep the producer ID metadata in the broker even if the producer is "closed". With a close API (e.g., END_PRODUCER_ID), we can remove the entry from the broker side, and on the client side we can send it when the producer closes. The concern is that old clients (including non-Java clients) will still suffer from the OOM issue.

5. Limit/throttle the producer ID based on the principal.
-> Although we can limit the impact to a certain principal with this idea, the same concern still exists as for solutions #1 and #2.

Any thoughts/feedback are welcomed.

Thank you.
Luke
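For proposal 3, a minimal sketch of the LRU idea referenced above (illustrative only; java.util.LinkedHashMap in access order stands in for whatever structure the broker would actually use). It also makes the stated downside visible: eviction removes whichever PID was used least recently, whether it belongs to the rogue client or to a well-behaved one, and that producer silently loses its idempotency guarantee:

import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Caps the number of active PID entries; accessOrder = true gives LRU semantics.
class ProducerIdLruCache[V](maxActivePids: Int)
    extends JLinkedHashMap[java.lang.Long, V](16, 0.75f, true) {
  // Called by LinkedHashMap after each insert; returning true evicts the eldest
  // (least recently used) entry -- not necessarily one from the "bad" client.
  override protected def removeEldestEntry(eldest: JMap.Entry[java.lang.Long, V]): Boolean =
    size() > maxActivePids
}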