Hi Claude, sorry that it took me a while to respond. I finally had time to look into your implementation here https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java#L121 and so far it makes sense.
> So an early PID is added to the first filter and the associated metric is updated.
> that PID is seen multiple times over the next 60 minutes, but is not added to the Bloom filters again.
> once the 60 minutes elapses the first filter is cleared, or removed and a new one started. In any case the PID is no longer recorded in any extant Bloom filter.
> the PID is seen again and is added to the newest bloom filter and the associated metric is updated.
>
> I believe at this point the metric is incorrect, the PID has been counted 2x, when it has been in use for the entire time.

You are right! My original design with a sliding-window Bloom filter carries this risk of duplication. I had a look into your track method and it makes sense.

> 1. p.i.q.window.size.seconds the length of time that a window will exist. This is also the maximum time between PID uses where the PID is considered to be the same. Reuse of the PID after p.i.q.window.size.seconds triggers recording the PID as a new PID.
> Define a new configuration option "producer.id.quota.window.count" as the number of windows active in window.size.seconds.

This is correct.

> 3. p.i.q.window.num, specified as 11 in the KIP. I thought this was how many PIDs were expected in each window. In the original KIP this means that we expect to see the PID 22 times (2 windows x 11). In the Layered Bloom filter this would be the N value for the Shape.

producer.id.quota.window.num is how many metrics samples we retain, not how many PIDs we store in memory.

> 2. p.i.q.window.count (the new one), how many sections to break p.i.q.window.size.seconds into. In the initial KIP this is 2. This value gives us the timing for creating a new layer in the layered bloom filter. So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new layer will be created and an old layer or layers will be removed. The layered bloom filter will add layers to keep the probability of false positives in range.
> 4. p.i.q.window.fpr (needs to be specified) the expected false positive rate. Not sure how to express this in the config in a way that makes sense but something like 0.000006 or the like. This is the P value for the Shape. See https://hur.st/bloomfilter for a Bloom filter calculator.

For these two we might need to introduce them. How would they interact with each other in the layered bloom filter?

Thanks

> On 16 Apr 2024, at 08:00, Claude Warren <cla...@xenei.com> wrote:
>
> The difference between p.i.q.window.count and p.i.q.window.num:
>
> To be honest, I may have misunderstood your definition of window num. But here is what I have in mind:
>
> 1. p.i.q.window.size.seconds the length of time that a window will exist. This is also the maximum time between PID uses where the PID is considered to be the same. Reuse of the PID after p.i.q.window.size.seconds triggers recording the PID as a new PID.
> Define a new configuration option "producer.id.quota.window.count" as the number of windows active in window.size.seconds.
> 2. p.i.q.window.count (the new one), how many sections to break p.i.q.window.size.seconds into. In the initial KIP this is 2. This value gives us the timing for creating a new layer in the layered bloom filter. So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new layer will be created and an old layer or layers will be removed. The layered bloom filter will add layers to keep the probability of false positives in range.
> 3. p.i.q.window.num, specified as 11 in the KIP. I thought this was how many PIDs were expected in each window. In the original KIP this means that we expect to see the PID 22 times (2 windows x 11). In the Layered Bloom filter this would be the N value for the Shape.
> 4. p.i.q.window.fpr (needs to be specified) the expected false positive rate. Not sure how to express this in the config in a way that makes sense but something like 0.000006 or the like. This is the P value for the Shape. See https://hur.st/bloomfilter for a Bloom filter calculator.
>
> Once we have the N and P for the Shape the shape can be instantiated as "Shape s = Shape.fromNP( int n, double p );"
>
> In the layered filter once N items have been added to the layer a new layer is created. Layers are removed after p.i.q.window.size.seconds so if there is a burst of PIDs the number of layers will expand and then shrink back as the PIDs expire. While running there is always at least 1 layer.
>
> Some calculated points:
>
> - No layer will span more than p.i.q.window.size.seconds / p.i.q.window.count seconds.
> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60, p.i.q.window.count = 2 and a rate of 22 PIDs per minute there will be 2 layers.
> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60, p.i.q.window.count = 2 and a rate of 10 PIDs per minute there will be 2 layers.
> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60, p.i.q.window.count = 2 and that no PIDs have been seen in the last 30 seconds there will be 2 layers.
> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60, p.i.q.window.count = 2 and that no PIDs have been seen in the last 60 seconds there will be 1 layer.
> - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60, p.i.q.window.count = 2 and a rate of 23 to 44 PIDs per minute there will be 4 layers.
> - the false positive rate across the layers remains at or below Shape.P
> - Assuming Shape.N = 11 and Shape.P = 0.000006 the Bloom filter at each layer will consume 35 bytes. https://hur.st/bloomfilter provides a quick calculator for other values.
>
> Claude
>
> On Tue, Apr 16, 2024 at 8:06 AM Claude Warren <cla...@xenei.com> wrote:
>
>> Let's put aside the CPC datasketch idea and just discuss the Bloom filter approach.
>>
>> I think the problem with the way the KIP is worded is that PIDs are only added if they are not seen in either of the Bloom filters.
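The 35-byte-per-layer figure quoted above can be checked with the standard Bloom filter sizing formulas (the same math that the hur.st calculator, and `Shape.fromNP`, apply). This is a self-contained sketch of that arithmetic, not the commons-collections API:

```java
// Sketch: derive a Bloom filter "shape" -- bit count m and hash-function
// count k -- from the expected item count n and target false-positive rate p,
// using the textbook formulas. Not the commons-collections Shape class.
public class BloomShape {

    // m = ceil(-n * ln(p) / (ln 2)^2)
    static int bits(int n, double p) {
        return (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // k = round((m / n) * ln 2)
    static int hashFunctions(int n, double p) {
        return (int) Math.round((double) bits(n, p) / n * Math.log(2));
    }

    static int bytes(int n, double p) {
        return (bits(n, p) + 7) / 8;   // round up to whole bytes
    }

    public static void main(String[] args) {
        // The values discussed in the thread: Shape.N = 11, Shape.P = 0.000006
        System.out.println("bits=" + bits(11, 0.000006)
                + " k=" + hashFunctions(11, 0.000006)
                + " bytes=" + bytes(11, 0.000006));
    }
}
```

For N = 11 and P = 0.000006 this yields 276 bits, i.e. 35 bytes per layer, matching the figure in the email.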
>>
>> So an early PID is added to the first filter and the associated metric is updated.
>> that PID is seen multiple times over the next 60 minutes, but is not added to the Bloom filters again.
>> once the 60 minutes elapses the first filter is cleared, or removed and a new one started. In any case the PID is no longer recorded in any extant Bloom filter.
>> the PID is seen again and is added to the newest bloom filter and the associated metric is updated.
>>
>> I believe at this point the metric is incorrect, the PID has been counted 2x, when it has been in use for the entire time.
>>
>> The "track" method that I added solves this problem by ensuring that the PID is always seen in the latter half of the set of Bloom filters. In the case of 2 filters that is always the second one, but remember that the number of layers will grow as the filters become saturated. So if your filter is intended to hold 500 PIDs and the 501st PID is registered before the expiration, a new layer (Bloom filter) is added for new PIDs to be added into.
>>
>> On Mon, Apr 15, 2024 at 5:00 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>
>>> Hi Claude,
>>> Thanks for the implementation of the LayeredBloomFilter in apache commons.
>>>
>>>> Define a new configuration option "producer.id.quota.window.count" as the number of windows active in window.size.seconds.
>>> What is the difference between "producer.id.quota.window.count" and producer.id.quota.window.num?
>>>
>>>> Basically the kip says, if the PID is found in either of the Bloom filters then no action is taken
>>>> If the PID is not found then it is added and the quota rating metrics are incremented.
>>>> In this case long running PIDs will be counted multiple times.
>>>
>>> The PID is considered not encountered if both frames of the window don't have it. If you check the diagram for `Caching layer to track active PIDs per KafkaPrincipal` you will see that each window will have 2 bloom layers and the first created one will be disposed of only when we start the next window, which means window2 starts from the 2nd bloom. Basically the bloom filter in the KIP is trying to implement a sliding window pattern.
>>>
>>>> I think the question is not whether or not we have seen a given PID before but rather how many unique PIDs did the principal create in the last hour. Perhaps more exactly it is: did the Principal create more than X PIDs in the last Y time units?
>>> We don't really care about the count of unique PIDs per user. The KIP is trying to follow and build on top of ClientQuotaManager, which already has a pattern for throttling that the producer client is aware of, so we don't need to upgrade old clients for brokers to throttle them and they can respect the throttling.
>>>
>>> The pattern for throttling is that we record the activities by incrementing a metric sensor, and only when we catch `QuotaViolationException` from the quota sensor do we send a throttleTimeMs to the client.
>>> For bandwidth throttling, for example, we increment the sensor by the size of the request. For PIDs the KIP is aiming to call `QuotaManagers::producerIdQuotaManager::maybeRecordAndGetThrottleTimeMs` to increment by +1 every time we encounter a new PID, and if `Sensor::record` returns `QuotaViolationException` then we send back to the producer the throttling time that the client should wait for before sending a new request with a new PID.
>>> I hope this makes sense.
>>>
>>>> This question can be quickly answered by a CPC datasketch [1]. The solution would be something like:
>>>> Break the Y time units into a set of Y' smaller partitions (e.g. 60 1-minute partitions for an hour). Create a circular queue of Y' CPC datasketches for each principal. Implement a queue entry selector based on the modulus of the system time by the resolution of the Y' partitions. On each call:
>>> I didn't evaluate CPC datasketch or any counter solution; as I explained above, the aim is not to build a counter, especially as the Kafka Sensor can be enough to indicate if we are violating the quota or not.
>>>
>>> Thanks
>>> Omnia
>>>
>>>> On 15 Apr 2024, at 10:35, Claude Warren <cla...@apache.org> wrote:
>>>>
>>>> After thinking about this KIP over the weekend I think that there is another lighter weight approach.
>>>>
>>>> I think the question is not whether or not we have seen a given PID before but rather how many unique PIDs did the principal create in the last hour. Perhaps more exactly it is: did the Principal create more than X PIDs in the last Y time units?
>>>>
>>>> This question can be quickly answered by a CPC datasketch [1]. The solution would be something like:
>>>> Break the Y time units into a set of Y' smaller partitions (e.g. 60 1-minute partitions for an hour). Create a circular queue of Y' CPC datasketches for each principal. Implement a queue entry selector based on the modulus of the system time by the resolution of the Y' partitions. On each call:
>>>>
>>>> On queue entry selector change clear the CPC (makes it empty)
>>>> Add the latest PID to the current queue entry.
>>>> Sum up the CPCs and check if the max (or min) estimate of unique counts exceeds the limit for the user.
>>>>
>>>> When the CPC returns a zero estimated count then the principal has gone away and the principal/CPC-queue pair can be removed from the tracking system.
>>>>
>>>> I believe that this code solution is smaller and faster than the Bloom filter implementation.
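The circular-queue proposal above can be sketched roughly as follows. Plain `HashSet`s stand in for the CPC sketches so the example is self-contained; a real implementation would use `CpcSketch` from Apache DataSketches and get approximate rather than exact per-slot counts. The class and method names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the per-principal circular queue of Y' time partitions.
// A slot is cleared when the clock re-enters it (the "queue entry selector
// change"), so the structure naturally forgets data older than the window.
public class PidWindow {
    private final List<Set<Long>> slots = new ArrayList<>();
    private final long slotMillis;
    private int currentSlot = -1;

    PidWindow(int slotCount, long slotMillis) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new HashSet<>());
        }
        this.slotMillis = slotMillis;
    }

    // Record a PID at the given time; return the distinct-PID estimate for the
    // whole window. Summing per-slot counts over-counts PIDs that span slots,
    // i.e. this corresponds to the "max estimate" mentioned in the email.
    long track(long pid, long nowMillis) {
        int slot = (int) ((nowMillis / slotMillis) % slots.size());
        if (slot != currentSlot) {       // queue entry selector changed:
            slots.get(slot).clear();     // re-use the slot, dropping oldest data
            currentSlot = slot;
        }
        slots.get(slot).add(pid);
        long estimate = 0;
        for (Set<Long> s : slots) {
            estimate += s.size();
        }
        return estimate;
    }
}
```

With 60 one-minute slots, a PID repeated within the same minute is counted once, while a PID active across two slots contributes to both, which is why the email distinguishes max and min estimates.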
>>>>
>>>> [1] https://datasketches.apache.org/docs/CPC/CPC.html
>>>>
>>>> On Fri, Apr 12, 2024 at 3:10 PM Claude Warren <cla...@apache.org> wrote:
>>>>
>>>>> I think there is an issue in the KIP.
>>>>>
>>>>> Basically the kip says, if the PID is found in either of the Bloom filters then no action is taken
>>>>> If the PID is not found then it is added and the quota rating metrics are incremented.
>>>>>
>>>>> In this case long running PIDs will be counted multiple times.
>>>>>
>>>>> Let's assume a 30 minute window with 2 15-minute frames. So for the first 15 minutes all PIDs are placed in the first Bloom filter and for the 2nd 15 minutes all new PIDs are placed in the second bloom filter. At the 3rd 15 minutes the first filter is removed and a new empty one created.
>>>>>
>>>>> Let's denote Bloom filters as BFn{} and indicate the contained pids between the braces.
>>>>>
>>>>> So at t0 let's insert PID0 and increment the rating metrics. Thus we have BF0{PID0}
>>>>> at t0+5 let's insert PID1 and increment the rating metrics. Thus we have BF0{PID0, PID1}
>>>>> at t0+10 we see PID0 again but no changes occur.
>>>>> at t0+15 we start t1 and we have BF0{PID0, PID1}, BF1{}
>>>>> at t1+5 we see PID2, increment the rating metrics, and we have BF0{PID0, PID1}, BF1{PID2}
>>>>> at t1+6 we see PID0 again and no changes occur
>>>>> at t1+7 we see PID1 again and no changes occur
>>>>> at t1+15 we start a new window and dispose of BF0. Thus we have BF1{PID2}, BF2{}
>>>>> at t2+1 we see PID3, increment the rating metrics, and we have BF1{PID2}, BF2{PID3}
>>>>> at t2+6 we see PID0 again but now it is not in the list so we increment the rating metrics and add it: BF1{PID2}, BF2{PID3, PID0}
>>>>>
>>>>> But we just saw PID0 15 minutes ago, well within the 30 minute window we are trying to track. Or am I missing something? It seems like we need to add each PID to the last bloom filter.
>>>>>
>>>>> On Fri, Apr 12, 2024 at 2:45 PM Claude Warren <cla...@apache.org> wrote:
>>>>>
>>>>>> Initial code is available at
>>>>>> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>>>>>>
>>>>>> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org> wrote:
>>>>>>
>>>>>>> I should also note that the probability of false positives does not rise above shape.P because as it approaches shape.P a new layer is created and filters are added to that. So no layer in the LayeredBloomFilter exceeds shape.P, thus the entire filter does not exceed shape.P.
>>>>>>>
>>>>>>> Claude
>>>>>>>
>>>>>>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org> wrote:
>>>>>>>
>>>>>>>> The overall design for KIP-936 seems sound to me. I would make the following changes:
>>>>>>>>
>>>>>>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from commons-collections v4.5
>>>>>>>>
>>>>>>>> Define the producer.id.quota.window.size.seconds to be the length of time that a Bloom filter of PIDs will exist.
>>>>>>>> Define a new configuration option "producer.id.quota.window.count" as the number of windows active in window.size.seconds.
>>>>>>>>
>>>>>>>> Define the "Shape" (See commons-collections bloomfilters v4.5) of the bloom filter from the average number of PIDs expected in window.size.seconds/window.count (call this N) and the probability of false positives (call this P). Due to the way the LayeredBloomFilter works the number of items can be a lower number than the max. I'll explain that in a minute.
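The t0/t1/t2 walk-through in the 12 Apr email can be reproduced with a small simulation. `HashSet`s stand in for the Bloom filters (exact membership, no false positives, which only makes the demonstration cleaner); the event table and frame rotation follow the email's scenario:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Replays PID sightings against a naive two-frame window ("add only if not
// seen in either filter") and counts metric increments for one PID.
public class DoubleCountDemo {

    static int increments(String trackedPid, String[][] events) {
        Deque<Set<String>> frames = new ArrayDeque<>();
        frames.addLast(new HashSet<>());           // BF0, live during t0
        int count = 0;
        int currentFrame = 0;
        for (String[] e : events) {
            int frame = Integer.parseInt(e[0]);
            while (frame > currentFrame) {          // a 15-minute frame elapses
                frames.addLast(new HashSet<>());    // start a new empty filter
                if (frames.size() > 2) {
                    frames.removeFirst();           // dispose of the oldest one
                }
                currentFrame++;
            }
            String pid = e[1];
            boolean seen = frames.stream().anyMatch(f -> f.contains(pid));
            if (!seen) {
                frames.getLast().add(pid);          // KIP: add only if unseen
                if (pid.equals(trackedPid)) count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"0", "PID0"}, {"0", "PID1"}, {"0", "PID0"},   // t0 frame
            {"1", "PID2"}, {"1", "PID0"}, {"1", "PID1"},   // t1 frame
            {"2", "PID3"}, {"2", "PID0"}                   // t2: BF0 disposed
        };
        // PID0 was in continuous use, yet the metric fires twice
        System.out.println("PID0 increments = " + increments("PID0", events));
    }
}
```

Running it shows PID0 incrementing the metric twice, confirming the double-counting that the "track" method is meant to eliminate.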
>>>>>>>>
>>>>>>>> The LayeredBloomFilter implements the standard BloomFilter interface but internally keeps an ordered list of filters (called layers) from oldest created to newest. It adds new layers when a specified Predicate (checkExtend) returns true. It will remove filters as defined by a specified Consumer (filterCleanup).
>>>>>>>>
>>>>>>>> Every time a BloomFilter is merged into the LayeredBloomFilter the filter checks the "checkExtend" predicate. If it fires, the "filterCleanup" is called to remove any layers that should be removed, and a new layer is added.
>>>>>>>>
>>>>>>>> Define the layers of the LayeredBloomFilter to comprise a standard BloomFilter and an associated expiration timestamp.
>>>>>>>>
>>>>>>>> We can thus define
>>>>>>>>
>>>>>>>> - "checkExtend" to require a new layer every window.size.seconds / window.count seconds or when the current layer contains shape.N items.
>>>>>>>> - "filterCleanup" to start at the head of the list of layers and remove any expired filters: usually 0, every window.size.seconds 1, infrequently more than 1.
>>>>>>>>
>>>>>>>> This system will correctly handle bursty loads. There are 3 cases to consider:
>>>>>>>>
>>>>>>>> 1. If the producer is producing fewer than shape.N PIDs the layer will not fill up before the next layer is added.
>>>>>>>> 2. If the producer is producing shape.N PIDs the layer will be processed as either a 1 or a 3 depending on system timings.
>>>>>>>> 3. If the producer is producing more than shape.N PIDs the layer will fill up and a new layer will be created with an expiration timestamp window.size.seconds from when it was created. This is the case that leads to the filterCleanup infrequently having more than 1 layer to remove.
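The checkExtend / filterCleanup rules described in that email can be sketched as follows. This is only the timing and sizing logic, not the commons-collections `LayeredBloomFilter` API; `HashSet`s stand in for the per-layer Bloom filters, and the class and field names are illustrative:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the layering rules: a layer closes after
// window.size.seconds / window.count or once it holds Shape.N items,
// and is discarded window.size.seconds after creation.
public class LayeredSketch {
    private static class Layer {
        final Set<Long> items = new HashSet<>();
        final long closesAt;    // created + window.size.seconds / window.count
        final long expiresAt;   // created + window.size.seconds
        Layer(long now, long windowMillis, int windowCount) {
            this.closesAt = now + windowMillis / windowCount;
            this.expiresAt = now + windowMillis;
        }
    }

    private final List<Layer> layers = new ArrayList<>();   // oldest -> newest
    private final long windowMillis;
    private final int windowCount;
    private final int n;   // Shape.N: max items per layer

    LayeredSketch(long windowMillis, int windowCount, int n) {
        this.windowMillis = windowMillis;
        this.windowCount = windowCount;
        this.n = n;
    }

    void add(long pid, long now) {
        // "filterCleanup": drop layers older than window.size.seconds
        layers.removeIf(layer -> layer.expiresAt <= now);
        // "checkExtend": new layer when none exists, the newest has lived out
        // its slice of the window, or it already holds N items
        Layer newest = layers.isEmpty() ? null : layers.get(layers.size() - 1);
        if (newest == null || now >= newest.closesAt || newest.items.size() >= n) {
            newest = new Layer(now, windowMillis, windowCount);
            layers.add(newest);
        }
        newest.items.add(pid);
    }

    int layerCount() {
        return layers.size();
    }
}
```

At a steady rate below N per slice this oscillates between the expected layer counts from the "calculated points" list, while a burst above N grows extra layers that expire window.size.seconds later.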
>>>>>>>>
>>>>>>>> The last case to consider is if a producer stops generating PIDs. In this case we should walk the map of producers, call "filterCleanup", and then check to see if the LayeredBloomFilter is empty. If so, remove it from the map. It will be empty if the producer has not produced a PID for window.size.seconds.
>>>>>>>>
>>>>>>>> I have this solution mostly coded, though I must admit I do not know where to plug in the ProducerIdQuotaManager defined in the KIP.
>>>>>>>>
>>>>>>>> Claude
>>>>>>>>
>>>>>>>
>>>
>>>
>>
>> --
>> LinkedIn: http://www.linkedin.com/in/claudewarren
>
>
> --
> LinkedIn: http://www.linkedin.com/in/claudewarren
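For completeness, the idle-producer sweep described near the end of the thread (walk the map, run filterCleanup, drop principals whose filter went empty) can be sketched like this. The representation is deliberately minimal: each principal maps to a list of layer expiration timestamps standing in for its layered Bloom filter, and the names are hypothetical:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch of the sweep over the per-principal map. A principal whose layers
// have all expired produced no PID for window.size.seconds and is removed.
public class IdleSweep {
    static void sweep(Map<String, List<Long>> filters, long now) {
        Iterator<Map.Entry<String, List<Long>>> it = filters.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<Long>> e = it.next();
            e.getValue().removeIf(expiresAt -> expiresAt <= now);  // filterCleanup
            if (e.getValue().isEmpty()) {
                it.remove();   // principal has gone quiet; stop tracking it
            }
        }
    }
}
```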