Hi Glaude sorry that it took me a while to respond. I finally had time to look 
into your implementation here 
https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java#L121
 and so far it make sense. 

> So an early PID is added to the first filter and the associated metric is
> updated.
> that PID is seen multiple times over the next 60 minutes, but is not added
> to the Bloom filters again.
> once the 60 minutes elapses the first filter is cleared, or removed and a
> new one started.  In any case the PID is no longer recorded in any extant
> Bloom filter.
> the PID is seen again and is added to the newest bloom filter and the
> associated metric is updated.
> 
> I believe at this point the metric is incorrect, the PID has been counted
> 2x, when it has been in use for the entire time.

You are right! My original design with slides window bloom carries this risk of 
duplication. I had a look into your track method and it makes sense.
> 1. p.i.q.window.size.seconds the length of time that a window will
>   exist.  This is also the maximum time between PID uses where the PID is
>   considered to be the same.  Reuse of the PID after p.iq.window.size.seconds
>   triggers recording the PID as a new PID.
>   Define a new configuration option "producer.id.quota.window.count" as
>   the number of windows active in window.size.seconds.
This is correct. 

>   3. p.i.q.window.num, specified as 11 in the KIP.  I thought this was how
>   many PIDs were expected in each window.  In the original KIP this means
>   that we expect to see the PID 22 times (2 windows x 11).  In the Layered
>   Bloom filter this would be the N value for the Shape.
producer.id.quota.window.num is how many metrics samples we retain but not how 
many PID we store in memory. 
>   2. p.i.q.window.count (the new one), how many sections to break
>   p.i.q.window.size.seconds into.  In the initial KIP this is 2.  This value
>   gives us the timing for creating a new layer in the layered bloom filter.
>   So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new
>   layer will be created and an old layer or layers will be removed.  The
>   layered bloom filter will add layers to keep the probability of false
>   positives in range.
>   4. p.i.q.window.fpr (needs to be specified) the expected false positive
>   rate.  Not sure how to express this in the config in a way that makes sense
>   but something like 0.000006 or the like.  This is the P value for the
>   Shape.  See https://hur.st/bloomfilter for a Bloom filter 

For these two we might need to introduce them. How would they interact with 
each other in the layered bloom filter? 

Thanks 

> On 16 Apr 2024, at 08:00, Claude Warren <cla...@xenei.com> wrote:
> 
> The difference between p.i.q.window.count and p.i.q.window.num:
> 
> To be honest, I may have misunderstood your definition of window num.  But
> here is what I have in mind:
> 
> 
>   1. p.i.q.window.size.seconds the length of time that a window will
>   exist.  This is also the maximum time between PID uses where the PID is
>   considered to be the same.  Reuse of the PID after p.iq.window.size.seconds
>   triggers recording the PID as a new PID.
>   Define a new configuration option "producer.id.quota.window.count" as
>   the number of windows active in window.size.seconds.
>   2. p.i.q.window.count (the new one), how many sections to break
>   p.i.q.window.size.seconds into.  In the initial KIP this is 2.  This value
>   gives us the timing for creating a new layer in the layered bloom filter.
>   So every (p.i.q.window.size.seconds / p.i.q.window.count) seconds a new
>   layer will be created and an old layer or layers will be removed.  The
>   layered bloom filter will add layers to keep the probability of false
>   positives in range.
>   3. p.i.q.window.num, specified as 11 in the KIP.  I thought this was how
>   many PIDs were expected in each window.  In the original KIP this means
>   that we expect to see the PID 22 times (2 windows x 11).  In the Layered
>   Bloom filter this would be the N value for the Shape.
>   4. p.i.q.window.fpr (needs to be specified) the expected false positive
>   rate.  Not sure how to express this in the config in a way that makes sense
>   but something like 0.000006 or the like.  This is the P value for the
>   Shape.  See https://hur.st/bloomfilter for a Bloom filter calculator.
> 
> Once we have the N and P for the Shape the shape can be instantiated as
> "Shape s = Shape.fromNP( int n, double p );"
> 
> In the layered filter once N items have been added to the layer a new layer
> is created.  Layers are removed after p.i.q.window.size.seconds so if there
> is a burst of PIDs the number of layers will expand and then shrink back as
> the PIDs expire.  While running there is always at least 1 layer.
> 
> Some calculated points:
> 
>   - No layer will span more than p.i.q.window.size.seconds /
>   p.i.q.window.count seconds.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and a rate of 22 PIDs per minute there will be 2
>   layers.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and a rate of 10 PIDs per minute there will be 2
>   layers.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and that no PIDS have been seen in the last 30
>   seconds there will be 2 layers.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and that no PIDS have been seen in the last 60
>   seconds there will be 1 layer.
>   - Assuming Shape.N = 11, p.i.q.window.size.seconds = 60
>   p.i.q.window.count =2 and a rate of 23 to 44 PIDs per minute there will be
>   4 layers.
>   - the false positive rate across the layers remains at or below Shape.P
>   - Assuming Shape.N = 11 and Shape.P = 0.000006 the Bloom filter at each
>   layer will consume 35 bytes. https://hur.st/bloomfilter provides a quick
>   calculator for other values.
> 
> Claude
> 
> 
> 
> 
> 
> 
> On Tue, Apr 16, 2024 at 8:06 AM Claude Warren <cla...@xenei.com> wrote:
> 
>> Let's put aside the CPC datasketch idea and just discuss the Bloom filter
>> approach.
>> 
>> I thinkthe problem with the way the KIP is worded is that PIDs are only
>> added if they are not seen in either of the Bloom filters.
>> 
>> So an early PID is added to the first filter and the associated metric is
>> updated.
>> that PID is seen multiple times over the next 60 minutes, but is not added
>> to the Bloom filters again.
>> once the 60 minutes elapses the first filter is cleared, or removed and a
>> new one started.  In any case the PID is no longer recorded in any extant
>> Bloom filter.
>> the PID is seen again and is added to the newest bloom filter and the
>> associated metric is updated.
>> 
>> I believe at this point the metric is incorrect, the PID has been counted
>> 2x, when it has been in use for the entire time.
>> 
>> The "track" method that I added solves this problem by ensuring that the
>> PID is always seen in the latter half of the set of Bloom filters.  In the
>> case of 2 filters that is always the second one, but remember that the
>> number of layers will grow as the filters become saturated.  So if your
>> filter is intended to hold 500 PIDs and the 501st PID is registered before
>> the expiration a new layer (Bloom filter) is added for new PIDS to be added
>> into.
>> 
>> On Mon, Apr 15, 2024 at 5:00 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> wrote:
>> 
>>> Hi Claude,
>>> Thanks for the implementation of the LayeredBloomFilter in apache
>>> commons.
>>> 
>>>> Define a new configuration option "producer.id.quota.window.count" as
>>>> the number of windows active in window.size.seconds.
>>> What is the different between “producer.id.quota.window.count” and
>>> producer.id.quota.window.num
>>> 
>>>> Basically the kip says, if the PID is found in either of the Bloom
>>> filters
>>>> then no action is taken
>>>> If the PID is not found then it is added and the quota rating metrics
>>> are
>>>> incremented.
>>>> In this case long running PIDs will be counted multiple times.
>>> 
>>> The PID is considered not encountered if both frames of the window don’t
>>> have it. If you checked the diagram of for `Caching layer to track active
>>> PIDs per KafkaPrincipal` you will see that each window will have 2 bloom
>>> layers and the first created one will be disposed only when we start the
>>> next window. Which means window2 is starting from the 2nd bloom. Basically
>>> the bloom filter in the KIP is trying to implement a sliding window
>>> pattern.
>>> 
>>>> think the question is not whether or not we have seen a given PID
>>> before
>>>> but rather how many unique PIDs did the principal create in the last
>>> hour.
>>>> Perhaps more exactly it is: did the Principal create more than X PIDS in
>>>> the last Y time units?
>>> We don’t really care about the count of unique PIDs per user. The KIP is
>>> trying to follow and build on top of ClientQuotaManager which already have
>>> a patter for throttling that the producer client is aware of so we don’t
>>> need to upgrade old clients for brokers to throttle them and they can
>>> respect the throttling.
>>> 
>>> The pattern for throttling is that we record the activities by
>>> incrementing a metric sensor and only when we catch
>>> `QuotaViolationException` from the quota sensor we will be sending a
>>> throttleTimeMs to the client.
>>> For bandwidth throttling for example we increment the sensor by the size
>>> of the request. For PID the KIP is aiming to call
>>> `QuotaManagers::producerIdQuotaManager::maybeRecordAndGetThrottleTimeMs` to
>>> increment by +1 every time we encounter a new PID and if and if
>>> `Sensor::record` returned `QuotaViolationException` then we will send back
>>> to the producer the trolling time that the client should wait for before
>>> sending a new request with a new PID.
>>> I hope this make sense.
>>> 
>>>> This question can be quickly answered by a CPC datasketch [1].  The
>>>> solution would be something like:
>>>> Break the Y time units into a set of Y' smaller partitions (e.g. 60
>>>> 1-minute partitions for an hour).  Create a circular queue of Y' CPC
>>>> datasketches for each principal.  Implement a queue entry selector
>>> based on
>>>> the modulus of the system by the resolution of the Y' partitions. On
>>> each
>>>> call:
>>> I didn’t evaluate CPC datasketch or any counter solution as I explained
>>> above the aim is not to build a counter specially the Kafka Sensor can be
>>> enough to indicate if we are violating the quota or not.
>>> 
>>> Thanks
>>> Omnia
>>> 
>>>> On 15 Apr 2024, at 10:35, Claude Warren <cla...@apache.org> wrote:
>>>> 
>>>> After thinking about his KIP over the weekend I think that there is
>>> another
>>>> lighter weight approach.
>>>> 
>>>> I think the question is not whether or not we have seen a given PID
>>> before
>>>> but rather how many unique PIDs did the principal create in the last
>>> hour.
>>>> Perhaps more exactly it is: did the Principal create more than X PIDS in
>>>> the last Y time units?
>>>> 
>>>> This question can be quickly answered by a CPC datasketch [1].  The
>>>> solution would be something like:
>>>> Break the Y time units into a set of Y' smaller partitions (e.g. 60
>>>> 1-minute partitions for an hour).  Create a circular queue of Y' CPC
>>>> datasketches for each principal.  Implement a queue entry selector
>>> based on
>>>> the modulus of the system by the resolution of the Y' partitions. On
>>> each
>>>> call:
>>>> 
>>>> On queue entry selector change clear the CPC (makes it empty)
>>>> Add the latest PID to the current queue entry.
>>>> Sum up the CPCs and check if the max (or min) estimate of unique counts
>>>> exceeds the limit for the user.
>>>> 
>>>> When the CPC returns a zero estimated count then the principal has gone
>>>> away and the principal/CPC-queue pair can be removed from the tracking
>>>> system.
>>>> 
>>>> I believe that this code solution is smaller and faster than the Bloom
>>>> filter implementation.
>>>> 
>>>> [1] https://datasketches.apache.org/docs/CPC/CPC.html
>>>> 
>>>> 
>>>> 
>>>> On Fri, Apr 12, 2024 at 3:10 PM Claude Warren <cla...@apache.org>
>>> wrote:
>>>> 
>>>>> I think there is an issue in the KIP.
>>>>> 
>>>>> Basically the kip says, if the PID is found in either of the Bloom
>>> filters
>>>>> then no action is taken
>>>>> If the PID is not found then it is added and the quota rating metrics
>>> are
>>>>> incremented.
>>>>> 
>>>>> In this case long running PIDs will be counted multiple times.
>>>>> 
>>>>> Let's assume a 30 minute window with 2 15-minute frames.  So for the
>>> first
>>>>> 15 minutes all PIDs are placed in the first Bloom filter and for the
>>> 2nd 15
>>>>> minutes all new PIDs are placed in the second bloom filter.  At the
>>> 3rd 15
>>>>> minutes the first filter is removed and a new empty one created.
>>>>> 
>>>>> Let's denote Bloom filters as BFn{} and indicate the contained pids
>>>>> between the braces.
>>>>> 
>>>>> 
>>>>> So at t0 lets insert PID0 and increment the rating metrics.  Thus we
>>> have
>>>>> BF0{PID0}
>>>>> at t0+5 let's insert PID1 and increment the rating metrics.  Thus we
>>> have
>>>>> BF0{PID0, PID1}
>>>>> at t0+10 we see PID0 again but no changes occur.
>>>>> at t0+15 we start t1  and we have BF0{PID0, PID1}, BF1{}
>>>>> at t1+5 we see PID2, increment the rating metrics, and we have
>>> BF0{PID0,
>>>>> PID1}, BF1{PID2}
>>>>> at t1+6 we see PID0 again and no changes occur
>>>>> at t1+7 we see PID1 again and no changes occur
>>>>> at t1+15 we start a new window and dispose of BF0.  Thus we have
>>>>> BF1{PID2}, BF2{}
>>>>> at t2+1 we see PID3, increment the rating metrics,  and we have we have
>>>>> BF1{PID2}, BF2{PID3}
>>>>> at t2+6 we see PID0 again but now it is not in the list so we increment
>>>>> the rating metrics and add it BF1{PID2}, BF2{PID3, PID0}
>>>>> 
>>>>> But we just saw PID0 15 minutes ago.  Well within the 30 minute window
>>> we
>>>>> are trying to track.  Or am I missing something?  It seems like we
>>> need to
>>>>> add each PID to the last bloom filter
>>>>> 
>>>>> On Fri, Apr 12, 2024 at 2:45 PM Claude Warren <cla...@apache.org>
>>> wrote:
>>>>> 
>>>>>> Initial code is available at
>>>>>> 
>>> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>>>>>> 
>>>>>> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org>
>>> wrote:
>>>>>> 
>>>>>>> I should also note that the probability of false positives does not
>>> fall
>>>>>>> below shape.P because as it approaches shape.P a new layer is
>>> created and
>>>>>>> filters are added to that.  So no layer in the LayeredBloomFilter
>>> exceeds
>>>>>>> shape.P thus the entire filter does not exceed shape.P.
>>>>>>> 
>>>>>>> Claude
>>>>>>> 
>>>>>>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org>
>>> wrote:
>>>>>>> 
>>>>>>>> The overall design for KIP-936 seems sound to me.  I would make the
>>>>>>>> following changes:
>>>>>>>> 
>>>>>>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>>>>>>>> commons-collections v4.5
>>>>>>>> 
>>>>>>>> Define the producer.id.quota.window.size.seconds to be the length of
>>>>>>>> time that a Bloom filter of PIDs will exist.
>>>>>>>> Define a new configuration option "producer.id.quota.window.count"
>>> as
>>>>>>>> the number of windows active in window.size.seconds.
>>>>>>>> 
>>>>>>>> Define the "Shape" (See commons-collections bloomfilters v4.5) of
>>> the
>>>>>>>> bloom filter from the average number of PIDs expected in
>>>>>>>> window.size.seconds/window.count (call this N) and the probability
>>> of false
>>>>>>>> positives (call this P).  Due to the way the LayeredBloomFilter
>>> works the
>>>>>>>> number of items can be a lower number than the max.  I'll explain
>>> that in a
>>>>>>>> minute.
>>>>>>>> 
>>>>>>>> The LayeredBloomFilter implements the standard BloomFilter interface
>>>>>>>> but internally keeps an ordered list of filters (called layers)
>>> from oldest
>>>>>>>> created to newest.  It adds new layers when a specified Predicate
>>>>>>>> (checkExtend) returns true.  It will remove filters as defined by a
>>>>>>>> specified Consumer (filterCleanup).
>>>>>>>> 
>>>>>>>> Everytime a BloomFilter is merged into the LayeredBloomFilter the
>>>>>>>> filter checks to the "checkExtend" predicate.  If it fires the
>>>>>>>> "filterCleanup" is called to remove any layers that should be
>>> removed and a
>>>>>>>> new layer is added.
>>>>>>>> 
>>>>>>>> Define the layers of the LayeredBloomFilter to comprise a standard
>>>>>>>> BloomFilter and an associated expiration timestamp.
>>>>>>>> 
>>>>>>>> We can thus define
>>>>>>>> 
>>>>>>>>  - "checkExtend" to require a new layer window.size.seconds /
>>>>>>>>  window.count seconds or when the current layer contains shape.N
>>> items.
>>>>>>>>  - "filterCleanup" to start at the head of the list of layers and
>>>>>>>>  remove any expired filters, usually 0, every window.size.seconds
>>> 1,
>>>>>>>>  infrequently more than 1.
>>>>>>>> 
>>>>>>>> This system will correctly handle bursty loads.  There are 3 cases
>>> to
>>>>>>>> consider:
>>>>>>>> 
>>>>>>>>  1. If the producer is producing fewer than shape.N PIDs the layer
>>>>>>>>  will not fill up before the next layer is added.
>>>>>>>>  2. If the producer is producing shape.N PIDs the layer will be
>>>>>>>>  processed as either a 1 or a 3 depending on system timings.
>>>>>>>>  3. If the producer is producing more than shape.N PIDs the layer
>>>>>>>>  will fill up and a new layer will be created with an expiration
>>> timestamp
>>>>>>>>  window.size.seconds from when it was created.  This is the case
>>> that leads
>>>>>>>>  to the filterCleanup infrequently having more than 1 layer to
>>> remove.
>>>>>>>> 
>>>>>>>> The last case to consider is if a producer stops generating PIDs, in
>>>>>>>> this case we should walk the map of producers,  call
>>> "filterCleanup", and
>>>>>>>> then check to see if the LayeredBloomFilter is empty.  If so,
>>> remove it
>>>>>>>> from the map.  It will be empty if the producer has not produced a
>>> PID for
>>>>>>>> window.size.seconds.
>>>>>>>> 
>>>>>>>> I have this solution mostly coded, though I must admit I do not know
>>>>>>>> where to plugin the ProducerIdQuotaManager defined in the KIP
>>>>>>>> 
>>>>>>>> Claude
>>>>>>>> 
>>>>>>> 
>>> 
>>> 
>> 
>> --
>> LinkedIn: http://www.linkedin.com/in/claudewarren
>> 
> 
> 
> -- 
> LinkedIn: http://www.linkedin.com/in/claudewarren

Reply via email to