Hi Claude, 
Thanks for the implementation of the LayeredBloomFilter in Apache Commons. 

> Define a new configuration option "producer.id.quota.window.count" as
> the number of windows active in window.size.seconds.
What is the difference between “producer.id.quota.window.count” and 
“producer.id.quota.window.num”?

> Basically the kip says, if the PID is found in either of the Bloom filters
> then no action is taken
> If the PID is not found then it is added and the quota rating metrics are
> incremented.
> In this case long running PIDs will be counted multiple times.

A PID is considered not encountered only if neither frame of the window 
contains it. If you check the diagram for `Caching layer to track active PIDs 
per KafkaPrincipal` you will see that each window has 2 bloom layers, and the 
first one created is disposed of only when we start the next window, which 
means window2 starts from the 2nd bloom filter. Basically, the bloom filter in 
the KIP is trying to implement a sliding window pattern. 
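
To make the sliding window behaviour concrete, here is a minimal sketch of the 
two-frame check. It is not the KIP code; the class and method names are made up 
for illustration, and it assumes the commons-collections v4.5 bloomfilter types 
(`Shape`, `Hasher`, `SimpleBloomFilter`):

```java
import org.apache.commons.collections4.bloomfilter.Hasher;
import org.apache.commons.collections4.bloomfilter.Shape;
import org.apache.commons.collections4.bloomfilter.SimpleBloomFilter;

// Illustrative only: one principal's window, backed by two bloom "frames".
final class PidWindowSketch {
    private final Shape shape;
    private SimpleBloomFilter oldFrame;      // frame created in the previous half-window
    private SimpleBloomFilter currentFrame;  // frame created in the current half-window

    PidWindowSketch(Shape shape) {
        this.shape = shape;                  // e.g. Shape.fromNP(expectedPids, falsePositiveRate)
        this.oldFrame = new SimpleBloomFilter(shape);
        this.currentFrame = new SimpleBloomFilter(shape);
    }

    // A PID counts as "new" only if neither frame contains it; new PIDs are
    // recorded in the newest frame so they survive the next rotation.
    boolean trackPid(Hasher pidHasher) {
        if (oldFrame.contains(pidHasher) || currentFrame.contains(pidHasher)) {
            return false;                    // seen within the sliding window: no action
        }
        currentFrame.merge(pidHasher);       // remember it; caller then records +1 on the sensor
        return true;
    }

    // When the next frame starts, only the oldest frame is disposed of,
    // so window2 effectively starts from the 2nd bloom filter.
    void rotate() {
        oldFrame = currentFrame;
        currentFrame = new SimpleBloomFilter(shape);
    }
}
```
Here `rotate()` just stands in for whatever disposes of the oldest frame when 
the next frame starts.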

> I think the question is not whether or not we have seen a given PID before
> but rather how many unique PIDs did the principal create in the last hour.
> Perhaps more exactly it is: did the Principal create more than X PIDS in
> the last Y time units?
We don’t really care about the count of unique PIDs per user. The KIP is trying 
to follow and build on top of ClientQuotaManager, which already has a pattern 
for throttling that the producer client is aware of, so we don’t need to 
upgrade old clients for brokers to throttle them and for them to respect the 
throttling. 

The pattern for throttling is that we record the activity by incrementing a 
metric sensor, and only when we catch a `QuotaViolationException` from the 
quota sensor do we send a throttleTimeMs back to the client. 
For bandwidth throttling, for example, we increment the sensor by the size of 
the request. For PIDs the KIP aims to call 
`QuotaManagers::producerIdQuotaManager::maybeRecordAndGetThrottleTimeMs` to 
increment the sensor by +1 every time we encounter a new PID, and if 
`Sensor::record` throws a `QuotaViolationException`, then we send back to the 
producer the throttling time that the client should wait before sending a new 
request with a new PID. 
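
A rough sketch of that flow, where only `Sensor` and `QuotaViolationException` 
are real Kafka classes and the wrapper class, its method, and the 
throttle-time calculation are placeholders:

```java
import org.apache.kafka.common.metrics.QuotaViolationException;
import org.apache.kafka.common.metrics.Sensor;

// Placeholder wrapper to show the record-then-throttle pattern; not the KIP code.
final class ProducerIdQuotaSketch {
    private final Sensor pidRateSensor;   // sensor configured with the PID quota

    ProducerIdQuotaSketch(Sensor pidRateSensor) {
        this.pidRateSensor = pidRateSensor;
    }

    // Called only when a *new* PID is encountered (see the bloom-filter check above).
    // Returns 0 if the quota is not violated, otherwise the time the client should wait.
    int maybeRecordAndGetThrottleTimeMs(long nowMs) {
        try {
            pidRateSensor.record(1.0, nowMs);   // +1 per new PID
            return 0;
        } catch (QuotaViolationException e) {
            // Kafka derives the throttle time from the violated metric; this
            // helper is just a stand-in for that calculation.
            return computeThrottleTimeMs(e, nowMs);
        }
    }

    private int computeThrottleTimeMs(QuotaViolationException e, long nowMs) {
        return 1000;  // placeholder; the real value depends on the quota overshoot
    }
}
```
The broker then sends that throttleTimeMs back in the response, the same way 
the existing bandwidth and request quotas do, so old clients can respect it 
without an upgrade. 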
I hope this makes sense. 

> This question can be quickly answered by a CPC datasketch [1].  The
> solution would be something like:
> Break the Y time units into a set of Y' smaller partitions (e.g. 60
> 1-minute partitions for an hour).  Create a circular queue of Y' CPC
> datasketches for each principal.  Implement a queue entry selector based on
> the modulus of the system time by the resolution of the Y' partitions. On each
> call:
I didn’t evaluate the CPC datasketch or any counting solution because, as I 
explained above, the aim is not to build a counter, especially since the Kafka 
Sensor can be enough to indicate whether we are violating the quota or not. 

Thanks 
Omnia 

> On 15 Apr 2024, at 10:35, Claude Warren <cla...@apache.org> wrote:
> 
> After thinking about this KIP over the weekend I think that there is another
> lighter weight approach.
> 
> I think the question is not whether or not we have seen a given PID before
> but rather how many unique PIDs did the principal create in the last hour.
> Perhaps more exactly it is: did the Principal create more than X PIDS in
> the last Y time units?
> 
> This question can be quickly answered by a CPC datasketch [1].  The
> solution would be something like:
> Break the Y time units into a set of Y' smaller partitions (e.g. 60
> 1-minute partitions for an hour).  Create a circular queue of Y' CPC
> datasketches for each principal.  Implement a queue entry selector based on
> the modulus of the system time by the resolution of the Y' partitions. On each
> call:
> 
> On queue entry selector change clear the CPC (makes it empty)
> Add the latest PID to the current queue entry.
> Sum up the CPCs and check if the max (or min) estimate of unique counts
> exceeds the limit for the user.
> 
> When the CPC returns a zero estimated count then the principal has gone
> away and the principal/CPC-queue pair can be removed from the tracking
> system.
> 
> I believe that this code solution is smaller and faster than the Bloom
> filter implementation.
> 
> [1] https://datasketches.apache.org/docs/CPC/CPC.html
> 
> 
> 
> On Fri, Apr 12, 2024 at 3:10 PM Claude Warren <cla...@apache.org> wrote:
> 
>> I think there is an issue in the KIP.
>> 
>> Basically the kip says, if the PID is found in either of the Bloom filters
>> then no action is taken
>> If the PID is not found then it is added and the quota rating metrics are
>> incremented.
>> 
>> In this case long running PIDs will be counted multiple times.
>> 
>> Let's assume a 30 minute window with 2 15-minute frames.  So for the first
>> 15 minutes all PIDs are placed in the first Bloom filter and for the 2nd 15
>> minutes all new PIDs are placed in the second bloom filter.  At the 3rd 15
>> minutes the first filter is removed and a new empty one created.
>> 
>> Let's denote Bloom filters as BFn{} and indicate the contained pids
>> between the braces.
>> 
>> 
>> So at t0 let's insert PID0 and increment the rating metrics.  Thus we have
>> BF0{PID0}
>> at t0+5 let's insert PID1 and increment the rating metrics.  Thus we have
>> BF0{PID0, PID1}
>> at t0+10 we see PID0 again but no changes occur.
>> at t0+15 we start t1  and we have BF0{PID0, PID1}, BF1{}
>> at t1+5 we see PID2, increment the rating metrics, and we have BF0{PID0,
>> PID1}, BF1{PID2}
>> at t1+6 we see PID0 again and no changes occur
>> at t1+7 we see PID1 again and no changes occur
>> at t1+15 we start a new window and dispose of BF0.  Thus we have
>> BF1{PID2}, BF2{}
>> at t2+1 we see PID3, increment the rating metrics, and we have
>> BF1{PID2}, BF2{PID3}
>> at t2+6 we see PID0 again but now it is not in the list so we increment
>> the rating metrics and add it: BF1{PID2}, BF2{PID3, PID0}
>> 
>> But we just saw PID0 15 minutes ago.  Well within the 30 minute window we
>> are trying to track.  Or am I missing something?  It seems like we need to
>> add each PID to the last bloom filter.
>> 
>> On Fri, Apr 12, 2024 at 2:45 PM Claude Warren <cla...@apache.org> wrote:
>> 
>>> Initial code is available at
>>> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>>> 
>>> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org> wrote:
>>> 
>>>> I should also note that the probability of false positives does not rise
>>>> above shape.P, because as it approaches shape.P a new layer is created and
>>>> filters are added to that.  So no layer in the LayeredBloomFilter exceeds
>>>> shape.P, thus the entire filter does not exceed shape.P.
>>>> 
>>>> Claude
>>>> 
>>>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org> wrote:
>>>> 
>>>>> The overall design for KIP-936 seems sound to me.  I would make the
>>>>> following changes:
>>>>> 
>>>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>>>>> commons-collections v4.5
>>>>> 
>>>>> Define the producer.id.quota.window.size.seconds to be the length of
>>>>> time that a Bloom filter of PIDs will exist.
>>>>> Define a new configuration option "producer.id.quota.window.count" as
>>>>> the number of windows active in window.size.seconds.
>>>>> 
>>>>> Define the "Shape" (See commons-collections bloomfilters v4.5) of the
>>>>> bloom filter from the average number of PIDs expected in
>>>>> window.size.seconds/window.count (call this N) and the probability of 
>>>>> false
>>>>> positives (call this P).  Due to the way the LayeredBloomFilter works the
>>>>> number of items can be a lower number than the max.  I'll explain that in 
>>>>> a
>>>>> minute.
>>>>> 
>>>>> The LayeredBloomFilter implements the standard BloomFilter interface
>>>>> but internally keeps an ordered list of filters (called layers) from
>>>>> oldest created to newest.  It adds new layers when a specified Predicate
>>>>> (checkExtend) returns true.  It will remove filters as defined by a
>>>>> specified Consumer (filterCleanup).
>>>>> 
>>>>> Every time a BloomFilter is merged into the LayeredBloomFilter the
>>>>> filter checks the "checkExtend" predicate.  If it fires, the
>>>>> "filterCleanup" is called to remove any layers that should be removed
>>>>> and a new layer is added.
>>>>> 
>>>>> Define the layers of the LayeredBloomFilter to comprise a standard
>>>>> BloomFilter and an associated expiration timestamp.
>>>>> 
>>>>> We can thus define
>>>>> 
>>>>>   - "checkExtend" to require a new layer window.size.seconds /
>>>>>   window.count seconds or when the current layer contains shape.N items.
>>>>>   - "filterCleanup" to start at the head of the list of layers and
>>>>>   remove any expired filters, usually 0, every window.size.seconds 1,
>>>>>   infrequently more than 1.
>>>>> 
>>>>> This system will correctly handle bursty loads.  There are 3 cases to
>>>>> consider:
>>>>> 
>>>>>   1. If the producer is producing fewer than shape.N PIDs the layer
>>>>>   will not fill up before the next layer is added.
>>>>>   2. If the producer is producing shape.N PIDs the layer will be
>>>>>   processed as either a 1 or a 3 depending on system timings.
>>>>>   3. If the producer is producing more than shape.N PIDs the layer
>>>>>   will fill up and a new layer will be created with an expiration
>>>>>   timestamp window.size.seconds from when it was created.  This is the
>>>>>   case that leads to the filterCleanup infrequently having more than 1
>>>>>   layer to remove.
>>>>> 
>>>>> The last case to consider is if a producer stops generating PIDs; in
>>>>> this case we should walk the map of producers, call "filterCleanup", and
>>>>> then check to see if the LayeredBloomFilter is empty.  If so, remove it
>>>>> from the map.  It will be empty if the producer has not produced a PID for
>>>>> window.size.seconds.
>>>>> 
>>>>> I have this solution mostly coded, though I must admit I do not know
>>>>> where to plug in the ProducerIdQuotaManager defined in the KIP.
>>>>> 
>>>>> Claude
>>>>> 
>>>> 
