Quick note: I renamed the example code. It is now at https://github.com/Claudenw/kafka/blob/KIP-936/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManagerCache.java
On Thu, May 2, 2024 at 10:47 AM Claude Warren, Jr <claude.war...@aiven.io> wrote:

> Igor, thanks for taking the time to look and to review the code. I regret that I have not pushed the latest code, but I will do so, and I will see what I can do about answering your Bloom filter related questions here.

>> How would an operator know or decide to change the configuration for the number of layers – producer.id.quota.cache.layer.count – e.g. increasing from 4 to 5; and why? Do we need a new metric to indicate that change could be useful?

> In our usage the layered Bloom filter [6] retains the record of a PID for producer.id.quota.window.size.seconds. It breaks that window down into multiple fragments, so 4 layers = 15 minute fragments. It "forgets" a fragment's worth of data when the fragment has been around for window.size.seconds. The layer count determines how big a chunk of time is deleted at once: changing the layers to 10 will yield 6 minute fragments, 60 will yield 1 minute fragments, and so on. There are other idiosyncrasies that I will get into later.

> I would not set the value lower than 3. If you find that there are multiple reports of new PIDs because, on average, producers only ping every 50 minutes, it might make sense to use more layers. If you use too many layers then there will be only one PID in each layer, and at that point a simple list of filters would be faster to search, but in reality that configuration does not make sense. If you have two layers then recurring PIDs will be recorded in both layers.

>> Is producer.id.quota.cache.cleanup.scheduler.interval.ms a guaranteed interval, or rather simply a delay between cleanups? How did you decide on the default value of 10ms?

> In the code this is not used. Cleanups are amortized across inserts to keep the layers balanced. There is a thread that does a cleanup every producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count seconds to detect principals that are no longer sending data. This is a reasonable frequency as it aligns well with when the layers actually expire.

>> Under "New ProducerIdQuotaManagerCache", the documentation for the constructor params for ProducerIDQuotaManagerCache does not match the constructor signature.

> Sorry, this is because I did not push the changes. The constructor is ProducerIDQuotaManagerCache(Double falsePositiveRate, long ttl, int layerCount), where falsePositiveRate is the Bloom filter false positive rate, ttl is producer.id.quota.window.size.seconds in milliseconds, and layerCount is the desired number of layers.

>> Under "New ProducerIdQuotaManagerCache":
>> public boolean track(KafkaPrincipal principal, int producerIdRate, long pid)
>> How is producerIdRate used? The reference implementation Claude shared does not use it.
>> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java

> Again, sorry for not updating the code. The producer rate is used to build a Bloom filter of the proper size. The producer rate is the number of PIDs per hour expected to be created by the principal. The Bloom filter shape [1] is determined by the expected number of PIDs per layer (producerIdRate * seconds_per_hour / producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count) and the falsePositiveRate from the constructor. These values are used to call the Shape.fromNP() method. This is the Shape of the Bloom filters in each layer. It is only used when the principal is not found in the cache. Thomas Hurst has provided a web page [5] where you can explore the interaction between the number of items and the false positive rate.
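To make that sizing concrete, here is a rough sketch of the Shape calculation using the commons-collections Shape.fromNP() factory mentioned above. This is only my illustration, not the KIP code: the class name, the producerIdRate and the false positive value are made up, and it assumes the default one-hour window so that the expected PIDs per layer reduces to producerIdRate / layerCount.

    import org.apache.commons.collections4.bloomfilter.Shape;

    public class LayerShapeSketch {
        public static void main(String[] args) {
            // Example values for illustration only.
            int producerIdRate = 100;          // expected new PIDs per hour for this principal
            int layerCount = 4;                // producer.id.quota.cache.layer.count
            double falsePositiveRate = 0.001;  // producer.id.quota.cache.false.positive.rate (assumed value)

            // With the default one-hour window each layer covers 1/layerCount of an hour,
            // so it is expected to hold roughly producerIdRate / layerCount PIDs.
            int expectedPidsPerLayer = Math.max(1, producerIdRate / layerCount);

            // Shape.fromNP(numberOfItems, probability) picks the bit count and hash
            // function count for the per-layer filters.
            Shape shape = Shape.fromNP(expectedPidsPerLayer, falsePositiveRate);
            System.out.println(shape);
        }
    }

Plugging the same item count and false positive rate into the calculator at [5] should give roughly the same bit count, which is a handy sanity check when tuning these settings.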
>> I could not find a description or definition for TimestampedBloomFilter, could we add that to the KIP?

> I will add it. It is simply an implementation of WrappedBloomFilter [2] that adds the timestamp for when the filter was created.

>> LayeredBloomFilter will have a fixed size (right?), but some users (KafkaPrincipal) might only use a small number of PIDs. Is it worth having a dual strategy, where we simply keep a Set of PIDs until we reach a certain size where it pays off to use the LayeredBloomFilter?

> Each principal has its own layered Bloom filter.

> Here come the idiosyncrasies and benefits of the layered Bloom filter. The layered Bloom filter can be thought of as a collection of Bloom filters that are queried to see if the item being searched for (the target) has been seen. There are a number of ways that the layered filter could be used (you could have a layer for each storage location in a multi-location storage engine, for example), but in our case a layer signifies a starting time fragment. That fragment will be at most producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count seconds long. The earliest layers are at the lower indices, the latest one at the highest.

> In general, when an item is added to the layered Bloom filter the following processes take place:
>
> - Old layer filters are removed using the LayerManager.Cleanup [3] instance.
> - A new layer is added, if necessary or requested, using the LayerManager.ExtendCheck [4] and a Supplier<BloomFilter>.
> - The target is added to the last layer.

> In our case, the Cleanup starts at layer 0 and removes any layer for which the timestamp has expired. The ExtendCheck adds a layer if the current layer is full or producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count seconds have elapsed. When a new layer is created it is created with a timestamp of now + ttl (the ttl from the constructor).

> So principals that are expected to produce fewer PIDs have smaller Bloom filters in their layers than principals that are expected to produce more PIDs.

> Since the producerIdRate is an estimate we need to look at 3 cases:
>
> 1. The number of PIDs is equal to the producerIdRate. The layered Bloom filter works as explained above.
> 2. The number of PIDs is smaller than the producerIdRate. The number of layers may decrease depending on the frequency of PIDs. If there is one PID every 15 minutes (assuming the default settings) then there would be 4 layers, each having one PID stored in it. If the production was bursty and there were 4 PIDs and then nothing for 50 minutes there would only be 2 layers (the intervening layers would have been disposed of).
> 3. The number of PIDs is larger than the producerIdRate. The ExtendCheck will detect when the last layer has recorded producerIdRate entries and create a new layer. This has two effects. First, it means that the false positive rate is not exceeded. Second, the number of layers will grow as needed and will shrink back to the 4-layer size when the excess PIDs are producer.id.quota.window.size.seconds old.
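The lifecycle above (Cleanup dropping expired layers from the front, ExtendCheck starting a new layer when the current one is full or its time fragment has elapsed, and insertion into the newest layer) can be illustrated with a small toy model. This is only my sketch of the described mechanics, not the KIP implementation: it uses a plain HashSet per layer instead of a real Bloom filter, and the class name and constant values are made up.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.Set;

    public class LayeredPidCacheSketch {
        static final long TTL_MS = 3_600_000L;          // producer.id.quota.window.size.seconds, in ms
        static final int LAYER_COUNT = 4;                // producer.id.quota.cache.layer.count
        static final long FRAGMENT_MS = TTL_MS / LAYER_COUNT;
        static final int EXPECTED_PIDS_PER_LAYER = 25;   // derived from producerIdRate, see earlier sketch

        // Each layer covers one time fragment and is discarded once its ttl passes.
        static class Layer {
            final long created;
            final Set<Long> pids = new HashSet<>();
            Layer(long now) { this.created = now; }
            boolean expired(long now) { return created + TTL_MS <= now; }
            boolean fragmentElapsed(long now) { return created + FRAGMENT_MS <= now; }
        }

        final Deque<Layer> layers = new ArrayDeque<>();

        /** Returns true if the PID was already recorded in the window, false if it is new. */
        boolean track(long pid, long now) {
            // Cleanup: drop expired layers from the front (oldest first).
            while (!layers.isEmpty() && layers.peekFirst().expired(now)) {
                layers.removeFirst();
            }
            // Check the remaining layers for the PID.
            for (Layer layer : layers) {
                if (layer.pids.contains(pid)) {
                    return true;
                }
            }
            // ExtendCheck: start a new layer if there is none, the newest one is
            // full, or its time fragment has elapsed.
            Layer last = layers.peekLast();
            if (last == null || last.pids.size() >= EXPECTED_PIDS_PER_LAYER || last.fragmentElapsed(now)) {
                last = new Layer(now);
                layers.addLast(last);
            }
            last.pids.add(pid);   // record the new PID in the newest layer
            return false;
        }

        public static void main(String[] args) {
            LayeredPidCacheSketch cache = new LayeredPidCacheSketch();
            long now = System.currentTimeMillis();
            System.out.println(cache.track(42L, now));               // false: first sighting
            System.out.println(cache.track(42L, now + 60_000));      // true: seen within the window
            System.out.println(cache.track(42L, now + TTL_MS + 1));  // false: the original layer has expired
        }
    }

Case 3 above falls out of this model: a burst of PIDs fills the newest layer, extra layers are appended, and they drop off the front of the deque once they are TTL_MS old.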
> So in summary:

> producer.id.quota.window.size.seconds determines how long the record of a PID being used is retained and how long it takes for bursts of excessive PIDs (extra layers) to be removed from the system. It influences the number of bits in the Bloom filters.

> producer.id.quota.cache.layer.count identifies the ideal number of layers. It influences the number of bits in the Bloom filters.

> producerIdRate strongly influences the number of bits in the Bloom filters but is only used when the principal is not already active in the cache.

> producer.id.quota.cache.false.positive.rate influences the number of bits in the Bloom filters.

> My statement "If you have two layers then recurring PIDs will be recorded in both layers." needs some explanation. Let's look at the normal processing of our default configuration.

> A PID is passed to the track() method. If the PID is not found in the Bloom filter it is added to the last layer and the caller is signaled that the PID is new. If the PID is found in the Bloom filter the caller is signaled that the PID has been seen.

> In an hour the layer with the PID will be cleared from the layered Bloom filter. If the PID is reported again it will be seen as a new PID.

> However, if the PID is seen at a time beyond 15 minutes after the original recording, that sighting is not recorded (because the layered Bloom filter reports that the PID is already there). This means that when the layer holding the PID is removed, the PID will be considered new even though it was seen within the last hour. To solve this problem, when a PID is seen we verify that it is recorded in a layer that is included in the last logical layer time window. A logical layer time window is the expected length of time for a layer (producer.id.quota.window.size.seconds / producer.id.quota.cache.layer.count seconds); this just means that we are accounting for the cases where the number of layers has increased due to the volume of PIDs.
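Continuing the toy LayeredPidCacheSketch from earlier, that re-recording rule might look roughly like the variant below. This is my reading of the described behaviour, not the KIP implementation: on a hit, the PID is refreshed into the newest layer unless it was already recorded in a layer created within the last logical layer time window.

    // Sketch only: a track() variant that refreshes a PID into the newest layer
    // when it has not been recorded within the last logical layer time window.
    boolean trackWithRefresh(long pid, long now) {
        // Cleanup as before: drop expired layers from the front.
        while (!layers.isEmpty() && layers.peekFirst().expired(now)) {
            layers.removeFirst();
        }
        boolean seen = false;
        boolean seenRecently = false;
        for (Layer layer : layers) {
            if (layer.pids.contains(pid)) {
                seen = true;
                // "Recently" means the layer started within the last logical
                // layer time window (FRAGMENT_MS).
                if (layer.created >= now - FRAGMENT_MS) {
                    seenRecently = true;
                }
            }
        }
        if (!seenRecently) {
            // Re-record (or record for the first time) in the newest layer so the
            // sighting is not lost when the older layers expire.
            Layer last = layers.peekLast();
            if (last == null || last.pids.size() >= EXPECTED_PIDS_PER_LAYER || last.fragmentElapsed(now)) {
                last = new Layer(now);
                layers.addLast(last);
            }
            last.pids.add(pid);
        }
        return seen;   // true if the PID was seen anywhere in the window
    }

With this in place a PID that keeps producing is re-recorded roughly once per fragment, so its record never ages out while it is active; that is also why, with only two layers, a recurring PID ends up recorded in both.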
> I hope this answers some questions and doesn't open up too many more,
> Claude
>
> [1] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/Shape.html
> [2] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/WrappedBloomFilter.html
> [3] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayerManager.Cleanup.html
> [4] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayerManager.ExtendCheck.html
> [5] https://hur.st/bloomfilter/
> [6] https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/bloomfilter/LayeredBloomFilter.html

> On Wed, May 1, 2024 at 4:42 PM Igor Soarez <i...@soarez.me> wrote:

>> Hi Omnia, Hi Claude,

>> Thanks for putting this KIP together. This is an important unresolved issue in Kafka, which I have witnessed several times in production.

>> Please see my questions below:

>> 10. Given the goal is to prevent OOMs, do we also need to limit the number of KafkaPrincipals in use?

>> 11. How would an operator know or decide to change the configuration for the number of layers – producer.id.quota.cache.layer.count – e.g. increasing from 4 to 5; and why? Do we need a new metric to indicate that change could be useful?

>> 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a guaranteed interval, or rather simply a delay between cleanups? How did you decide on the default value of 10ms?

>> 13. Under "New ProducerIdQuotaManagerCache", the documentation for the constructor params for ProducerIDQuotaManagerCache does not match the constructor signature.

>> 14. Under "New ProducerIdQuotaManagerCache":
>> public boolean track(KafkaPrincipal principal, int producerIdRate, long pid)
>> How is producerIdRate used? The reference implementation Claude shared does not use it.
>> https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java

>> 15. I could not find a description or definition for TimestampedBloomFilter, could we add that to the KIP?

>> 16. LayeredBloomFilter will have a fixed size (right?), but some users (KafkaPrincipal) might only use a small number of PIDs. Is it worth having a dual strategy, where we simply keep a Set of PIDs until we reach a certain size where it pays off to use the LayeredBloomFilter?

>> 17. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests", the KIP states:
>> a. INIT_PRODUCER_ID for idempotent producer request PIDs from random controller every time so if a client got throttled on one controller doesn't guarantee it will not go through on next controller causing OOM at the leader later.
>> Is the INIT_PRODUCER_ID request really sent to a "random controller"? From a quick look at Sender.maybeSendAndPollTransactionalRequest, for an idempotent producer, targetNode is set to the broker with the fewest outstanding requests. Am I looking at the wrong place?

>> 18. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests", the KIP states:
>> This solution might look simple however throttling the INIT_PRODUCER_ID doesn't guarantee the OOM wouldn't happened as
>> (...)
>> b. The problem happened on the activation of the PID when it produce and not at the initialisation. Which means Kafka wouldn't have OOM problem if the producer got assigned PID but crashed before producing anything.
>> Point b. does not seem to support the claim above?

>> 19. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests", the KIP states:
>> c. Throttling producers that crash between initialisation and producing could slow them down when they recover/fix the problem that caused them to crash right after initialising PID.
>> Doesn't it depend on the back-off time or how quotas are enforced? I’m not sure this would necessarily be a problem?

>> 20. If the allocation of PIDs for idempotent producers was centralized, or otherwise the targetNode for that request was predictable, would that make throttling INIT_PRODUCER_ID a viable solution?

>> Best,
>>
>> --
>> Igor