Hi, 
Just bringing some offline discussion and recent updates to the KIP here to the 
mailing list.

Claude updated the KIP to use LayeredBloomFilter from Apache Commons 
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayeredBloomFilter.html
for the caching layer instead of implementing TimeBasedBloomFilter from 
scratch. 

Claude and I had some discussion regarding some configurations that impact the 
bloom filter. 

The KIP originally proposed to split each window in half; this has now 
been updated to use `producer.id.quota.cache.layer.count`, which controls how 
many layers each window will have. The default is 4 layers. I updated the 
Rejected Alternatives section to explain why the 2-layer bloom filter 
has been rejected.

The KIP originally proposed to start a new bloom layer every 
producer.id.quota.window.size.seconds/2; now this will be triggered every 
producer.id.quota.window.size.seconds/producer.id.quota.cache.layer.count. I 
updated the diagram in the KIP to reflect this. 

The producer_ids_rate per user will drive the maximum number of PIDs in 
Shape 
https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/Shape.html
instead of introducing a global config for the max size of Shape, as that goes 
against the other rejected alternatives. Now Shape can be 
`Shape.fromNP(producer_ids_rate, producer.id.quota.cache.false.positive.rate)`.

The KIP now proposes another config to control the bloom filter's false 
positive rate, producer.id.quota.cache.false.positive.rate; by default this is 
set to 0.001 instead of being hardcoded. 
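To sanity-check these defaults, the sizing math behind `Shape.fromNP` and the layer rotation interval can be sketched in plain Java. This is an illustrative back-of-the-envelope calculation using the standard Bloom filter formulas, not code from the KIP; `producerIdsRate` and the example values stand in for whatever the per-user quota would be.

```java
// Sketch (not the KIP implementation): derive per-user Bloom filter sizing
// and the layer rotation interval from the proposed configs, using the
// standard Bloom filter formulas.
public class PidQuotaSizing {
    // m = ceil(-n * ln(p) / (ln 2)^2): bits needed for n items at fp rate p
    static int numberOfBits(int n, double p) {
        return (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // k = ceil(m/n * ln 2): optimal number of hash functions
    static int numberOfHashFunctions(int n, int m) {
        return (int) Math.ceil((double) m / n * Math.log(2));
    }

    // producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count
    static int layerIntervalSeconds(int windowSizeSeconds, int layerCount) {
        return windowSizeSeconds / layerCount;
    }

    public static void main(String[] args) {
        int producerIdsRate = 1000;  // illustrative per-user PID quota
        double fpr = 0.001;          // producer.id.quota.cache.false.positive.rate default
        int m = numberOfBits(producerIdsRate, fpr);
        System.out.println("bits=" + m + " bytes=" + (m + 7) / 8
                + " hashes=" + numberOfHashFunctions(producerIdsRate, m));
        // window.size.seconds=3600 with layer.count=4 -> a new layer every 900s
        System.out.println("layerIntervalSeconds=" + layerIntervalSeconds(3600, 4));
    }
}
```

With the defaults above, 1000 PIDs at a 0.001 false positive rate needs roughly 1.8 KB per layer, which is the kind of per-user footprint the KIP is trading off against.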

I think the KIP is now in better shape to ask others for a review. 

Omnia

> On 24 Apr 2024, at 14:48, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> 
> Hi Claude, sorry that it took me a while to respond. I finally had time to 
> look into your implementation here 
> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java#L121
>  and so far it makes sense. 
> 
>> So an early PID is added to the first filter and the associated metric is
>> updated.
>> that PID is seen multiple times over the next 60 minutes, but is not added
>> to the Bloom filters again.
>> once the 60 minutes elapses the first filter is cleared, or removed and a
>> new one started.  In any case the PID is no longer recorded in any extant
>> Bloom filter.
>> the PID is seen again and is added to the newest bloom filter and the
>> associated metric is updated.
>> 
>> I believe at this point the metric is incorrect, the PID has been counted
>> 2x, when it has been in use for the entire time.
> 
> You are right! My original design with a sliding-window bloom filter carries 
> this risk of duplication. I had a look into your track method and it makes sense.
>> 1. p.i.q.window.size.seconds the length of time that a window will
>>   exist.  This is also the maximum time between PID uses where the PID is
>>   considered to be the same.  Reuse of the PID after p.i.q.window.size.seconds
>>   triggers recording the PID as a new PID.
>>   Define a new configuration option "producer.id.quota.window.count" as
>>   the number of windows active in window.size.seconds.
> This is correct. 
> 
>>   3. p.i.q.window.num, specified as 11 in the KIP.  I thought this was how
>>   many PIDs were expected in each window.  In the original KIP this means
>>   that we expect to see the PID 22 times (2 windows x 11).  In the Layered
>>   Bloom filter this would be the N value for the Shape.
> producer.id.quota.window.num is how many metric samples we retain, not how 
> many PIDs we store in memory. 
>>   2. p.i.q.window.count (the new one), how many sections to break
>>   p.i.q.window.size.seconds into.  In the initial KIP this is 2.  This value
>>   gives us the timing for creating a new layer in the layered bloom filter.
>>   So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new
>>   layer will be created and an old layer or layers will be removed.  The
>>   layered bloom filter will add layers to keep the probability of false
>>   positives in range.
>>   4. p.i.q.window.fpr (needs to be specified) the expected false positive
>>   rate.  Not sure how to express this in the config in a way that makes sense
>>   but something like 0.000006 or the like.  This is the P value for the
>>   Shape.  See https://hur.st/bloomfilter for a Bloom filter 
> 
> These two we might need to introduce. How would they interact with 
> each other in the layered bloom filter? 
> 
> Thanks 
> 
>> On 16 Apr 2024, at 08:00, Claude Warren <cla...@xenei.com> wrote:
>> 
>> The difference between p.i.q.window.count and p.i.q.window.num:
>> 
>> To be honest, I may have misunderstood your definition of window num.  But
>> here is what I have in mind:
>> 
>> 
>>   1. p.i.q.window.size.seconds the length of time that a window will
>>   exist.  This is also the maximum time between PID uses where the PID is
>>   considered to be the same.  Reuse of the PID after p.i.q.window.size.seconds
>>   triggers recording the PID as a new PID.
>>   Define a new configuration option "producer.id.quota.window.count" as
>>   the number of windows active in window.size.seconds.
>>   2. p.i.q.window.count (the new one), how many sections to break
>>   p.i.q.window.size.seconds into.  In the initial KIP this is 2.  This value
>>   gives us the timing for creating a new layer in the layered bloom filter.
>>   So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new
>>   layer will be created and an old layer or layers will be removed.  The
>>   layered bloom filter will add layers to keep the probability of false
>>   positives in range.
>>   3. p.i.q.window.num, specified as 11 in the KIP.  I thought this was how
>>   many PIDs were expected in each window.  In the original KIP this means
>>   that we expect to see the PID 22 times (2 windows x 11).  In the Layered
>>   Bloom filter this would be the N value for the Shape.
>>   4. p.i.q.window.fpr (needs to be specified) the expected false positive
>>   rate.  Not sure how to express this in the config in a way that makes sense
>>   but something like 0.000006 or the like.  This is the P value for the
>>   Shape.  See https://hur.st/bloomfilter for a Bloom filter calculator.
>> 
>> Once we have the N and P for the Shape, the shape can be instantiated as
>> "Shape s = Shape.fromNP(n, p);"
>> 
>> In the layered filter once N items have been added to the layer a new layer
>> is created.  Layers are removed after p.i.q.window.size.seconds so if there
>> is a burst of PIDs the number of layers will expand and then shrink back as
>> the PIDs expire.  While running there is always at least 1 layer.
>> 
>> Some calculated points:
>> 
>>   - No layer will span more than p.i.q.window.size.seconds /
>>   p.i.q.window.count seconds.
>>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>>   p.i.q.window.count =2 and a rate of 22 PIDs per minute there will be 2
>>   layers.
>>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>>   p.i.q.window.count =2 and a rate of 10 PIDs per minute there will be 2
>>   layers.
>>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>>   p.i.q.window.count =2 and that no PIDS have been seen in the last 30
>>   seconds there will be 2 layers.
>>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>>   p.i.q.window.count =2 and that no PIDS have been seen in the last 60
>>   seconds there will be 1 layer.
>>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>>   p.i.q.window.count =2 and a rate of 23 to 44 PIDs per minute there will be
>>   4 layers.
>>   - the false positive rate across the layers remains at or below Shape.P
>>   - Assuming Shape.N = 11 and Shape.P = 0.000006 the Bloom filter at each
>>   layer will consume 35 bytes. https://hur.st/bloomfilter provides a quick
>>   calculator for other values.
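The 35-byte figure in the last bullet can be checked against the standard Bloom filter sizing formula. A quick back-of-the-envelope sketch (not code from the KIP):

```java
// Quick check of the 35-byte-per-layer figure using the standard sizing
// formula m = ceil(-n * ln(p) / (ln 2)^2), with Shape.N = 11 and
// Shape.P = 0.000006 as in the bullets above.
public class ShapeCheck {
    static int bits(int n, double p) {
        return (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    public static void main(String[] args) {
        int m = bits(11, 0.000006);
        int bytes = (m + 7) / 8;  // round up to whole bytes
        System.out.println(m + " bits, " + bytes + " bytes per layer");
        // prints: 276 bits, 35 bytes per layer
    }
}
```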
>> 
>> Claude
>> 
>> 
>> 
>> 
>> 
>> 
>> On Tue, Apr 16, 2024 at 8:06 AM Claude Warren <cla...@xenei.com> wrote:
>> 
>>> Let's put aside the CPC datasketch idea and just discuss the Bloom filter
>>> approach.
>>> 
>>> I think the problem with the way the KIP is worded is that PIDs are only
>>> added if they are not seen in either of the Bloom filters.
>>> 
>>> So an early PID is added to the first filter and the associated metric is
>>> updated.
>>> that PID is seen multiple times over the next 60 minutes, but is not added
>>> to the Bloom filters again.
>>> once the 60 minutes elapses the first filter is cleared, or removed and a
>>> new one started.  In any case the PID is no longer recorded in any extant
>>> Bloom filter.
>>> the PID is seen again and is added to the newest bloom filter and the
>>> associated metric is updated.
>>> 
>>> I believe at this point the metric is incorrect, the PID has been counted
>>> 2x, when it has been in use for the entire time.
>>> 
>>> The "track" method that I added solves this problem by ensuring that the
>>> PID is always seen in the latter half of the set of Bloom filters.  In the
>>> case of 2 filters that is always the second one, but remember that the
>>> number of layers will grow as the filters become saturated.  So if your
>>> filter is intended to hold 500 PIDs and the 501st PID is registered before
>>> the expiration a new layer (Bloom filter) is added for new PIDs to be added
>>> into.
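A minimal sketch of the "track" idea described above, with plain HashSets standing in for Bloom filter layers. The linked ProducerIDQuotaManager is the authoritative version; every name and detail here is illustrative only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: a PID is only *counted* when no layer contains it,
// but it is re-added to the newest layer whenever it is seen, so a
// long-lived PID survives layer expiry without being counted twice.
public class PidTracker {
    private final Deque<Set<Long>> layers = new ArrayDeque<>();
    private long newPidCount = 0;

    public PidTracker(int layerCount) {
        for (int i = 0; i < layerCount; i++) layers.addLast(new HashSet<>());
    }

    /** Called on every use of a PID. Returns true if it counted as new. */
    public boolean track(long pid) {
        boolean known = layers.stream().anyMatch(l -> l.contains(pid));
        layers.peekLast().add(pid);  // always refresh the PID into the newest layer
        if (!known) newPidCount++;
        return !known;
    }

    /** Called every window.size.seconds / layer count: drop oldest, start newest. */
    public void rotate() {
        layers.removeFirst();
        layers.addLast(new HashSet<>());
    }

    public long newPidCount() { return newPidCount; }

    public static void main(String[] args) {
        PidTracker t = new PidTracker(2);
        System.out.println(t.track(42L));  // true: counted as new
        t.rotate();
        System.out.println(t.track(42L));  // false: survived rotation via refresh
    }
}
```

Because each sighting refreshes the PID into the newest layer, an active PID is always present in the latter half of the layer list and is only recounted after a full window of silence.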
>>> 
>>> On Mon, Apr 15, 2024 at 5:00 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Claude,
>>>> Thanks for the implementation of the LayeredBloomFilter in apache
>>>> commons.
>>>> 
>>>>> Define a new configuration option "producer.id.quota.window.count" as
>>>>> the number of windows active in window.size.seconds.
>>>> What is the difference between “producer.id.quota.window.count” and
>>>> producer.id.quota.window.num?
>>>> 
>>>>> Basically the kip says, if the PID is found in either of the Bloom filters
>>>>> then no action is taken
>>>>> If the PID is not found then it is added and the quota rating metrics are
>>>>> incremented.
>>>>> In this case long running PIDs will be counted multiple times.
>>>> 
>>>> The PID is considered not encountered only if both frames of the window
>>>> don’t have it. If you check the diagram for `Caching layer to track active
>>>> PIDs per KafkaPrincipal` you will see that each window will have 2 bloom
>>>> layers, and the first created one will be disposed of only when we start the
>>>> next window, which means window2 starts from the 2nd bloom. Basically
>>>> the bloom filter in the KIP is trying to implement a sliding window
>>>> pattern.
>>>> 
>>>>> I think the question is not whether or not we have seen a given PID before
>>>>> but rather how many unique PIDs did the principal create in the last hour.
>>>>> Perhaps more exactly it is: did the Principal create more than X PIDS in
>>>>> the last Y time units?
>>>> We don’t really care about the count of unique PIDs per user. The KIP is
>>>> trying to follow and build on top of ClientQuotaManager, which already has
>>>> a pattern for throttling that the producer client is aware of, so we don’t
>>>> need to upgrade old clients for brokers to throttle them, and they can
>>>> respect the throttling.
>>>> 
>>>> The pattern for throttling is that we record the activity by
>>>> incrementing a metric sensor, and only when we catch
>>>> `QuotaViolationException` from the quota sensor do we send a
>>>> throttleTimeMs to the client.
>>>> For bandwidth throttling, for example, we increment the sensor by the size
>>>> of the request. For PIDs the KIP is aiming to call
>>>> `QuotaManagers::producerIdQuotaManager::maybeRecordAndGetThrottleTimeMs` to
>>>> increment by +1 every time we encounter a new PID, and if
>>>> `Sensor::record` raises `QuotaViolationException` then we will send back
>>>> to the producer the throttling time that the client should wait for before
>>>> sending a new request with a new PID.
>>>> I hope this makes sense.
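That record-then-throttle flow can be sketched without the Kafka internals. Everything below is an illustrative stand-in with a single fixed sample window, not Kafka's actual Sensor/ClientQuotaManager API:

```java
// Illustrative stand-in for the quota pattern described above: record +1
// per new PID into a rate sample, and only when the configured quota is
// exceeded compute a throttle time for the client.
public class PidRateQuota {
    private final double quotaPerWindow;  // e.g. producer_ids_rate
    private final long windowMs;
    private double recorded = 0;
    private long windowStart = 0;

    public PidRateQuota(double quotaPerWindow, long windowMs) {
        this.quotaPerWindow = quotaPerWindow;
        this.windowMs = windowMs;
    }

    /** Record one new PID at time nowMs; returns throttle time in ms (0 = none). */
    public long recordNewPid(long nowMs) {
        if (nowMs - windowStart >= windowMs) {  // roll the sample window
            windowStart = nowMs;
            recorded = 0;
        }
        recorded += 1;
        if (recorded <= quotaPerWindow) return 0;
        // Over quota: wait until the excess would fit the allowed rate.
        double excess = recorded - quotaPerWindow;
        return (long) Math.ceil(excess * windowMs / quotaPerWindow);
    }

    public static void main(String[] args) {
        PidRateQuota quota = new PidRateQuota(2, 1000);
        System.out.println(quota.recordNewPid(0));  // 0: within quota
        System.out.println(quota.recordNewPid(1));  // 0: within quota
        System.out.println(quota.recordNewPid(2));  // 500: third PID in window
    }
}
```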
>>>> 
>>>>> This question can be quickly answered by a CPC datasketch [1].  The
>>>>> solution would be something like:
>>>>> Break the Y time units into a set of Y' smaller partitions (e.g. 60
>>>>> 1-minute partitions for an hour).  Create a circular queue of Y' CPC
>>>>> datasketches for each principal.  Implement a queue entry selector based on
>>>>> the modulus of the system by the resolution of the Y' partitions. On each
>>>>> call:
>>>> I didn’t evaluate CPC datasketch or any counter solution; as I explained
>>>> above, the aim is not to build a counter, especially as the Kafka Sensor can
>>>> be enough to indicate if we are violating the quota or not.
>>>> 
>>>> Thanks
>>>> Omnia
>>>> 
>>>>> On 15 Apr 2024, at 10:35, Claude Warren <cla...@apache.org> wrote:
>>>>> 
>>>>> After thinking about this KIP over the weekend I think that there is
>>>>> another lighter weight approach.
>>>>> 
>>>>> I think the question is not whether or not we have seen a given PID before
>>>>> but rather how many unique PIDs did the principal create in the last hour.
>>>>> Perhaps more exactly it is: did the Principal create more than X PIDS in
>>>>> the last Y time units?
>>>>> 
>>>>> This question can be quickly answered by a CPC datasketch [1].  The
>>>>> solution would be something like:
>>>>> Break the Y time units into a set of Y' smaller partitions (e.g. 60
>>>>> 1-minute partitions for an hour).  Create a circular queue of Y' CPC
>>>>> datasketches for each principal.  Implement a queue entry selector based on
>>>>> the modulus of the system by the resolution of the Y' partitions. On each
>>>>> call:
>>>>> 
>>>>> On queue entry selector change clear the CPC (makes it empty)
>>>>> Add the latest PID to the current queue entry.
>>>>> Sum up the CPCs and check if the max (or min) estimate of unique counts
>>>>> exceeds the limit for the user.
>>>>> 
>>>>> When the CPC returns a zero estimated count then the principal has gone
>>>>> away and the principal/CPC-queue pair can be removed from the tracking
>>>>> system.
>>>>> 
>>>>> I believe that this code solution is smaller and faster than the Bloom
>>>>> filter implementation.
>>>>> 
>>>>> [1] https://datasketches.apache.org/docs/CPC/CPC.html
>>>>> 
>>>>> 
>>>>> 
>>>>> On Fri, Apr 12, 2024 at 3:10 PM Claude Warren <cla...@apache.org> wrote:
>>>>> 
>>>>>> I think there is an issue in the KIP.
>>>>>> 
>>>>>> Basically the kip says, if the PID is found in either of the Bloom filters
>>>>>> then no action is taken
>>>>>> If the PID is not found then it is added and the quota rating metrics are
>>>>>> incremented.
>>>>>> 
>>>>>> In this case long running PIDs will be counted multiple times.
>>>>>> 
>>>>>> Let's assume a 30 minute window with 2 15-minute frames.  So for the first
>>>>>> 15 minutes all PIDs are placed in the first Bloom filter and for the 2nd 15
>>>>>> minutes all new PIDs are placed in the second bloom filter.  At the 3rd 15
>>>>>> minutes the first filter is removed and a new empty one created.
>>>>>> 
>>>>>> Let's denote Bloom filters as BFn{} and indicate the contained pids
>>>>>> between the braces.
>>>>>> 
>>>>>> 
>>>>>> So at t0 let's insert PID0 and increment the rating metrics.  Thus we have
>>>>>> BF0{PID0}
>>>>>> at t0+5 let's insert PID1 and increment the rating metrics.  Thus we have
>>>>>> BF0{PID0, PID1}
>>>>>> at t0+10 we see PID0 again but no changes occur.
>>>>>> at t0+15 we start t1 and we have BF0{PID0, PID1}, BF1{}
>>>>>> at t1+5 we see PID2, increment the rating metrics, and we have BF0{PID0,
>>>>>> PID1}, BF1{PID2}
>>>>>> at t1+6 we see PID0 again and no changes occur
>>>>>> at t1+7 we see PID1 again and no changes occur
>>>>>> at t1+15 we start a new window and dispose of BF0.  Thus we have
>>>>>> BF1{PID2}, BF2{}
>>>>>> at t2+1 we see PID3, increment the rating metrics, and we have
>>>>>> BF1{PID2}, BF2{PID3}
>>>>>> at t2+6 we see PID0 again but now it is not in the list so we increment
>>>>>> the rating metrics and add it: BF1{PID2}, BF2{PID3, PID0}
>>>>>> 
>>>>>> But we just saw PID0 15 minutes ago.  Well within the 30 minute window we
>>>>>> are trying to track.  Or am I missing something?  It seems like we need to
>>>>>> add each PID to the last bloom filter.
>>>>>> 
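The double count in this walkthrough is easy to reproduce with plain sets standing in for BF0/BF1 (an illustrative sketch, not the KIP code):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Reproduces the walkthrough above: a PID is added (and counted) only when
// neither frame contains it, so PID0 is counted a second time after the
// frame it lived in is disposed, even though it was seen every <30 minutes.
public class DoubleCountDemo {
    static long runScenario() {
        Deque<Set<String>> frames = new ArrayDeque<>();
        frames.add(new HashSet<>());   // BF0
        long count = 0;
        count += see(frames, "PID0");  // t0: counted
        count += see(frames, "PID1");  // t0+5: counted
        count += see(frames, "PID0");  // t0+10: no change
        rotate(frames);                // t1: BF1 starts
        count += see(frames, "PID2");  // t1+5: counted
        count += see(frames, "PID0");  // t1+6: no change
        count += see(frames, "PID1");  // t1+7: no change
        rotate(frames);                // t2: BF0 disposed, BF2 starts
        count += see(frames, "PID3");  // t2+1: counted
        count += see(frames, "PID0");  // t2+6: counted AGAIN
        return count;                  // 5 increments for only 4 distinct PIDs
    }

    static long see(Deque<Set<String>> frames, String pid) {
        boolean known = frames.stream().anyMatch(f -> f.contains(pid));
        if (!known) frames.peekLast().add(pid);  // added only when unseen
        return known ? 0 : 1;
    }

    static void rotate(Deque<Set<String>> frames) {
        frames.addLast(new HashSet<>());         // start a new frame
        if (frames.size() > 2) frames.removeFirst();  // dispose the oldest
    }

    public static void main(String[] args) {
        System.out.println("rating metric incremented " + runScenario() + " times");
    }
}
```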
>>>>>> On Fri, Apr 12, 2024 at 2:45 PM Claude Warren <cla...@apache.org>
>>>> wrote:
>>>>>> 
>>>>>>> Initial code is available at
>>>>>>> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>>>>>>> 
>>>>>>> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org> wrote:
>>>>>>> 
>>>>>>>> I should also note that the probability of false positives does not rise
>>>>>>>> above shape.P because as it approaches shape.P a new layer is created and
>>>>>>>> filters are added to that.  So no layer in the LayeredBloomFilter exceeds
>>>>>>>> shape.P, thus the entire filter does not exceed shape.P.
>>>>>>>> 
>>>>>>>> Claude
>>>>>>>> 
>>>>>>>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org> wrote:
>>>>>>>> 
>>>>>>>>> The overall design for KIP-936 seems sound to me.  I would make the
>>>>>>>>> following changes:
>>>>>>>>> 
>>>>>>>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>>>>>>>>> commons-collections v4.5
>>>>>>>>> 
>>>>>>>>> Define the producer.id.quota.window.size.seconds to be the length of
>>>>>>>>> time that a Bloom filter of PIDs will exist.
>>>>>>>>> Define a new configuration option "producer.id.quota.window.count" as
>>>>>>>>> the number of windows active in window.size.seconds.
>>>>>>>>> 
>>>>>>>>> Define the "Shape" (See commons-collections bloomfilters v4.5) of the
>>>>>>>>> bloom filter from the average number of PIDs expected in
>>>>>>>>> window.size.seconds/window.count (call this N) and the probability of
>>>>>>>>> false positives (call this P).  Due to the way the LayeredBloomFilter
>>>>>>>>> works the number of items can be a lower number than the max.  I'll
>>>>>>>>> explain that in a minute.
>>>>>>>>> 
>>>>>>>>> The LayeredBloomFilter implements the standard BloomFilter interface
>>>>>>>>> but internally keeps an ordered list of filters (called layers) from
>>>>>>>>> oldest created to newest.  It adds new layers when a specified Predicate
>>>>>>>>> (checkExtend) returns true.  It will remove filters as defined by a
>>>>>>>>> specified Consumer (filterCleanup).
>>>>>>>>> 
>>>>>>>>> Every time a BloomFilter is merged into the LayeredBloomFilter the
>>>>>>>>> filter checks the "checkExtend" predicate.  If it fires, the
>>>>>>>>> "filterCleanup" is called to remove any layers that should be removed
>>>>>>>>> and a new layer is added.
>>>>>>>>> 
>>>>>>>>> Define the layers of the LayeredBloomFilter to comprise a standard
>>>>>>>>> BloomFilter and an associated expiration timestamp.
>>>>>>>>> 
>>>>>>>>> We can thus define
>>>>>>>>> 
>>>>>>>>>  - "checkExtend" to require a new layer every window.size.seconds /
>>>>>>>>>  window.count seconds or when the current layer contains shape.N items.
>>>>>>>>>  - "filterCleanup" to start at the head of the list of layers and
>>>>>>>>>  remove any expired filters (usually 0, once every window.size.seconds
>>>>>>>>>  exactly 1, and infrequently more than 1).
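The "checkExtend" rule above could look roughly like this. LayerState and every name here is a hypothetical stand-in for whatever the real layer bookkeeping looks like, not the commons-collections hook types:

```java
import java.util.function.Predicate;

// Illustrative shape of the "checkExtend" rule: extend when the current
// layer is older than window.size.seconds / window.count, or when it
// already holds shape.N items (the burst case).
public class ExtendRule {
    static final class LayerState {
        final long createdMs;
        final int itemCount;
        LayerState(long createdMs, int itemCount) {
            this.createdMs = createdMs;
            this.itemCount = itemCount;
        }
    }

    static Predicate<LayerState> checkExtend(long windowSizeMs, int windowCount,
                                             int shapeN, long nowMs) {
        long layerSpanMs = windowSizeMs / windowCount;  // time trigger
        return layer -> nowMs - layer.createdMs >= layerSpanMs
                     || layer.itemCount >= shapeN;      // fill trigger
    }

    public static void main(String[] args) {
        Predicate<LayerState> extend = checkExtend(60_000, 2, 11, 30_000);
        System.out.println(extend.test(new LayerState(0, 5)));       // true: time trigger
        System.out.println(extend.test(new LayerState(25_000, 11))); // true: burst trigger
    }
}
```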
>>>>>>>>> 
>>>>>>>>> This system will correctly handle bursty loads.  There are 3 cases to
>>>>>>>>> consider:
>>>>>>>>> 
>>>>>>>>>  1. If the producer is producing fewer than shape.N PIDs the layer
>>>>>>>>>  will not fill up before the next layer is added.
>>>>>>>>>  2. If the producer is producing shape.N PIDs the layer will be
>>>>>>>>>  processed as either a 1 or a 3 depending on system timings.
>>>>>>>>>  3. If the producer is producing more than shape.N PIDs the layer
>>>>>>>>>  will fill up and a new layer will be created with an expiration
>>>>>>>>>  timestamp window.size.seconds from when it was created.  This is the
>>>>>>>>>  case that leads to the filterCleanup infrequently having more than 1
>>>>>>>>>  layer to remove.
>>>>>>>>> 
>>>>>>>>> The last case to consider is if a producer stops generating PIDs.  In
>>>>>>>>> this case we should walk the map of producers, call "filterCleanup", and
>>>>>>>>> then check to see if the LayeredBloomFilter is empty.  If so, remove it
>>>>>>>>> from the map.  It will be empty if the producer has not produced a PID
>>>>>>>>> for window.size.seconds.
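The cleanup walk described above, sketched with a plain map. TrackedFilter is a hypothetical stand-in for the per-principal layered Bloom filter, not the real manager's types:

```java
import java.util.Map;

// Sketch of the idle-principal cleanup: walk the per-principal map, run each
// filter's cleanup, and drop entries whose filter is now empty (the
// principal has not produced a PID for window.size.seconds).
public class PrincipalPruner {
    interface TrackedFilter {
        void cleanup();     // drop expired layers
        boolean isEmpty();  // no PIDs seen within window.size.seconds
    }

    static void prune(Map<String, TrackedFilter> filtersByPrincipal) {
        filtersByPrincipal.entrySet().removeIf(e -> {
            e.getValue().cleanup();
            return e.getValue().isEmpty();  // principal went away: remove it
        });
    }
}
```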
>>>>>>>>> 
>>>>>>>>> I have this solution mostly coded, though I must admit I do not know
>>>>>>>>> where to plug in the ProducerIdQuotaManager defined in the KIP.
>>>>>>>>> 
>>>>>>>>> Claude
>>>>>>>>> 
>>>>>>>> 
>>>> 
>>>> 
>>> 
>>> --
>>> LinkedIn: http://www.linkedin.com/in/claudewarren
>>> 
>> 
>> 
>> -- 
>> LinkedIn: http://www.linkedin.com/in/claudewarren
> 
