Initial code is available at
https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java

On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org> wrote:

> I should also note that the probability of false positives does not rise
> above shape.P because, as a layer's false-positive probability approaches
> shape.P, a new layer is created and subsequent filters are added to that
> one.  So no layer in the LayeredBloomFilter exceeds shape.P, thus the
> entire filter does not exceed shape.P.
>
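A minimal Java sketch of the arithmetic behind the per-layer bound. It assumes the textbook false-positive estimate (1 - e^(-kn/m))^k for a single layer with m bits, k hash functions and n items. The class LayeredFalsePositive and its methods are hypothetical, not part of commons-collections. For comparison it also computes 1 - (1 - p)^L, the probability that at least one of L independent layers reports a false positive, which is what a lookup sees if it reports a hit whenever any layer matches.

```java
// Hypothetical helper for illustration; not part of commons-collections.
final class LayeredFalsePositive {
    private LayeredFalsePositive() {}

    /** Textbook false-positive estimate for one Bloom filter: (1 - e^(-k*n/m))^k. */
    static double layerProbability(int n, int m, int k) {
        return Math.pow(1.0 - Math.exp(-(double) k * n / m), k);
    }

    /** Probability that at least one of `layers` independent layers, each at rate p, reports a false positive. */
    static double anyLayerProbability(double p, int layers) {
        return 1.0 - Math.pow(1.0 - p, layers);
    }

    public static void main(String[] args) {
        // Shape for N = 1000, P = 0.01: m = 9586 bits, k = 7 hash functions.
        double p = layerProbability(1000, 9586, 7);
        System.out.printf("one full layer:  %.4f%n", p);
        System.out.printf("any of 5 layers: %.4f%n", anyLayerProbability(p, 5));
    }
}
```

With the shape for N = 1000 and P = 0.01, a layer holding N items sits at roughly 0.01; the second figure shows how the chance of a hit in some layer grows with the number of live layers.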
> Claude
>
> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org> wrote:
>
>> The overall design for KIP-936 seems sound to me.  I would make the
>> following changes:
>>
>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>> commons-collections v4.5.
>>
>> Define producer.id.quota.window.size.seconds as the length of time
>> that a Bloom filter of PIDs will exist.
>> Define a new configuration option, "producer.id.quota.window.count", as the
>> number of windows active within window.size.seconds.
>>
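The relationship between the two settings can be sketched in Java as below. PidQuotaWindowConfig is a hypothetical holder for illustration, not Kafka's configuration API; it only derives the layer lifetime (window.size.seconds) and the interval at which new layers start (window.size.seconds / window.count).

```java
import java.time.Duration;

// Hypothetical holder for the two proposed settings; not Kafka's config API.
final class PidQuotaWindowConfig {
    final long windowSizeSeconds; // producer.id.quota.window.size.seconds
    final int windowCount;        // producer.id.quota.window.count

    PidQuotaWindowConfig(long windowSizeSeconds, int windowCount) {
        if (windowSizeSeconds <= 0 || windowCount <= 0) {
            throw new IllegalArgumentException("window size and count must be positive");
        }
        this.windowSizeSeconds = windowSizeSeconds;
        this.windowCount = windowCount;
    }

    /** How long each layer (one Bloom filter of PIDs) exists before it expires. */
    Duration layerLifetime() {
        return Duration.ofSeconds(windowSizeSeconds);
    }

    /** How often a new layer is started: window.size.seconds / window.count. */
    Duration layerInterval() {
        return Duration.ofMillis(windowSizeSeconds * 1000 / windowCount);
    }
}
```

For example, new PidQuotaWindowConfig(3600, 6) gives a one-hour layer lifetime with a new layer every 10 minutes, so up to six layers are live under steady, non-bursty load.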
>> Define the "Shape" (see the commons-collections v4.5 bloomfilter package)
>> of the Bloom filter from the average number of PIDs expected in
>> window.size.seconds/window.count (call this N) and the probability of false
>> positives (call this P).  Due to the way the LayeredBloomFilter works, N can
>> be lower than the maximum number of PIDs expected in that interval.  I'll
>> explain that in a minute.
>>
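A dependency-free Java sketch of how a shape can be derived from N and P, assuming the standard optimal formulas m = ceil(-N ln P / (ln 2)^2) and k = round((m / N) ln 2). ShapeSketch is a hypothetical class written only for illustration; in commons-collections 4.5, Shape.fromNP(n, p) is the library's factory for the same two inputs.

```java
// Hypothetical sketch of the standard optimal-shape formulas, computed by hand
// so the example has no dependencies.
final class ShapeSketch {
    final int numberOfBits;          // m
    final int numberOfHashFunctions; // k

    private ShapeSketch(int numberOfBits, int numberOfHashFunctions) {
        this.numberOfBits = numberOfBits;
        this.numberOfHashFunctions = numberOfHashFunctions;
    }

    /** Builds a shape for n expected items at false-positive probability p. */
    static ShapeSketch fromNP(int n, double p) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be >= 1");
        }
        if (p <= 0.0 || p >= 1.0) {
            throw new IllegalArgumentException("p must be in (0, 1)");
        }
        double ln2 = Math.log(2);
        // m = ceil(-n * ln(p) / (ln 2)^2)
        double m = Math.ceil(-n * Math.log(p) / (ln2 * ln2));
        // k = round(m / n * ln 2), but never fewer than one hash function
        long k = Math.max(1L, Math.round(m / n * ln2));
        return new ShapeSketch((int) m, (int) k);
    }
}
```

For N = 1000 and P = 0.01 this yields m = 9586 bits and k = 7. Because a full layer simply triggers a new one, N only needs to reflect typical traffic per interval, not the worst-case burst.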
>> The LayeredBloomFilter implements the standard BloomFilter interface but
>> internally keeps an ordered list of filters (called layers) from oldest
>> created to newest.  It adds new layers when a specified Predicate
>> (checkExtend) returns true.  It will remove filters as defined by a
>> specified Consumer (filterCleanup).
>>
>> Every time a BloomFilter is merged into the LayeredBloomFilter, the filter
>> checks the "checkExtend" predicate.  If it fires, "filterCleanup" is
>> called to remove any layers that should be removed, and a new layer is added.
>>
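A simplified, self-contained Java model of the layering flow just described, assuming PIDs are longs hashed into each layer with double hashing over a SplitMix64-style mixer. LayeredSketch, Layer and mightContain are hypothetical names; this is not the commons-collections LayeredBloomFilter API, it only mirrors the checkExtend / filterCleanup sequence on merge.

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, simplified model of the layering flow; NOT the
// commons-collections LayeredBloomFilter API.
final class LayeredSketch {
    /** One layer: a plain Bloom filter over long PIDs. */
    static final class Layer {
        final BitSet bits;
        int count; // PIDs merged into this layer

        Layer(int m) {
            bits = new BitSet(m);
        }
    }

    private final int m; // bits per layer
    private final int k; // hash functions per layer
    private final Deque<Layer> layers = new ArrayDeque<>(); // oldest first
    private final Predicate<LayeredSketch> checkExtend;
    private final Consumer<Deque<Layer>> filterCleanup;

    LayeredSketch(int m, int k, Predicate<LayeredSketch> checkExtend,
                  Consumer<Deque<Layer>> filterCleanup) {
        this.m = m;
        this.k = k;
        this.checkExtend = checkExtend;
        this.filterCleanup = filterCleanup;
        layers.addLast(new Layer(m));
    }

    Layer current() {
        return layers.peekLast();
    }

    int layerCount() {
        return layers.size();
    }

    /** On every merge: if checkExtend fires, run filterCleanup, then add a new layer. */
    void merge(long pid) {
        if (checkExtend.test(this)) {
            filterCleanup.accept(layers);
            layers.addLast(new Layer(m));
        }
        Layer layer = current();
        for (int i = 0; i < k; i++) {
            layer.bits.set(index(pid, i));
        }
        layer.count++;
    }

    /** True if any layer might contain the PID (false positives possible). */
    boolean mightContain(long pid) {
        for (Layer layer : layers) {
            boolean all = true;
            for (int i = 0; i < k && all; i++) {
                all = layer.bits.get(index(pid, i));
            }
            if (all) {
                return true;
            }
        }
        return false;
    }

    /** Double hashing: the i-th index is (h1 + i * h2) mod m. */
    private int index(long pid, int i) {
        long h1 = mix(pid);
        long h2 = mix(h1) | 1L; // odd, hence non-zero, step
        return (int) Math.floorMod(h1 + i * h2, (long) m);
    }

    /** SplitMix64 finalizer, used as a cheap 64-bit mixer. */
    private static long mix(long z) {
        z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
        z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
        return z ^ (z >>> 31);
    }
}
```

For example, with checkExtend = s -> s.current().count >= 3 and a filterCleanup that trims the deque to one layer before the new one is appended, merging PIDs 1 through 7 leaves two layers holding PIDs 4 to 7.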
>> Define the layers of the LayeredBloomFilter to comprise a standard
>> BloomFilter and an associated expiration timestamp.
>>
>> We can thus define
>>
>>    - "checkExtend" to require a new layer every window.size.seconds /
>>    window.count seconds, or when the current layer contains shape.N items.
>>    - "filterCleanup" to start at the head of the list of layers and
>>    remove any expired filters: usually 0, 1 whenever a layer reaches the
>>    end of its window.size.seconds lifetime, and infrequently more than 1.
>>
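A Java sketch of these two policies, assuming the current time in milliseconds is passed in so the behaviour is deterministic. TimedLayers and TimedLayer are hypothetical names; each layer pairs a Bloom filter's bits with its creation and expiration timestamps, and hashing is left to the caller, who passes bit indices in [0, m).

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

// Hypothetical sketch of the timed "checkExtend" and "filterCleanup" policies.
// Names are illustrative; this is not the commons-collections API.
final class TimedLayers {
    /** A layer: a Bloom filter's bits plus creation and expiration timestamps. */
    static final class TimedLayer {
        final BitSet bits;
        final long createdMillis;
        final long expiresMillis;
        int count; // PIDs merged into this layer

        TimedLayer(int m, long createdMillis, long lifetimeMillis) {
            this.bits = new BitSet(m);
            this.createdMillis = createdMillis;
            this.expiresMillis = createdMillis + lifetimeMillis;
        }
    }

    private final long windowSizeMillis;    // window.size.seconds
    private final long layerIntervalMillis; // window.size.seconds / window.count
    private final int shapeN;               // shape.N
    private final int m;                    // bits per layer
    final Deque<TimedLayer> layers = new ArrayDeque<>(); // oldest first

    TimedLayers(long windowSizeSeconds, int windowCount, int shapeN, int m) {
        this.windowSizeMillis = windowSizeSeconds * 1000;
        this.layerIntervalMillis = windowSizeMillis / windowCount;
        this.shapeN = shapeN;
        this.m = m;
    }

    /** checkExtend: due if the layer interval has elapsed or the current layer holds shape.N PIDs. */
    boolean checkExtend(long nowMillis) {
        TimedLayer last = layers.peekLast();
        return last == null
            || nowMillis - last.createdMillis >= layerIntervalMillis
            || last.count >= shapeN;
    }

    /** filterCleanup: drop expired layers from the head; returns how many were removed. */
    int filterCleanup(long nowMillis) {
        int removed = 0;
        while (!layers.isEmpty() && layers.peekFirst().expiresMillis <= nowMillis) {
            layers.pollFirst();
            removed++;
        }
        return removed;
    }

    /** Merges one PID, given as precomputed bit indices in [0, m), at time nowMillis. */
    void merge(int[] bitIndices, long nowMillis) {
        if (checkExtend(nowMillis)) {
            filterCleanup(nowMillis);
            layers.addLast(new TimedLayer(m, nowMillis, windowSizeMillis));
        }
        TimedLayer current = layers.peekLast();
        for (int i : bitIndices) {
            current.bits.set(i);
        }
        current.count++;
    }
}
```

With a 60 s window, window.count = 6 and shape.N = 2, PIDs arriving at 0 s, 1 s and 2 s produce two layers (the third PID finds the first layer full); a cleanup at 62 s then removes both layers in one pass.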
>> This system will correctly handle bursty loads.  There are 3 cases to
>> consider:
>>
>>    1. If the producer is producing fewer than shape.N PIDs per
>>    window.size.seconds / window.count interval, the layer will not fill up
>>    before the next layer is added.
>>    2. If the producer is producing exactly shape.N PIDs, the layer will be
>>    handled as either case 1 or case 3, depending on system timing.
>>    3. If the producer is producing more than shape.N PIDs, the layer will
>>    fill up and a new layer will be created with an expiration timestamp
>>    window.size.seconds from when it was created.  This is the case that leads
>>    to filterCleanup infrequently having more than 1 layer to remove.
>>
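The three cases can be checked with a small Java model that tracks only when layers are created, applying the checkExtend rule on each PID arrival. BurstModel is hypothetical, and it assumes layers are created lazily when a PID arrives rather than by a timer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the load cases: only layer creation times are tracked
// (no Bloom filter bits), using the checkExtend rule on each PID arrival.
final class BurstModel {
    private BurstModel() {}

    /** Returns the creation time of every layer, given PID arrival times in ascending order. */
    static List<Long> layerCreationTimes(long[] arrivalsMillis, int shapeN, long intervalMillis) {
        List<Long> created = new ArrayList<>();
        long currentStart = 0;
        int currentCount = 0;
        for (long t : arrivalsMillis) {
            boolean extend = created.isEmpty()
                || t - currentStart >= intervalMillis // case 1: interval elapsed
                || currentCount >= shapeN;            // case 3: layer is full
            if (extend) {
                created.add(t);
                currentStart = t;
                currentCount = 0;
            }
            currentCount++;
        }
        return created;
    }

    /** Number of layers whose window.size.seconds lifetime has ended by nowMillis. */
    static long expiredBy(List<Long> created, long lifetimeMillis, long nowMillis) {
        return created.stream().filter(c -> c + lifetimeMillis <= nowMillis).count();
    }
}
```

With shape.N = 3, a 10 s layer interval and a 60 s lifetime, a burst of seven PIDs between 0 ms and 600 ms creates layers at 0, 300 and 600 ms (case 3), and all three have expired by 60.6 s, so a single cleanup at that point removes three layers.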
>> The last case to consider is a producer that stops generating PIDs.  In
>> this case we should walk the map of producers, call "filterCleanup" on each
>> filter, and then check whether the LayeredBloomFilter is empty.  If so,
>> remove it from the map.  It will be empty if the producer has not produced
>> a PID for window.size.seconds.
>>
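A Java sketch of that sweep, assuming the producer map is keyed by a String principal and modelling each producer's filter only by its layers' expiration timestamps (oldest first). ProducerSweep is a hypothetical name; Iterator.remove is used so entries can be dropped safely while walking the map.

```java
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sweep over per-producer layered filters. Each filter is
// modelled as a deque of layer expiration timestamps, oldest first.
final class ProducerSweep {
    private ProducerSweep() {}

    /** filterCleanup: drop expired layers from the head of one producer's filter. */
    static void filterCleanup(Deque<Long> layerExpirations, long nowMillis) {
        while (!layerExpirations.isEmpty() && layerExpirations.peekFirst() <= nowMillis) {
            layerExpirations.pollFirst();
        }
    }

    /** Cleans every producer's filter and removes producers whose filter is now empty. */
    static int sweep(Map<String, Deque<Long>> filters, long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Deque<Long>>> it = filters.entrySet().iterator();
        while (it.hasNext()) {
            Deque<Long> layers = it.next().getValue();
            filterCleanup(layers, nowMillis);
            if (layers.isEmpty()) {
                it.remove(); // no PID seen for at least window.size.seconds
                removed++;
            }
        }
        return removed;
    }
}
```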
>> I have this solution mostly coded, though I must admit I do not know
>> where to plug in the ProducerIdQuotaManager defined in the KIP.
>>
>> Claude
>>
>
