After thinking about this KIP over the weekend, I think that there is another,
lighter-weight approach.

I think the question is not whether or not we have seen a given PID before
but rather how many unique PIDs did the principal create in the last hour.
Perhaps more exactly it is: did the principal create more than X PIDs in
the last Y time units?

This question can be quickly answered by a CPC datasketch [1].  The
solution would be something like:
Break the Y time units into a set of Y' smaller partitions (e.g. 60
1-minute partitions for an hour).  Create a circular queue of Y' CPC
datasketches for each principal.  Implement a queue entry selector based on
the system time divided by the resolution of the partitions, modulo Y'.  On
each call:

   1. If the queue entry selector has changed, clear the CPC in the newly
   selected entry (making it empty).
   2. Add the latest PID to the current queue entry.
   3. Merge (union) the CPCs and check whether the max (or min) estimate of
   the unique count exceeds the limit for the user.

When the merged CPC returns a zero estimated count then the principal has
gone away and the principal/CPC-queue pair can be removed from the tracking
system.
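
The steps above can be sketched roughly as follows.  This is only an
illustration under stated assumptions, not the proposed implementation: each
queue entry is an exact HashSet standing in for a CPC sketch so that the
example runs without the DataSketches library, and the class name
PidWindowCounter, its methods, and the expire() step (which clears entries
left over from an earlier pass around the queue when a principal has been
idle) are hypothetical additions.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sliding-window distinct-PID counter for a single principal (sketch only). */
class PidWindowCounter {
    // Circular queue of Y' entries; a HashSet stands in for a CPC sketch here.
    private final List<Set<Long>> slots = new ArrayList<>();
    // Partition number currently stored in each entry, or -1 if the entry is empty.
    private final long[] slotEpoch;
    // Length of one partition in milliseconds (the "resolution").
    private final long resolutionMs;

    PidWindowCounter(int partitions, long resolutionMs) {
        this.resolutionMs = resolutionMs;
        this.slotEpoch = new long[partitions];
        for (int i = 0; i < partitions; i++) {
            slots.add(new HashSet<>());
            slotEpoch[i] = -1;
        }
    }

    /** Clear every entry whose partition has fallen out of the window. */
    private void expire(long nowMs) {
        long epoch = nowMs / resolutionMs;
        for (int i = 0; i < slotEpoch.length; i++) {
            if (slotEpoch[i] >= 0 && epoch - slotEpoch[i] >= slotEpoch.length) {
                slots.get(i).clear();
                slotEpoch[i] = -1;
            }
        }
    }

    /** Add a PID to the entry selected by (time / resolution) modulo Y'. */
    void record(long pid, long nowMs) {
        expire(nowMs);
        long epoch = nowMs / resolutionMs;
        int idx = (int) (epoch % slotEpoch.length);
        slotEpoch[idx] = epoch;
        slots.get(idx).add(pid);
    }

    /** Union of all live entries; with real sketches this would be an estimate. */
    long distinct(long nowMs) {
        expire(nowMs);
        Set<Long> union = new HashSet<>();
        for (Set<Long> s : slots) {
            union.addAll(s);
        }
        return union.size();
    }

    /** True when the principal created more than `limit` PIDs in the window. */
    boolean exceeds(long limit, long nowMs) {
        return distinct(nowMs) > limit;
    }
}
```

A caller would keep one PidWindowCounter per principal in a map, call
record() for every PID seen, and drop the map entry once distinct() returns
zero.  With real sketches the count would be an estimate with upper and
lower bounds rather than an exact number.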

I believe that this code solution is smaller and faster than the Bloom
filter implementation.

[1] https://datasketches.apache.org/docs/CPC/CPC.html



On Fri, Apr 12, 2024 at 3:10 PM Claude Warren <cla...@apache.org> wrote:

> I think there is an issue in the KIP.
>
> Basically the KIP says that if the PID is found in either of the Bloom
> filters then no action is taken.
> If the PID is not found then it is added and the quota rating metrics are
> incremented.
>
> In this case long running PIDs will be counted multiple times.
>
> Let's assume a 30 minute window with 2 15-minute frames.  So for the first
> 15 minutes all PIDs are placed in the first Bloom filter and for the 2nd 15
> minutes all new PIDs are placed in the second Bloom filter.  At the 3rd 15
> minutes the first filter is removed and a new empty one created.
>
> Let's denote Bloom filters as BFn{} and indicate the contained PIDs
> between the braces.
>
>
> So at t0 let's insert PID0 and increment the rating metrics.  Thus we have
> BF0{PID0}
> at t0+5 let's insert PID1 and increment the rating metrics.  Thus we have
> BF0{PID0, PID1}
> at t0+10 we see PID0 again but no changes occur.
> at t0+15 we start t1  and we have BF0{PID0, PID1}, BF1{}
> at t1+5 we see PID2, increment the rating metrics, and we have BF0{PID0,
> PID1}, BF1{PID2}
> at t1+6 we see PID0 again and no changes occur
> at t1+7 we see PID1 again and no changes occur
> at t1+15 we start a new window and dispose of BF0.  Thus we have
> BF1{PID2}, BF2{}
> at t2+1 we see PID3, increment the rating metrics, and we have
> BF1{PID2}, BF2{PID3}
> at t2+6 we see PID0 again but now it is not in the list so we increment
> the rating metrics and add it, giving BF1{PID2}, BF2{PID3, PID0}
>
> But we just saw PID0 15 minutes ago, well within the 30 minute window we
> are trying to track.  Or am I missing something?  It seems like we need to
> add each PID to the last Bloom filter.
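
The timeline above can be replayed with a small simulation.  This is a sketch
under assumptions: plain HashSets stand in for the Bloom filters (so there
are no false positives), and the FrameTracker class and its refreshOnHit flag
are hypothetical names used only for illustration.  With refreshOnHit set to
false it follows the behaviour described in the KIP; with true it also adds
an already-known PID to the newest frame.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Two-frame PID tracker with HashSets standing in for Bloom filters. */
class FrameTracker {
    private final Deque<Set<String>> frames = new ArrayDeque<>();
    private final int maxFrames;
    private final boolean refreshOnHit;
    int quotaIncrements = 0;

    FrameTracker(int maxFrames, boolean refreshOnHit) {
        this.maxFrames = maxFrames;
        this.refreshOnHit = refreshOnHit;
        frames.addLast(new HashSet<>());
    }

    /** Start a new frame and drop the oldest one if the window is full. */
    void rotate() {
        frames.addLast(new HashSet<>());
        if (frames.size() > maxFrames) {
            frames.removeFirst();
        }
    }

    /** Record a sighting of a PID, incrementing the quota metric if it is new. */
    void see(String pid) {
        boolean known = false;
        for (Set<String> frame : frames) {
            if (frame.contains(pid)) {
                known = true;
                break;
            }
        }
        if (!known) {
            quotaIncrements++;
        }
        if (!known || refreshOnHit) {
            frames.peekLast().add(pid);
        }
    }

    /** Replay the t0..t2+6 timeline and return the number of increments. */
    static int replay(boolean refreshOnHit) {
        FrameTracker t = new FrameTracker(2, refreshOnHit);
        t.see("PID0");   // t0
        t.see("PID1");   // t0+5
        t.see("PID0");   // t0+10
        t.rotate();      // t0+15 = t1
        t.see("PID2");   // t1+5
        t.see("PID0");   // t1+6
        t.see("PID1");   // t1+7
        t.rotate();      // t1+15 = t2
        t.see("PID3");   // t2+1
        t.see("PID0");   // t2+6
        return t.quotaIncrements;
    }
}
```

Calling FrameTracker.replay(false) counts PID0 twice for a total of five
increments over four distinct PIDs, while FrameTracker.replay(true) yields
four.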
>
> On Fri, Apr 12, 2024 at 2:45 PM Claude Warren <cla...@apache.org> wrote:
>
>> Initial code is available at
>> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>>
>> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org> wrote:
>>
>>> I should also note that the probability of false positives does not rise
>>> above shape.P because as it approaches shape.P a new layer is created and
>>> filters are added to that.  So no layer in the LayeredBloomFilter exceeds
>>> shape.P thus the entire filter does not exceed shape.P.
>>>
>>> Claude
>>>
>>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org> wrote:
>>>
>>>> The overall design for KIP-936 seems sound to me.  I would make the
>>>> following changes:
>>>>
>>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>>>> commons-collections v4.5
>>>>
>>>> Define the producer.id.quota.window.size.seconds to be the length of
>>>> time that a Bloom filter of PIDs will exist.
>>>> Define a new configuration option "producer.id.quota.window.count" as
>>>> the number of windows active in window.size.seconds.
>>>>
>>>> Define the "Shape" (See commons-collections bloomfilters v4.5) of the
>>>> bloom filter from the average number of PIDs expected in
>>>> window.size.seconds/window.count (call this N) and the probability of false
>>>> positives (call this P).  Due to the way the LayeredBloomFilter works the
>>>> number of items can be a lower number than the max.  I'll explain that in a
>>>> minute.
>>>>
>>>> The LayeredBloomFilter implements the standard BloomFilter interface
>>>> but internally keeps an ordered list of filters (called layers) from oldest
>>>> created to newest.  It adds new layers when a specified Predicate
>>>> (checkExtend) returns true.  It will remove filters as defined by a
>>>> specified Consumer (filterCleanup).
>>>>
>>>> Every time a BloomFilter is merged into the LayeredBloomFilter the
>>>> filter checks the "checkExtend" predicate.  If it fires, the
>>>> "filterCleanup" is called to remove any layers that should be removed and a
>>>> new layer is added.
>>>>
>>>> Define the layers of the LayeredBloomFilter to comprise a standard
>>>> BloomFilter and an associated expiration timestamp.
>>>>
>>>> We can thus define
>>>>
>>>>    - "checkExtend" to require a new layer every window.size.seconds /
>>>>    window.count seconds or when the current layer contains shape.N items.
>>>>    - "filterCleanup" to start at the head of the list of layers and
>>>>    remove any expired filters: usually 0, every window.size.seconds 1,
>>>>    and infrequently more than 1.
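
The two rules above could look roughly like this.  It is a minimal sketch
under assumptions, not the commons-collections API: each layer is a HashSet
plus an expiration timestamp standing in for a Bloom filter layer, and the
LayeredPidTracker class, its fields, and the choice to add every sighted PID
to the newest layer are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Layered PID tracker; HashSet layers stand in for Bloom filter layers. */
class LayeredPidTracker {
    /** One layer: the PIDs it holds plus creation and expiration times. */
    static final class Layer {
        final Set<Long> pids = new HashSet<>();
        final long createdMs;
        final long expiresMs;

        Layer(long createdMs, long expiresMs) {
            this.createdMs = createdMs;
            this.expiresMs = expiresMs;
        }
    }

    private final Deque<Layer> layers = new ArrayDeque<>(); // oldest first
    private final long windowMs;   // window.size.seconds, in milliseconds
    private final int windowCount; // window.count
    private final int shapeN;      // expected items per layer (shape.N)

    LayeredPidTracker(long windowMs, int windowCount, int shapeN) {
        this.windowMs = windowMs;
        this.windowCount = windowCount;
        this.shapeN = shapeN;
    }

    /** A new layer is needed when the newest one is too old or is full. */
    boolean checkExtend(long nowMs) {
        Layer newest = layers.peekLast();
        return newest == null
                || nowMs - newest.createdMs >= windowMs / windowCount
                || newest.pids.size() >= shapeN;
    }

    /** Remove expired layers, starting from the head (oldest) of the list. */
    void filterCleanup(long nowMs) {
        while (!layers.isEmpty() && layers.peekFirst().expiresMs <= nowMs) {
            layers.removeFirst();
        }
    }

    /** Track a PID; returns true when it was not present in any layer. */
    boolean track(long pid, long nowMs) {
        if (checkExtend(nowMs)) {
            filterCleanup(nowMs);
            layers.addLast(new Layer(nowMs, nowMs + windowMs));
        }
        boolean seen = layers.stream().anyMatch(l -> l.pids.contains(pid));
        // Assumption: every sighting is added to the newest layer.
        layers.peekLast().pids.add(pid);
        return !seen;
    }

    /** True once every layer has expired, i.e. the producer has gone quiet. */
    boolean isEmpty(long nowMs) {
        filterCleanup(nowMs);
        return layers.isEmpty();
    }

    int layerCount() {
        return layers.size();
    }
}
```

A quota manager would increment the rating metric whenever track() returns
true, and periodically call isEmpty() on each entry in the producer map to
remove idle producers.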
>>>>
>>>> This system will correctly handle bursty loads.  There are 3 cases to
>>>> consider:
>>>>
>>>>    1. If the producer is producing fewer than shape.N PIDs the layer
>>>>    will not fill up before the next layer is added.
>>>>    2. If the producer is producing shape.N PIDs the layer will be
>>>>    processed as either a 1 or a 3 depending on system timings.
>>>>    3. If the producer is producing more than shape.N PIDs the layer
>>>>    will fill up and a new layer will be created with an expiration
>>>>    timestamp window.size.seconds from when it was created.  This is the
>>>>    case that leads to the filterCleanup infrequently having more than 1
>>>>    layer to remove.
>>>>
>>>> The last case to consider is if a producer stops generating PIDs.  In
>>>> this case we should walk the map of producers, call "filterCleanup", and
>>>> then check to see if the LayeredBloomFilter is empty.  If so, remove it
>>>> from the map.  It will be empty if the producer has not produced a PID for
>>>> window.size.seconds.
>>>>
>>>> I have this solution mostly coded, though I must admit I do not know
>>>> where to plug in the ProducerIdQuotaManager defined in the KIP.
>>>>
>>>> Claude
>>>>
>>>
