I think there is an issue in the KIP.

Basically the KIP says: if the PID is found in either of the Bloom filters
then no action is taken. If the PID is not found then it is added and the
quota rating metrics are incremented.

In this case long running PIDs will be counted multiple times.

Let's assume a 30-minute window with two 15-minute frames.  So for the first
15 minutes all PIDs are placed in the first Bloom filter, and for the 2nd 15
minutes all new PIDs are placed in the second Bloom filter.  At the start of
the 3rd 15-minute frame the first filter is removed and a new empty one is
created.

Let's denote the Bloom filters as BFn{} and list the contained PIDs between
the braces.


So at t0 let's insert PID0 and increment the rating metrics.  Thus we have
BF0{PID0}.
At t0+5 let's insert PID1 and increment the rating metrics.  Thus we have
BF0{PID0, PID1}.
At t0+10 we see PID0 again but no changes occur.
At t0+15 we start t1 and we have BF0{PID0, PID1}, BF1{}.
At t1+5 we see PID2, increment the rating metrics, and we have BF0{PID0,
PID1}, BF1{PID2}.
At t1+6 we see PID0 again and no changes occur.
At t1+7 we see PID1 again and no changes occur.
At t1+15 we start a new window and dispose of BF0.  Thus we have BF1{PID2},
BF2{}.
At t2+1 we see PID3, increment the rating metrics, and we have
BF1{PID2}, BF2{PID3}.
At t2+6 we see PID0 again, but now it is not in any filter, so we increment
the rating metrics and add it, giving BF1{PID2}, BF2{PID3, PID0}.

But we just saw PID0 15 minutes ago, well within the 30-minute window we
are trying to track.  Or am I missing something?  It seems like we need to
add each PID to the last Bloom filter on every sighting.
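
To make the double-counting concrete, here is a toy simulation of the
scheme above (plain HashSets stand in for the Bloom filters; class and
method names are mine, not from the KIP).  It replays the timeline and
compares the KIP's insert-on-first-sight rule with the refresh rule I am
suggesting:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model of the KIP-936 scheme: HashSets stand in for Bloom filters,
// rotate() starts a new 15-minute frame and keeps at most two frames.
public class PidQuotaSketch {
    final Deque<Set<String>> frames = new ArrayDeque<>();
    final boolean addToNewestFrame; // the proposed fix
    int metricIncrements = 0;       // times the quota rating metrics fired

    PidQuotaSketch(boolean addToNewestFrame) {
        this.addToNewestFrame = addToNewestFrame;
        frames.addLast(new HashSet<>()); // BF0
    }

    void rotate() {
        frames.addLast(new HashSet<>());             // new empty frame
        if (frames.size() > 2) frames.removeFirst(); // dispose of the oldest
    }

    void track(String pid) {
        boolean seen = frames.stream().anyMatch(f -> f.contains(pid));
        if (!seen) {
            metricIncrements++;        // PID treated as new: charge the quota
            frames.getLast().add(pid);
        } else if (addToNewestFrame) {
            frames.getLast().add(pid); // refresh: keep long-lived PIDs visible
        }
    }

    // Replays the timeline from the message above; returns the charge count.
    static int simulate(boolean fix) {
        PidQuotaSketch q = new PidQuotaSketch(fix);
        q.track("PID0"); q.track("PID1"); q.track("PID0"); // t0 .. t0+10
        q.rotate();                                        // t1: add BF1
        q.track("PID2"); q.track("PID0"); q.track("PID1"); // t1+5 .. t1+7
        q.rotate();                                        // t2: drop BF0, add BF2
        q.track("PID3"); q.track("PID0");                  // t2+1, t2+6
        return q.metricIncrements;
    }

    public static void main(String[] args) {
        System.out.println("KIP rule:     " + simulate(false)); // PID0 charged twice
        System.out.println("refresh rule: " + simulate(true));  // PID0 charged once
    }
}
```

Under the KIP's rule the metrics fire 5 times (PID0 is charged at t0 and
again at t2+6); with the refresh rule they fire 4 times, once per distinct
PID in the window.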

On Fri, Apr 12, 2024 at 2:45 PM Claude Warren <cla...@apache.org> wrote:

> Initial code is available at
> https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
>
> On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org> wrote:
>
>> I should also note that the probability of false positives does not rise
>> above shape.P, because as it approaches shape.P a new layer is created and
>> filters are added to that.  So no layer in the LayeredBloomFilter exceeds
>> shape.P, and thus the entire filter does not exceed shape.P.
>>
>> Claude
>>
>> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org> wrote:
>>
>>> The overall design for KIP-936 seems sound to me.  I would make the
>>> following changes:
>>>
>>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>>> commons-collections v4.5
>>>
>>> Define the producer.id.quota.window.size.seconds to be the length of
>>> time that a Bloom filter of PIDs will exist.
>>> Define a new configuration option "producer.id.quota.window.count" as
>>> the number of windows active in window.size.seconds.
>>>
>>> Define the "Shape" (see commons-collections bloomfilters v4.5) of the
>>> Bloom filter from the average number of PIDs expected in
>>> window.size.seconds/window.count (call this N) and the probability of false
>>> positives (call this P).  Due to the way the LayeredBloomFilter works, the
>>> number of items in a layer can be lower than the max.  I'll explain that
>>> in a minute.
>>>
>>> The LayeredBloomFilter implements the standard BloomFilter interface but
>>> internally keeps an ordered list of filters (called layers) from oldest
>>> created to newest.  It adds new layers when a specified Predicate
>>> (checkExtend) returns true.  It will remove filters as defined by a
>>> specified Consumer (filterCleanup).
>>>
>>> Every time a BloomFilter is merged into the LayeredBloomFilter, the filter
>>> checks the "checkExtend" predicate.  If it fires, "filterCleanup" is
>>> called to remove any layers that should be removed, and a new layer is added.
>>>
>>> Define the layers of the LayeredBloomFilter to comprise a standard
>>> BloomFilter and an associated expiration timestamp.
>>>
>>> We can thus define
>>>
>>>    - "checkExtend" to require a new layer every window.size.seconds /
>>>    window.count seconds, or when the current layer contains shape.N items.
>>>    - "filterCleanup" to start at the head of the list of layers and
>>>    remove any expired filters: usually 0, 1 every window.size.seconds, and
>>>    infrequently more than 1.
>>>
>>> This system will correctly handle bursty loads.  There are 3 cases to
>>> consider:
>>>
>>>    1. If the producer is producing fewer than shape.N PIDs, the layer
>>>    will not fill up before the next layer is added.
>>>    2. If the producer is producing exactly shape.N PIDs, the layer will be
>>>    handled as either case 1 or case 3, depending on system timings.
>>>    3. If the producer is producing more than shape.N PIDs, the layer
>>>    will fill up, and a new layer will be created with an expiration
>>>    timestamp window.size.seconds from when it was created.  This is the
>>>    case that leads to filterCleanup infrequently having more than 1 layer
>>>    to remove.
>>>
>>> The last case to consider is a producer that stops generating PIDs.  In
>>> this case we should walk the map of producers, call "filterCleanup", and
>>> then check whether the LayeredBloomFilter is empty.  If so, remove it
>>> from the map.  It will be empty if the producer has not produced a PID for
>>> window.size.seconds.
>>>
>>> I have this solution mostly coded, though I must admit I do not know
>>> where to plug in the ProducerIdQuotaManager defined in the KIP.
>>>
>>> Claude
>>>
>>
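
For concreteness, here is how I read the layer mechanics described in the
quoted design, as a rough stdlib-only sketch: HashSets stand in for Bloom
filters, and the names (checkExtend, filterCleanup, shape.N) follow the
description above, but this is not the actual commons-collections API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Sketch of the layered design: timestamped layers, a checkExtend predicate
// (time boundary reached or layer full), and a filterCleanup pass that drops
// expired layers from the head of the list.
public class LayerSketch {
    static final long WINDOW_MS = 30 * 60 * 1000;          // window.size.seconds
    static final int  WINDOW_COUNT = 2;                    // window.count
    static final long FRAME_MS = WINDOW_MS / WINDOW_COUNT; // new-layer cadence
    static final int  SHAPE_N = 1000;                      // expected PIDs per layer

    static class Layer {
        final Set<String> filter = new HashSet<>(); // stand-in for a BloomFilter
        final long expiresAt;                       // created + WINDOW_MS
        Layer(long now) { this.expiresAt = now + WINDOW_MS; }
    }

    final Deque<Layer> layers = new ArrayDeque<>();
    long lastExtend;

    LayerSketch(long now) {
        layers.addLast(new Layer(now));
        lastExtend = now;
    }

    // checkExtend: a new layer is due on a time boundary or when the
    // current layer has reached shape.N items.
    boolean checkExtend(long now) {
        return now - lastExtend >= FRAME_MS
                || layers.getLast().filter.size() >= SHAPE_N;
    }

    // filterCleanup: remove expired layers from the head (usually 0,
    // 1 every window.size.seconds, infrequently more).
    void filterCleanup(long now) {
        while (!layers.isEmpty() && layers.getFirst().expiresAt <= now) {
            layers.removeFirst();
        }
    }

    void merge(String pid, long now) {
        if (checkExtend(now)) {
            filterCleanup(now);
            layers.addLast(new Layer(now));
            lastExtend = now;
        }
        layers.getLast().filter.add(pid);
    }

    boolean contains(String pid) {
        return layers.stream().anyMatch(l -> l.filter.contains(pid));
    }

    // Empty once the producer has been idle for window.size.seconds,
    // i.e. all its layers have expired and been cleaned up.
    boolean isEmpty() {
        return layers.stream().allMatch(l -> l.filter.isEmpty());
    }
}
```

Note that each PID lands only in the newest layer here, which is exactly
where the double-counting question above comes in.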
