Initial code is available at https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java
On Tue, Apr 9, 2024 at 2:37 PM Claude Warren <cla...@apache.org> wrote:

> I should also note that the probability of false positives does not rise
> above shape.P because as it approaches shape.P a new layer is created and
> filters are added to that. So no layer in the LayeredBloomFilter exceeds
> shape.P, thus the entire filter does not exceed shape.P.
>
> Claude
>
> On Tue, Apr 9, 2024 at 2:26 PM Claude Warren <cla...@apache.org> wrote:
>
>> The overall design for KIP-936 seems sound to me. I would make the
>> following changes:
>>
>> Replace the "TimedBloomFilter" with a "LayeredBloomFilter" from
>> commons-collections v4.5
>>
>> Define the producer.id.quota.window.size.seconds to be the length of time
>> that a Bloom filter of PIDs will exist.
>> Define a new configuration option "producer.id.quota.window.count" as the
>> number of windows active in window.size.seconds.
>>
>> Define the "Shape" (See commons-collections bloomfilters v4.5) of the
>> bloom filter from the average number of PIDs expected in
>> window.size.seconds/window.count (call this N) and the probability of false
>> positives (call this P). Due to the way the LayeredBloomFilter works the
>> number of items can be a lower number than the max. I'll explain that in a
>> minute.
>>
>> The LayeredBloomFilter implements the standard BloomFilter interface but
>> internally keeps an ordered list of filters (called layers) from oldest
>> created to newest. It adds new layers when a specified Predicate
>> (checkExtend) returns true. It will remove filters as defined by a
>> specified Consumer (filterCleanup).
>>
>> Every time a BloomFilter is merged into the LayeredBloomFilter the filter
>> checks the "checkExtend" predicate. If it fires, "filterCleanup" is
>> called to remove any layers that should be removed and a new layer is added.
>>
>> Define the layers of the LayeredBloomFilter to comprise a standard
>> BloomFilter and an associated expiration timestamp.
>>
>> We can thus define
>>
>>    - "checkExtend" to require a new layer every window.size.seconds /
>>    window.count seconds or when the current layer contains shape.N items.
>>    - "filterCleanup" to start at the head of the list of layers and
>>    remove any expired filters: usually 0 layers, 1 layer every
>>    window.size.seconds, infrequently more than 1.
>>
>> This system will correctly handle bursty loads. There are 3 cases to
>> consider:
>>
>>    1. If the producer is producing fewer than shape.N PIDs the layer
>>    will not fill up before the next layer is added.
>>    2. If the producer is producing shape.N PIDs the layer will be
>>    processed as either case 1 or case 3 depending on system timings.
>>    3. If the producer is producing more than shape.N PIDs the layer will
>>    fill up and a new layer will be created with an expiration timestamp
>>    window.size.seconds from when it was created. This is the case that leads
>>    to the filterCleanup infrequently having more than 1 layer to remove.
>>
>> The last case to consider is if a producer stops generating PIDs. In this
>> case we should walk the map of producers, call "filterCleanup", and then
>> check to see if the LayeredBloomFilter is empty. If so, remove it from the
>> map. It will be empty if the producer has not produced a PID for
>> window.size.seconds.
>>
>> I have this solution mostly coded, though I must admit I do not know
>> where to plug in the ProducerIdQuotaManager defined in the KIP.
>>
>> Claude
>>
>
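
For readers following the thread, the layer lifecycle described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the linked branch and not the commons-collections API: the class and method names (LayeredPidWindowSketch, track, isEmpty, layerCount) are hypothetical, each layer uses a plain HashSet as a stand-in for a real Bloom filter, and timestamps are passed in as milliseconds so the behaviour is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Sketch only: a HashSet stands in for the Bloom filter held by each layer. */
class LayeredPidWindowSketch {
    /** One layer: the PIDs it holds plus its creation and expiration timestamps. */
    private static final class Layer {
        final Set<Long> pids = new HashSet<>();
        final long createdMs;
        final long expiresMs;

        Layer(long createdMs, long expiresMs) {
            this.createdMs = createdMs;
            this.expiresMs = expiresMs;
        }
    }

    private final Deque<Layer> layers = new ArrayDeque<>(); // oldest layer first
    private final long windowSizeMs;     // window.size.seconds, in milliseconds
    private final long layerIntervalMs;  // window.size.seconds / window.count
    private final int maxItemsPerLayer;  // plays the role of shape.N

    LayeredPidWindowSketch(long windowSizeMs, int windowCount, int maxItemsPerLayer) {
        this.windowSizeMs = windowSizeMs;
        this.layerIntervalMs = windowSizeMs / windowCount;
        this.maxItemsPerLayer = maxItemsPerLayer;
    }

    /** "checkExtend": true when there is no layer, the newest is old enough, or it is full. */
    private boolean checkExtend(long nowMs) {
        Layer newest = layers.peekLast();
        return newest == null
                || nowMs - newest.createdMs >= layerIntervalMs
                || newest.pids.size() >= maxItemsPerLayer;
    }

    /** "filterCleanup": remove expired layers, starting at the head (oldest). */
    void filterCleanup(long nowMs) {
        while (!layers.isEmpty() && layers.peekFirst().expiresMs <= nowMs) {
            layers.removeFirst();
        }
    }

    /** Records a PID; returns true if no current layer already contained it. */
    boolean track(long pid, long nowMs) {
        if (checkExtend(nowMs)) {
            filterCleanup(nowMs);
            layers.addLast(new Layer(nowMs, nowMs + windowSizeMs));
        }
        for (Layer layer : layers) {
            if (layer.pids.contains(pid)) {
                return false; // already seen within the window
            }
        }
        layers.peekLast().pids.add(pid);
        return true;
    }

    /** Idle-producer check: clean up first, then report whether any layer remains. */
    boolean isEmpty(long nowMs) {
        filterCleanup(nowMs);
        return layers.isEmpty();
    }

    int layerCount() {
        return layers.size();
    }
}
```

A burst that fills a layer forces an extra layer before the time-based interval elapses, which is why the cleanup step occasionally has more than one layer to remove. With a real Bloom filter in place of the HashSet, track could also return false for a PID that was never seen (a false positive).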