Thanks Claude for the feedback and the raising this implementation to
Apache commons-collections.
I had a look into your layered bloom filter and at first glance, I think it
would be a better improvement, however, regarding the following suggestion

> By hashing the principal and PID into the filter as a single hash only
one Bloom filter is required.

I am not sure how this will work when we have different producer-id-rate
for different KafkaPrincipal as proposed in the KIP.
For example `userA` had producer-id-rate of 1000 per hour while `user2` has
a quota of 100 producer ids per hour. How will we configure the max entries
for the Shape?

The only thing that comes to my mind to maintain this desired behavior in
the KIP is to NOT hash PID with KafkaPrincipal and keep a
Map<KafkaPrincipal, LayeredBloomFilter>
then each one of these bloom filters is controlled with
`Shape(<PRODUER_ID_RATE_FOR_THIE_KafkaPrincipal>, 0.1)`.

Does that make sense? WDYT?

Also regarding the eviction function
> (evict function)
> The evict function will determine if it has been a minute since the last
> time we created a new layer, if so create a new layer and evict all layers
> older than 1 hour.  Since the layers are a time ordered list this is
simply
> removing the elderly layers from the front of the list.

Maybe am missing something here but I can't find anything in the
`LayerManager` code that point to how often will the eviction function
runs. Do you mean that the eviction function runs every minute? If so, can
we control this?

Cheers,
Omnia

On Wed, Jun 21, 2023 at 11:43 AM Claude Warren <cla...@xenei.com> wrote:

> I think that the either using a Stable bloom filter or a Layered bloom
> filter constructed as follows:
>
>
>    - Each layer is configured for the maximum number of principal-PID pairs
>    expected in a single minute.
>    - Expect 60 layers (one for each minute)
>    - If the layer becomes fully populated add another layer.
>    - When the time to insert into the current layer expires, remove the
>    layers that are older than an hour.
>
> This will provide a sliding window of one hour with the ability to handle
> bursts above the expected rate of inserts without additional false
> positives.
>
> By hashing the principal and PID into the filter as a single hash only one
> Bloom filter is required.
>
> The code I pointed to earlier uses the common-collections4 Bloom filter
> implementation.  So a rough pseudo code for the implementation is:
>
> Shape shap = Shape.fromNP( 1000, 0.1 ); // 1000 entries, 0.1 false positive
> rate
> LayeredBloomFilter filter = LayeredBloomFilter( shape, 60, evictFunc ); //
> 60 initial layers, eviction function.
>
> (on PID)
>
> long[2] buff = Murmur3Hash.hash128x64( String.format("%s%s", principal, PID
> ).getBytes(java.nio.charset.Charset.UTF8));
> Hasher hasher = new EnhancedDoubleHasher( buff[0], buff[1] );
> if (filter.contains(hasher)) {
>     // handle case where principal-pid was already seen
> }
> filter.merge( hasher ); // ensures that principal-pid remains in seen for
> the next hour.
>
> (evict function)
> The evict function will determine if it has been a minute since the last
> time we created a new layer, if so create a new layer and evict all layers
> older than 1 hour.  Since the layers are a time ordered list this is simply
> removing the elderly layers from the front of the list.
>
> if it has not been an hour and the current filter is full (e.g. has 1000
> entries) create a new layer
>
> This should be very fast and space efficient.
>
>
> On Wed, Jun 21, 2023 at 11:13 AM Claude Warren <cla...@xenei.com> wrote:
>
> > I have an implementation of a layered Bloom filter in [1] (note the
> > layered branch).  This should handle the layering Bloom filter and allow
> > for layers that
> >
> >    1. Do not become over populated and thus yield too many false
> >    positives.
> >    2. Expire and are removed automatically.
> >
> > The layered Bloom filter also does not need another thread to remove the
> > old filters as this is amortized across all the inserts.
> >
> > In addition, the number of Layered Bloom filter instances can be reduced
> > by hashing the Kafka Principal and the ID together into the Bloom filter
> to
> > look for.
> >
> > [1] https://github.com/Claudenw/BloomFilters/tree/layered
> >
> > On Sun, Jun 18, 2023 at 10:18 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> > wrote:
> >
> >> Hi Haruki, Thanks for having a look at the KIP.
> >> > 1. Do you have any memory-footprint estimation for
> >> TimeControlledBloomFilter?
> >> I don't at the moment have any estimate as I don't have a full
> >> implementation of this one at the moment. I can work on one if it's
> >> required.
> >>
> >> > * If I read the KIP correctly, TimeControlledBloomFilter will be
> >> > allocated per KafkaPrincipal so the size should be reasonably small
> >> > considering clusters which have a large number of users.
> >> The Map stored in the cache has 2 dimensions one is vertical which is
> >> KafkaPrincipal (producers only) and the second horizontal which is the
> >> time
> >> of the windows.
> >> - Horizontally we will add only PIDs to the TimeControlledBloomFilter
> only
> >> if KafkaPrincipal didn't hit the quota and we control the bloom filter
> by
> >> time to expire the oldest set at some point when it's not needed
> anymore.
> >> - Vertically is the tricky one if the cluster has an insane number of
> >> KafkaPrincipals used for producing. And if the number of KafkaPrincipals
> >> is
> >> huge we can control the memory used by the cache by throttling more
> >> aggressively and I would argue that they will never going to be an
> insane
> >> number that could cause OOM.
> >>
> >>  >* i.e. What false-positive rate do you plan to choose as the default?
> >> Am planning on using 0.1 as default.
> >>
> >> > 2. What do you think about rotating windows on produce-requests
> arrival
> >> instead of scheduler?
> >> > * If we do rotation in scheduler threads, my concern is potential
> >> > scheduler threads occupation which could make other background tasks
> to
> >> > delay
> >> This is a valid concern. We can consider disposing of the oldest bloom
> >> when
> >> we add a new PID to the TimeControlledBloomFilter. However, I would
> still
> >> need a scheduler to clean up any inactive KafkaPrincipal from the cache
> >> layer `i.e. ProducerIdQuotaManagerCache`. Do you have the same concern
> >> about this one too?
> >>
> >> > 3. Why the default producer.id.quota.window.size.seconds is 1 hour?
> >> >  * Unlike other quota types (1 second)
> >> Mostly because 1 sec doesn't make sense for this type of quota.
> >> Misconfigured or misbehaving producers usually don't allocate new PIDs
> on
> >> the leader every sec but over a period of time.
> >>
> >> Thanks
> >>
> >> On Tue, Jun 6, 2023 at 5:21 PM Haruki Okada <ocadar...@gmail.com>
> wrote:
> >>
> >> > Hi, Omnia.
> >> >
> >> > Thanks for the KIP.
> >> > The feature sounds indeed helpful and the strategy to use bloom-filter
> >> > looks good.
> >> >
> >> > I have three questions:
> >> >
> >> > 1. Do you have any memory-footprint estimation
> >> > for TimeControlledBloomFilter?
> >> >     * If I read the KIP correctly, TimeControlledBloomFilter will be
> >> > allocated per KafkaPrincipal so the size should be reasonably small
> >> > considering clusters which have a large number of users.
> >> >     * i.e. What false-positive rate do you plan to choose as the
> >> default?
> >> > 2. What do you think about rotating windows on produce-requests
> arrival
> >> > instead of scheduler?
> >> >     * If we do rotation in scheduler threads, my concern is potential
> >> > scheduler threads occupation which could make other background tasks
> to
> >> > delay
> >> > 3. Why the default producer.id.quota.window.size.seconds is 1 hour?
> >> >     * Unlike other quota types (1 second)
> >> >
> >> > Thanks,
> >> >
> >> > 2023年6月6日(火) 23:55 Omnia Ibrahim <o.g.h.ibra...@gmail.com>:
> >> >
> >> > > Hi everyone,
> >> > > I want to start the discussion of the KIP-936 to throttle the number
> >> of
> >> > > active PIDs per KafkaPrincipal. The proposal is here
> >> > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
> >> > > <
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
> >> > > >
> >> > >
> >> > > Thanks for your time and feedback.
> >> > > Omnia
> >> > >
> >> >
> >> >
> >> > --
> >> > ========================
> >> > Okada Haruki
> >> > ocadar...@gmail.com
> >> > ========================
> >> >
> >>
> >
> >
> > --
> > LinkedIn: http://www.linkedin.com/in/claudewarren
> >
>
>
> --
> LinkedIn: http://www.linkedin.com/in/claudewarren
>

Reply via email to