Hi, just bringing some offline discussion and recent updates to the KIP
here to the mailing list.

Claude updated the KIP to use LayeredBloomFilter from Apache Commons
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayeredBloomFilter.html
for the caching layer instead of implementing TimeBasedBloomFilter from
scratch. Claude and I had some discussion regarding the configurations
that impact the bloom filter:

- The KIP originally proposed to split each window in half; this has now
been updated to use `producer.id.quota.cache.layer.count`, which
controls how many layers each window will have. The default is 4 layers.
I updated the Rejected Alternatives section to reflect why the 2-layer
bloom filter has been rejected.
- The KIP originally proposed to start a new bloom layer every
producer.id.quota.window.size.seconds/2; now this will be triggered
every producer.id.quota.window.size.seconds/producer.id.quota.cache.layer.count.
I updated the diagram in the KIP to reflect this.
- The producer_ids_rate per user will drive the maximum number of PIDs
in Shape
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/Shape.html
instead of introducing a global config for the max size of Shape, as
that goes against the other rejected alternatives. Now Shape can be
`Shape.fromNP(producer_ids_rate, producer.id.quota.cache.false.positive.rate)`.
- The KIP now proposes another config to control the bloom filter's
false positive rate, producer.id.quota.cache.false.positive.rate. By
default this is set to 0.001 instead of being hardcoded.
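To give a concrete feel for what `Shape.fromNP(producer_ids_rate,
producer.id.quota.cache.false.positive.rate)` produces, here is a small
stdlib-only sketch of the standard optimal Bloom filter sizing math,
which is what I understand Shape.fromNP to compute internally. The rate
of 100 PIDs per user is purely illustrative, not a KIP default:

```java
public class ShapeSketch {
    /** Optimal bit count for n items at false positive rate p:
     *  m = ceil(-n * ln(p) / (ln 2)^2). */
    static int numberOfBits(int n, double p) {
        return (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /** Optimal hash function count: k = round(m / n * ln 2). */
    static int numberOfHashFunctions(int m, int n) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        int producerIdsRate = 100;  // hypothetical per-user quota, drives N
        double fpr = 0.001;         // proposed config default for P
        int m = numberOfBits(producerIdsRate, fpr);
        int k = numberOfHashFunctions(m, producerIdsRate);
        System.out.println(m + " bits (" + (m + 7) / 8 + " bytes), "
                + k + " hash functions");
    }
}
```

With the 0.001 default and 100 PIDs this works out to 1438 bits (about
180 bytes) and 10 hash functions per layer, so the per-principal memory
cost scales with the quota rather than with a global config.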
I think the KIP is now in better shape to ask others for a review.
Omnia

> On 24 Apr 2024, at 14:48, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>
> Hi Claude, sorry that it took me a while to respond. I finally had time
> to look into your implementation here
> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java#L121
> and so far it makes sense.
>
>> So an early PID is added to the first filter and the associated metric
>> is updated.
>> That PID is seen multiple times over the next 60 minutes, but is not
>> added to the Bloom filters again.
>> Once the 60 minutes elapse, the first filter is cleared or removed and
>> a new one started. In any case the PID is no longer recorded in any
>> extant Bloom filter.
>> The PID is seen again and is added to the newest Bloom filter and the
>> associated metric is updated.
>>
>> I believe at this point the metric is incorrect: the PID has been
>> counted 2x, when it has been in use for the entire time.
>
> You are right! My original design with a sliding-window bloom filter
> carries this risk of duplication. I had a look into your track method
> and it makes sense.
>
>> 1. p.i.q.window.size.seconds: the length of time that a window will
>> exist. This is also the maximum time between PID uses where the PID is
>> considered to be the same. Reuse of the PID after
>> p.i.q.window.size.seconds triggers recording the PID as a new PID.
>> Define a new configuration option "producer.id.quota.window.count" as
>> the number of windows active in window.size.seconds.
>
> This is correct.
>
>> 3. p.i.q.window.num, specified as 11 in the KIP. I thought this was
>> how many PIDs were expected in each window. In the original KIP this
>> means that we expect to see the PID 22 times (2 windows x 11). In the
>> Layered Bloom filter this would be the N value for the Shape.
> producer.id.quota.window.num is how many metric samples we retain, not
> how many PIDs we store in memory.
>
>> 2. p.i.q.window.count (the new one): how many sections to break
>> p.i.q.window.size.seconds into. In the initial KIP this is 2. This
>> value gives us the timing for creating a new layer in the layered
>> bloom filter. So every (p.i.q.window.size.seconds /
>> p.i.q.window.count) seconds a new layer will be created and an old
>> layer or layers will be removed. The layered bloom filter will add
>> layers to keep the probability of false positives in range.
>> 4. p.i.q.window.fpr (needs to be specified): the expected false
>> positive rate. Not sure how to express this in the config in a way
>> that makes sense, but something like 0.000006 or the like. This is
>> the P value for the Shape. See https://hur.st/bloomfilter for a Bloom
>> filter calculator.
>
> For these two we might need to introduce them. How would they interact
> with each other in the layered bloom filter?
>
> Thanks
>
>> On 16 Apr 2024, at 08:00, Claude Warren <cla...@xenei.com> wrote:
>>
>> The difference between p.i.q.window.count and p.i.q.window.num:
>>
>> To be honest, I may have misunderstood your definition of window num.
>> But here is what I have in mind:
>>
>> 1. p.i.q.window.size.seconds: the length of time that a window will
>> exist. This is also the maximum time between PID uses where the PID is
>> considered to be the same. Reuse of the PID after
>> p.i.q.window.size.seconds triggers recording the PID as a new PID.
>> Define a new configuration option "producer.id.quota.window.count" as
>> the number of windows active in window.size.seconds.
>> 2. p.i.q.window.count (the new one): how many sections to break
>> p.i.q.window.size.seconds into. In the initial KIP this is 2. This
>> value gives us the timing for creating a new layer in the layered
>> bloom filter.
>> So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a
>> new layer will be created and an old layer or layers will be removed.
>> The layered bloom filter will add layers to keep the probability of
>> false positives in range.
>> 3. p.i.q.window.num, specified as 11 in the KIP. I thought this was
>> how many PIDs were expected in each window. In the original KIP this
>> means that we expect to see the PID 22 times (2 windows x 11). In the
>> Layered Bloom filter this would be the N value for the Shape.
>> 4. p.i.q.window.fpr (needs to be specified): the expected false
>> positive rate. Not sure how to express this in the config in a way
>> that makes sense, but something like 0.000006 or the like. This is
>> the P value for the Shape. See https://hur.st/bloomfilter for a Bloom
>> filter calculator.
>>
>> Once we have the N and P for the Shape, the shape can be instantiated
>> as "Shape s = Shape.fromNP( int n, double p );"
>>
>> In the layered filter, once N items have been added to a layer a new
>> layer is created. Layers are removed after p.i.q.window.size.seconds,
>> so if there is a burst of PIDs the number of layers will expand and
>> then shrink back as the PIDs expire. While running there is always at
>> least 1 layer.
>>
>> Some calculated points:
>>
>> - No layer will span more than p.i.q.window.size.seconds /
>> p.i.q.window.count seconds.
>> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60,
>> p.i.q.window.count = 2, and a rate of 22 PIDs per minute, there will
>> be 2 layers.
>> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60,
>> p.i.q.window.count = 2, and a rate of 10 PIDs per minute, there will
>> be 2 layers.
>> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60,
>> p.i.q.window.count = 2, and that no PIDs have been seen in the last
>> 30 seconds, there will be 2 layers.
>> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60,
>> p.i.q.window.count = 2, and that no PIDs have been seen in the last
>> 60 seconds, there will be 1 layer.
>> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60,
>> p.i.q.window.count = 2, and a rate of 23 to 44 PIDs per minute, there
>> will be 4 layers.
>> - The false positive rate across the layers remains at or below
>> Shape.P.
>> - Assuming Shape.N = 11 and Shape.P = 0.000006, the Bloom filter at
>> each layer will consume 35 bytes. https://hur.st/bloomfilter provides
>> a quick calculator for other values.
>>
>> Claude
>>
>> On Tue, Apr 16, 2024 at 8:06 AM Claude Warren <cla...@xenei.com> wrote:
>>
>>> Let's put aside the CPC datasketch idea and just discuss the Bloom
>>> filter approach.
>>>
>>> I think the problem with the way the KIP is worded is that PIDs are
>>> only added if they are not seen in either of the Bloom filters.
>>>
>>> So an early PID is added to the first filter and the associated
>>> metric is updated.
>>> That PID is seen multiple times over the next 60 minutes, but is not
>>> added to the Bloom filters again.
>>> Once the 60 minutes elapse, the first filter is cleared or removed
>>> and a new one started. In any case the PID is no longer recorded in
>>> any extant Bloom filter.
>>> The PID is seen again and is added to the newest Bloom filter and
>>> the associated metric is updated.
>>>
>>> I believe at this point the metric is incorrect: the PID has been
>>> counted 2x, when it has been in use for the entire time.
>>>
>>> The "track" method that I added solves this problem by ensuring that
>>> the PID is always seen in the latter half of the set of Bloom
>>> filters. In the case of 2 filters that is always the second one, but
>>> remember that the number of layers will grow as the filters become
>>> saturated. So if your filter is intended to hold 500 PIDs and the
>>> 501st PID is registered before the expiration, a new layer (Bloom
>>> filter) is added for new PIDs to be added into.
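The 35-byte figure quoted above can be sanity-checked with the standard
optimal Bloom sizing formula m = ceil(-n * ln(p) / (ln 2)^2); this
stdlib-only sketch just reproduces the arithmetic, not the actual
commons-collections Shape code:

```java
public class LayerSize {
    /** Standard optimal Bloom sizing: m = ceil(-n * ln(p) / (ln 2)^2). */
    static int bits(int n, double p) {
        return (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    public static void main(String[] args) {
        int m = bits(11, 0.000006);  // Shape.N = 11, Shape.P = 0.000006
        System.out.println(m + " bits = " + (m + 7) / 8 + " bytes per layer");
    }
}
```

This yields 276 bits, i.e. 35 bytes per layer, matching the figure in
the message above.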
>>>
>>> On Mon, Apr 15, 2024 at 5:00 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>>
>>>> Hi Claude,
>>>> Thanks for the implementation of the LayeredBloomFilter in Apache
>>>> Commons.
>>>>
>>>>> Define a new configuration option "producer.id.quota.window.count"
>>>>> as the number of windows active in window.size.seconds.
>>>>
>>>> What is the difference between "producer.id.quota.window.count" and
>>>> producer.id.quota.window.num?
>>>>
>>>>> Basically the KIP says, if the PID is found in either of the Bloom
>>>>> filters then no action is taken.
>>>>> If the PID is not found then it is added and the quota rating
>>>>> metrics are incremented.
>>>>> In this case long-running PIDs will be counted multiple times.
>>>>
>>>> The PID is considered not encountered if both frames of the window
>>>> don't have it. If you check the diagram for "Caching layer to track
>>>> active PIDs per KafkaPrincipal" you will see that each window will
>>>> have 2 bloom layers, and the first created one will be disposed of
>>>> only when we start the next window. This means window2 starts from
>>>> the 2nd bloom filter. Basically the bloom filter in the KIP is
>>>> trying to implement a sliding window pattern.
>>>>
>>>>> I think the question is not whether or not we have seen a given PID
>>>>> before but rather how many unique PIDs did the principal create in
>>>>> the last hour. Perhaps more exactly it is: did the Principal create
>>>>> more than X PIDs in the last Y time units?
>>>>
>>>> We don't really care about the count of unique PIDs per user. The
>>>> KIP is trying to follow and build on top of ClientQuotaManager,
>>>> which already has a pattern for throttling that the producer client
>>>> is aware of, so we don't need to upgrade old clients for brokers to
>>>> throttle them and they can respect the throttling.
>>>>
>>>> The pattern for throttling is that we record the activities by
>>>> incrementing a metric sensor, and only when we catch
>>>> `QuotaViolationException` from the quota sensor do we send a
>>>> throttleTimeMs to the client.
>>>> For bandwidth throttling, for example, we increment the sensor by
>>>> the size of the request. For PIDs the KIP is aiming to call
>>>> `QuotaManagers::producerIdQuotaManager::maybeRecordAndGetThrottleTimeMs`
>>>> to increment by +1 every time we encounter a new PID, and if
>>>> `Sensor::record` throws `QuotaViolationException` then we will send
>>>> back to the producer the throttling time that the client should
>>>> wait for before sending a new request with a new PID.
>>>> I hope this makes sense.
>>>>
>>>>> This question can be quickly answered by a CPC datasketch [1]. The
>>>>> solution would be something like:
>>>>> Break the Y time units into a set of Y' smaller partitions (e.g.
>>>>> 60 1-minute partitions for an hour). Create a circular queue of Y'
>>>>> CPC datasketches for each principal. Implement a queue entry
>>>>> selector based on the modulus of the system time by the resolution
>>>>> of the Y' partitions. On each call:
>>>>
>>>> I didn't evaluate CPC datasketch or any counter solution; as I
>>>> explained above, the aim is not to build a counter, especially as
>>>> the Kafka Sensor can be enough to indicate whether we are violating
>>>> the quota or not.
>>>>
>>>> Thanks
>>>> Omnia
>>>>
>>>>> On 15 Apr 2024, at 10:35, Claude Warren <cla...@apache.org> wrote:
>>>>>
>>>>> After thinking about this KIP over the weekend I think that there
>>>>> is another lighter-weight approach.
>>>>>
>>>>> I think the question is not whether or not we have seen a given
>>>>> PID before but rather how many unique PIDs did the principal
>>>>> create in the last hour. Perhaps more exactly it is: did the
>>>>> Principal create more than X PIDs in the last Y time units?
>>>>>
>>>>> This question can be quickly answered by a CPC datasketch [1]. The
>>>>> solution would be something like:
>>>>> Break the Y time units into a set of Y' smaller partitions (e.g.
>>>>> 60 1-minute partitions for an hour). Create a circular queue of Y'
>>>>> CPC datasketches for each principal. Implement a queue entry
>>>>> selector based on the modulus of the system time by the resolution
>>>>> of the Y' partitions. On each call:
>>>>>
>>>>> On queue entry selector change, clear the CPC (makes it empty).
>>>>> Add the latest PID to the current queue entry.
>>>>> Sum up the CPCs and check if the max (or min) estimate of unique
>>>>> counts exceeds the limit for the user.
>>>>>
>>>>> When the CPC returns a zero estimated count then the principal has
>>>>> gone away and the principal/CPC-queue pair can be removed from the
>>>>> tracking system.
>>>>>
>>>>> I believe that this code solution is smaller and faster than the
>>>>> Bloom filter implementation.
>>>>>
>>>>> [1] https://datasketches.apache.org/docs/CPC/CPC.html
>>>>>
>>>>> On Fri, Apr 12, 2024 at 3:10 PM Claude Warren <cla...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> I think there is an issue in the KIP.
>>>>>>
>>>>>> Basically the KIP says, if the PID is found in either of the
>>>>>> Bloom filters then no action is taken.
>>>>>> If the PID is not found then it is added and the quota rating
>>>>>> metrics are incremented.
>>>>>>
>>>>>> In this case long-running PIDs will be counted multiple times.
>>>>>>
>>>>>> Let's assume a 30-minute window with 2 15-minute frames. So for
>>>>>> the first 15 minutes all PIDs are placed in the first Bloom
>>>>>> filter, and for the 2nd 15 minutes all new PIDs are placed in the
>>>>>> second Bloom filter. At the 3rd 15 minutes the first filter is
>>>>>> removed and a new empty one created.
>>>>>>
>>>>>> Let's denote Bloom filters as BFn{} and indicate the contained
>>>>>> PIDs between the braces.
>>>>>>
>>>>>> So at t0 let's insert PID0 and increment the rating metrics. Thus
>>>>>> we have BF0{PID0}.
>>>>>> At t0+5 let's insert PID1 and increment the rating metrics. Thus
>>>>>> we have BF0{PID0, PID1}.
>>>>>> At t0+10 we see PID0 again but no changes occur.
>>>>>> At t0+15 we start t1 and we have BF0{PID0, PID1}, BF1{}.
>>>>>> At t1+5 we see PID2, increment the rating metrics, and we have
>>>>>> BF0{PID0, PID1}, BF1{PID2}.
>>>>>> At t1+6 we see PID0 again and no changes occur.
>>>>>> At t1+7 we see PID1 again and no changes occur.
>>>>>> At t1+15 we start a new window and dispose of BF0. Thus we have
>>>>>> BF1{PID2}, BF2{}.
>>>>>> At t2+1 we see PID3, increment the rating metrics, and we have
>>>>>> BF1{PID2}, BF2{PID3}.
>>>>>> At t2+6 we see PID0 again, but now it is not in the list, so we
>>>>>> increment the rating metrics and add it: BF1{PID2}, BF2{PID3,
>>>>>> PID0}.
>>>>>>
>>>>>> But we just saw PID0 15 minutes ago, well within the 30-minute
>>>>>> window we are trying to track. Or am I missing something? It
>>>>>> seems like we need to add each PID to the last Bloom filter.
>>>>>>
>>>>>> On Fri, Apr 12, 2024 at 2:45 PM Claude Warren <cla...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Initial code is available at
>>>>>>> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>>>>>>>
>>>>>>> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I should also note that the probability of false positives does
>>>>>>>> not rise above shape.P because as it approaches shape.P a new
>>>>>>>> layer is created and filters are added to that. So no layer in
>>>>>>>> the LayeredBloomFilter exceeds shape.P, thus the entire filter
>>>>>>>> does not exceed shape.P.
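The t0..t2 timeline above can be replayed in a few lines of code. This
sketch uses HashSets in place of the two Bloom frames (sets are exact,
so there are no false positives, but the accounting is identical) and
shows PID0 incrementing the metric twice even though it was active the
whole time:

```java
import java.util.HashSet;
import java.util.Set;

public class DoubleCountDemo {
    private int metric = 0;
    private Set<Long> older = new HashSet<>();
    private Set<Long> newer = new HashSet<>();

    /** Record a PID sighting; increment the metric only if unseen. */
    void see(long pid) {
        if (!older.contains(pid) && !newer.contains(pid)) {
            newer.add(pid);
            metric++;
        }
    }

    /** Frame boundary: dispose of the oldest filter, start a fresh one. */
    void rotate() {
        older = newer;
        newer = new HashSet<>();
    }

    /** Replays the t0..t2 timeline from the message above. */
    int runTimeline() {
        see(0); see(1);   // t0, t0+5: BF0{PID0, PID1}, metric = 2
        see(0);           // t0+10: already present, no change
        rotate();         // t0+15: start t1
        see(2);           // t1+5: metric = 3
        see(0); see(1);   // t1+6, t1+7: still in the older frame
        rotate();         // t1+15: frame holding PID0, PID1 is gone
        see(3);           // t2+1: metric = 4
        see(0);           // t2+6: PID0 forgotten, counted again -> 5
        return metric;
    }

    public static void main(String[] args) {
        // 4 unique PIDs were active, but the metric reports 5
        System.out.println("metric = " + new DoubleCountDemo().runTimeline());
    }
}
```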
>>>>>>>>
>>>>>>>> Claude
>>>>>>>>
>>>>>>>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The overall design for KIP-936 seems sound to me. I would make
>>>>>>>>> the following changes:
>>>>>>>>>
>>>>>>>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>>>>>>>>> commons-collections v4.5.
>>>>>>>>>
>>>>>>>>> Define producer.id.quota.window.size.seconds to be the length
>>>>>>>>> of time that a Bloom filter of PIDs will exist.
>>>>>>>>> Define a new configuration option
>>>>>>>>> "producer.id.quota.window.count" as the number of windows
>>>>>>>>> active in window.size.seconds.
>>>>>>>>>
>>>>>>>>> Define the "Shape" (see commons-collections bloomfilter v4.5)
>>>>>>>>> of the Bloom filter from the average number of PIDs expected in
>>>>>>>>> window.size.seconds/window.count (call this N) and the
>>>>>>>>> probability of false positives (call this P). Due to the way
>>>>>>>>> the LayeredBloomFilter works, the number of items can be a
>>>>>>>>> lower number than the max. I'll explain that in a minute.
>>>>>>>>>
>>>>>>>>> The LayeredBloomFilter implements the standard BloomFilter
>>>>>>>>> interface but internally keeps an ordered list of filters
>>>>>>>>> (called layers) from oldest created to newest. It adds new
>>>>>>>>> layers when a specified Predicate (checkExtend) returns true.
>>>>>>>>> It will remove filters as defined by a specified Consumer
>>>>>>>>> (filterCleanup).
>>>>>>>>>
>>>>>>>>> Every time a BloomFilter is merged into the LayeredBloomFilter,
>>>>>>>>> the filter checks the "checkExtend" predicate. If it fires,
>>>>>>>>> "filterCleanup" is called to remove any layers that should be
>>>>>>>>> removed and a new layer is added.
>>>>>>>>>
>>>>>>>>> Define the layers of the LayeredBloomFilter to comprise a
>>>>>>>>> standard BloomFilter and an associated expiration timestamp.
>>>>>>>>>
>>>>>>>>> We can thus define:
>>>>>>>>>
>>>>>>>>> - "checkExtend" to require a new layer every
>>>>>>>>> window.size.seconds / window.count seconds or when the current
>>>>>>>>> layer contains shape.N items.
>>>>>>>>> - "filterCleanup" to start at the head of the list of layers
>>>>>>>>> and remove any expired filters: usually 0, every
>>>>>>>>> window.size.seconds 1, infrequently more than 1.
>>>>>>>>>
>>>>>>>>> This system will correctly handle bursty loads. There are 3
>>>>>>>>> cases to consider:
>>>>>>>>>
>>>>>>>>> 1. If the producer is producing fewer than shape.N PIDs, the
>>>>>>>>> layer will not fill up before the next layer is added.
>>>>>>>>> 2. If the producer is producing shape.N PIDs, the layer will
>>>>>>>>> be processed as either case 1 or case 3 depending on system
>>>>>>>>> timings.
>>>>>>>>> 3. If the producer is producing more than shape.N PIDs, the
>>>>>>>>> layer will fill up and a new layer will be created with an
>>>>>>>>> expiration timestamp window.size.seconds from when it was
>>>>>>>>> created. This is the case that leads to filterCleanup
>>>>>>>>> infrequently having more than 1 layer to remove.
>>>>>>>>>
>>>>>>>>> The last case to consider is if a producer stops generating
>>>>>>>>> PIDs. In this case we should walk the map of producers, call
>>>>>>>>> "filterCleanup", and then check to see if the
>>>>>>>>> LayeredBloomFilter is empty. If so, remove it from the map. It
>>>>>>>>> will be empty if the producer has not produced a PID for
>>>>>>>>> window.size.seconds.
>>>>>>>>>
>>>>>>>>> I have this solution mostly coded, though I must admit I do
>>>>>>>>> not know where to plug in the ProducerIdQuotaManager defined
>>>>>>>>> in the KIP.
>>>>>>>>>
>>>>>>>>> Claude

--
LinkedIn: http://www.linkedin.com/in/claudewarren
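To make the checkExtend/filterCleanup lifecycle described in the thread
concrete, here is a stdlib-only sketch of the layer mechanics: a
HashSet stands in for each layer's Bloom filter, time is passed in
explicitly, and the "always record in the newest layer" rule is a
simplification of the real track method. Names and structure here are
illustrative, not the actual LayeredBloomFilter API:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

class LayeredPidTracker {
    private static class Layer {
        final long expiresAtMs;
        final Set<Long> pids = new HashSet<>();
        Layer(long expiresAtMs) { this.expiresAtMs = expiresAtMs; }
    }

    private final long windowSizeMs;
    private final long layerSpanMs;   // windowSizeMs / windowCount
    private final int n;              // stand-in for Shape.N
    private final Deque<Layer> layers = new ArrayDeque<>();
    private long newestLayerStartMs;

    LayeredPidTracker(long windowSizeMs, int windowCount, int n) {
        this.windowSizeMs = windowSizeMs;
        this.layerSpanMs = windowSizeMs / windowCount;
        this.n = n;
    }

    /** Returns true if pid was not already tracked in any live layer. */
    boolean track(long pid, long nowMs) {
        // filterCleanup: drop expired layers from the head of the list
        while (!layers.isEmpty() && layers.peekFirst().expiresAtMs <= nowMs) {
            layers.pollFirst();
        }
        // checkExtend: new layer on the time boundary or when full
        if (layers.isEmpty()
                || nowMs - newestLayerStartMs >= layerSpanMs
                || layers.peekLast().pids.size() >= n) {
            layers.addLast(new Layer(nowMs + windowSizeMs));
            newestLayerStartMs = nowMs;
        }
        boolean seen = layers.stream().anyMatch(l -> l.pids.contains(pid));
        layers.peekLast().pids.add(pid);  // always record in newest layer
        return !seen;
    }

    int layerCount() { return layers.size(); }
}
```

Recording every sighting in the newest layer is what keeps a
long-running PID from being forgotten when the oldest layer expires,
which is the double-counting problem discussed earlier in the thread.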