Hi Girish,

Replies inline.

> > I'd pick the first bucket for handling the 10MB rate.
> > The capacity of the first bucket would be 15MB * 120=1800MB. The fill
> > would happen in special way. I'm not sure if Bucket4J has this at all.
> > So describing the way of adding tokens to the bucket: the tokens in
> > the bucket would remain the same when the rate is <10MB. As many
> >
>
> How is this special behavior (tokens in bucket remaining the same when rate
> is <10MB) achieved? I would assume that to even figure out that the rate is
> less than 10MB, there is some counter going around?

It's possible to be creative in implementing the token bucket algorithm.
New tokens are added at the configured rate. When the actual traffic
rate is less than the token rate of the bucket, there are left over
tokens. One way to implement this is to calculate the amount of new
tokens before using the tokens. The immediately used tokens are
subtracted from the new tokens, and the left over tokens are added to
the separate "filling bucket". I explained earlier that this filling
bucket would be poured into the actual bucket every 10 minutes in the
example scenario.
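
To make the idea more concrete, here is a minimal Java sketch of the
"filling bucket" approach described above. It is only an illustration
under my own assumptions: the class and method names are made up, time
is passed in as a parameter, and throttling and recovery from a
negative balance are left out.

```java
/**
 * Sketch of a token bucket with a separate "filling bucket".
 * All names are hypothetical. Time is passed in explicitly so that the
 * behavior is deterministic.
 */
final class FillingTokenBucket {
    private final long ratePerSecond;      // configured token rate
    private final long capacity;           // max tokens in the actual bucket
    private final long pourIntervalMillis; // how often the filling bucket is poured
    private long tokens;                   // actual bucket; may go negative in this sketch
    private long fillingBucket;            // left over tokens waiting to be poured
    private long lastUpdateMillis;
    private long lastPourMillis;

    FillingTokenBucket(long ratePerSecond, long capacity,
                       long pourIntervalMillis, long nowMillis) {
        this.ratePerSecond = ratePerSecond;
        this.capacity = capacity;
        this.pourIntervalMillis = pourIntervalMillis;
        this.lastUpdateMillis = nowMillis;
        this.lastPourMillis = nowMillis;
    }

    /** Records traffic that used the given amount of tokens. */
    void consume(long usedTokens, long nowMillis) {
        // New tokens are calculated from the elapsed time, no scheduler needed.
        // Integer division drops fractions of a token (simplification).
        long newTokens = (nowMillis - lastUpdateMillis) * ratePerSecond / 1000;
        lastUpdateMillis = nowMillis;
        if (usedTokens <= newTokens) {
            // Traffic below the rate: the actual bucket stays the same and
            // the left over tokens go to the filling bucket.
            fillingBucket += newTokens - usedTokens;
        } else {
            // Traffic above the rate: the difference is taken from the actual bucket.
            tokens -= usedTokens - newTokens;
        }
        if (nowMillis - lastPourMillis >= pourIntervalMillis) {
            // Pour the filling bucket into the actual bucket, up to its capacity.
            tokens = Math.min(capacity, tokens + fillingBucket);
            fillingBucket = 0;
            lastPourMillis = nowMillis;
        }
    }

    long tokens() { return tokens; }

    long fillingBucket() { return fillingBucket; }
}
```

With a rate of 10 tokens/s and a 10 second pour interval, traffic below
the rate only grows the filling bucket, and the burst allowance becomes
available in the actual bucket after the pour.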

>
>
> > tokens would be added to the bucket as are consumed by the actual
> > traffic. The left over tokens 10MB - actual rate would go to a
> > separate filling bucket that gets poured into the actual bucket every
> > 10 minutes.
> > This first bucket with this separate "filling bucket" would handle the
> > bursting up to 1800MB.
> >
>
> But this isn't the requirement? Let's assume that the actual traffic has
> been 5MB for a while and this 1800MB capacity bucket is all filled up now..
> What's the real use here for that at all?

How would the rate limiter know if 5MB traffic is degraded traffic
which would need to be allowed to burst?

> I think this approach of thinking about rate limiter - "earning the right
> to burst by letting tokens remain into the bucket, (by doing lower than
> 10MB for a while)" doesn't not fit well in a messaging use case in real
> world, or theoretic.

It might feel like it doesn't fit well, but this is how most rate
limiters work. The model works very well in practice.
Without "earning the right to burst", there would have to be some
other way to detect whether there's a need to burst.
The need to burst could be detected by calculating the time that the
message to be sent has been in queues until it's about to be sent.
In other words, from the end-to-end latency.

> For a 10MB topic, if the actual produce has been , say, 5MB for a long
> while, this shouldn't give the right to that topic to burst to 15MB for as
> much as tokens are present.. This is purely due to the fact that this will
> then start stressing the network and bookie disks.

Well, there's always the option of not configuring bursting this way,
or of limiting the maximum burst rate, as is possible in a dual token
bucket implementation.
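
As a rough sketch of what I mean by a dual token bucket: one bucket
refills at the configured rate and has a large capacity (the burst
allowance), and a second bucket refills at the maximum rate and holds
only one second worth of tokens. A request passes only when both
buckets have enough tokens. The names and the one second capacity of
the second bucket are my assumptions for this example.

```java
/** Plain token bucket; new tokens are calculated from the elapsed time. */
final class SimpleTokenBucket {
    private final long ratePerSecond;
    private final long capacity;
    long tokens;
    private long lastRefillMillis;

    SimpleTokenBucket(long ratePerSecond, long capacity, long nowMillis) {
        this.ratePerSecond = ratePerSecond;
        this.capacity = capacity;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillMillis = nowMillis;
    }

    void refill(long nowMillis) {
        long newTokens = (nowMillis - lastRefillMillis) * ratePerSecond / 1000;
        tokens = Math.min(capacity, tokens + newTokens);
        lastRefillMillis = nowMillis;
    }
}

/** Hypothetical dual token bucket: average rate with bursts, capped by a max rate. */
final class DualTokenBucket {
    private final SimpleTokenBucket average; // configured rate, large capacity
    private final SimpleTokenBucket peak;    // max rate, capacity of one second

    DualTokenBucket(long averageRate, long burstCapacity, long maxRate, long nowMillis) {
        this.average = new SimpleTokenBucket(averageRate, burstCapacity, nowMillis);
        this.peak = new SimpleTokenBucket(maxRate, maxRate, nowMillis);
    }

    /** Succeeds only when both buckets can cover the requested permits. */
    boolean tryAcquire(long permits, long nowMillis) {
        average.refill(nowMillis);
        peak.refill(nowMillis);
        if (average.tokens < permits || peak.tokens < permits) {
            return false;
        }
        average.tokens -= permits;
        peak.tokens -= permits;
        return true;
    }
}
```

For the numbers in this thread, that would be something like
new DualTokenBucket(10, 1800, 15, now) with MB as the unit: the
average bucket allows bursting up to 1800MB, while the peak bucket
keeps the burst rate at or below 15MB/s.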

> Imagine a 100 of such topics going around with similar configuration of
> fixed+burst limits and were doing way lower than the fixed rate for the
> past couple of hours. Now that they've earned enough tokens, if they all
> start bursting, this will bring down the system, which is probably not
> capable of supporting simultaneous peaks of all possible topics at all.

This is the challenge of capacity management and end-to-end flow
control and backpressure.
With proper system wide capacity management and end-to-end back
pressure, the system won't collapse.

As a reference, let's take a look at the Confluent Kora paper [1]:
In 5.2.1 Back Pressure and Auto-Tuning:
"Backpressure is achieved via auto-tuning tenant quotas on the broker
such that the combined tenant usage remains below the broker-wide
limit. The tenant quotas are auto-tuned proportionally to their total
quota allocation on the broker. This mechanism ensures fair sharing of
resources among tenants during temporary overload and re-uses the
quota enforcement mechanism for backpressure."
"The broker-wide limits are generally defined by benchmarking brokers
across clouds. The CPU-related limit is unique because there is no
easy way to measure and attribute CPU usage to a tenant. Instead, the
quota is defined as the clock time the broker spends processing
requests and connections, and the safe limit is variable. So to
protect CPU, request backpressure is triggered when request queues
reach a certain threshold."
In 5.2.2 Dynamic Quota Management:
"A straightforward method for distributing tenant-level quotas among
the brokers hosting the tenant is to statically divide the quota
evenly across the brokers. This static approach, which was deployed
initially, worked reasonably well on lower subscribed clusters."...
"Kora addresses this issue by using a dynamic quota mechanism that
adjusts bandwidth distribution based on a tenant’s bandwidth
consumption. This is achieved through the use of a shared quota
service to manage quota distribution, a design similar to that used by
other storage systems"

1 - https://www.vldb.org/pvldb/vol16/p3822-povzner.pdf

Regarding system wide capacity management, the concept of dynamic
quota management is interesting. "The tenant quotas are auto-tuned
proportionally to their total quota allocation on the broker. This
mechanism ensures fair sharing of resources among tenants during
temporary overload and re-uses the quota enforcement mechanism for
backpressure."
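
To illustrate the proportional auto-tuning idea from the quote, here
is a small Java sketch. It is my own simplification, not the algorithm
from the Kora paper: when the sum of the tenant quotas exceeds the
broker-wide limit, every quota is scaled down by the same factor. The
class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that scales tenant quotas to fit a broker-wide limit. */
final class QuotaAutoTuner {

    /**
     * Returns the effective quota per tenant. Quotas are left untouched when
     * their sum fits within the broker limit. Otherwise each quota is reduced
     * proportionally to its share of the total allocation.
     */
    static Map<String, Long> autoTune(Map<String, Long> tenantQuotas, long brokerLimit) {
        long total = 0;
        for (long quota : tenantQuotas.values()) {
            total += quota;
        }
        Map<String, Long> tuned = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : tenantQuotas.entrySet()) {
            long quota = entry.getValue();
            // Integer division rounds down, so the tuned sum never exceeds the limit.
            long effective = total <= brokerLimit ? quota : quota * brokerLimit / total;
            tuned.put(entry.getKey(), effective);
        }
        return tuned;
    }
}
```

For example, tenants with quotas of 100 and 200 on a broker that can
handle 150 would temporarily get 50 and 100. The existing quota
enforcement can then be reused as the backpressure mechanism.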

I referred to useful resources about capacity management in one of my
previous emails. For example, the "Amazon DynamoDB: A Scalable,
Predictably Performant, and Fully Managed NoSQL Database Service" paper
and presentation video [2] are really useful. The token bucket based
"Global Admission Control" implementation and the bursting solution
are explained starting at 10:25 in the YouTube video [3].

2 - https://www.usenix.org/conference/atc22/presentation/elhemali
3 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=625s

In Pulsar, we have "PIP 82: Tenant and namespace level rate limiting"
[4] which introduced the "resource group" concept. There is the
resourcegroup resource in the Admin REST API [5]. There's also a
resource called "resource-quota" in the Admin REST API [6], but that
seems to be abandoned.

4 - https://github.com/apache/pulsar/wiki/PIP-82:-Tenant-and-namespace-level-rate-limiting
5 - https://pulsar.apache.org/admin-rest-api/?version=3.1.1#tag/resourcegroups
6 - https://pulsar.apache.org/admin-rest-api/?version=3.1.1#tag/resource-quotas


> Now of course we can utilize a broker level fixed rate limiter to not allow
> the overall throughput of the system to go beyond a number, but at that
> point - all the earning semantic goes for a toss anyway since the behavior
> would be unknown wrt which topics are now going through with bursting and
> which are being blocked due to the broker level fixed rate limiting.
>
> As such, letting topics loose would not sit well with any sort of SLA
> guarantees to the end user.
>
> Moreover, contrary to the earning tokens logic, in reality a topic _should_
> be allowed to burst upto the SOP/SLA as soon as produce starts in the
> topic. It shouldn't _have_ to wait for tokens to fill up as it does
> below-fixed-rate for a while before it is allowed to burst. This is because
> there is no real benefit or reason to not let the topic do such as the
> hardware is already present and the topic is already provisioned
> (partitions, broker spread) accordingly, assuming the burst.
>
> In an algorithmic/academic/literature setting, token bucket sounds really
> promising.. but a platform with SLA to users would not run like that.

AWS DynamoDB uses token bucket based algorithms. Please check the
video I referred to in one of the previous comments.
at 10:26 [1] "The simplest way to do this workload isolation is the
token bucket"
at 13:05 [2] "partitions will get throttled because they're constantly
running out of tokens from their bucket even though they had burst
capacity because after the burst burst capacity is over you had to
still take time to fill capacity in the token bucket while other
partitions actually had capacity in their token buckets so..."
at 13:53 [3] "GAC (Global Admission Control) is a service that builds
on the same token bucket mechanism so when a request router receives a
request it basically requests the GAC to kind of get some tokens and
once request router uses the local tokens to make admission control
decisions if it runs out of tokens it can request for more tokens"

1 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=10m26s
2 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=13m05s
3 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=13m54s
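
The last quote describes a request router that makes admission
decisions with local tokens and asks a global service for more when it
runs out. A very rough Java sketch of that interaction could look like
the following. This is only my reading of the talk, not DynamoDB's
actual design: the class names are made up, the "global" side is an
in-process object instead of a remote service, and refilling of the
global pool is omitted.

```java
/** Hypothetical stand-in for a global admission control service. */
final class GlobalTokenPool {
    private long tokens;

    GlobalTokenPool(long tokens) {
        this.tokens = tokens;
    }

    /** Grants up to the requested amount, depending on what is left. */
    synchronized long request(long wanted) {
        long granted = Math.min(wanted, tokens);
        tokens -= granted;
        return granted;
    }
}

/** Hypothetical request router that admits requests using local tokens. */
final class LocalAdmissionController {
    private final GlobalTokenPool global;
    private final long batchSize; // how many tokens to fetch at a time
    private long localTokens;

    LocalAdmissionController(GlobalTokenPool global, long batchSize) {
        this.global = global;
        this.batchSize = batchSize;
    }

    /** Admits the request if local tokens, topped up from the pool, cover the cost. */
    boolean admit(long cost) {
        if (localTokens < cost) {
            // Out of local tokens: ask the global pool for a batch.
            localTokens += global.request(Math.max(batchSize, cost - localTokens));
        }
        if (localTokens < cost) {
            return false; // the global pool could not cover it either
        }
        localTokens -= cost;
        return true;
    }
}
```

The point of the example is that several routers can share one global
budget while still making most decisions locally.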

Token bucket based solutions are widely used in the industry. I don't
see a reason why we should be choosing some other conceptual model as
the basis.
You can always explain a token bucket algorithm in a different way.
Someone might say that it's a sliding window based algorithm. The main
benefit of token bucket is the conceptual model which is very helpful
when a larger group of people, such as an open source community, works
on a common solution. Concepts are the starting point for
abstractions. That's why it matters a lot.

>
>
>
> > In the current rate limiters in Pulsar, the implementation is not
> > optimized to how Pulsar uses rate limiting. There's no need to use a
> > scheduler for adding "permits" as it's called in the current rate
> > limiter. The new tokens to add can be calculated based on the elapsed
> > time. Resuming from the blocking state (auto read disabled -> enabled)
> > requires a scheduler. The duration of the pause should be calculated
> >
>
> The precise one already does this. There is no scheduler for adding the
> tokens, only scheduler to enable back the auto-read on netty channel.

Please check the source code again [1]. The scheduler is used to do
something that is the equivalent of "adding the tokens".
The "createTask" method schedules the task to call the "renew" method.

1 - https://github.com/apache/pulsar/blob/3c067ce28025e116146977118312a1471ba284f5/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java#L260-L278

> It looks like your inclination is to simply remove the default "poller"
> implementation and just let precise one in the code base?

No, it's not. It's about replacing the solution with an implementation
that is using the conceptual model of the token bucket. This will make
the implementation more maintainable and we will be able to clean up
the abstractions. Concepts and abstractions are the key to interface
design.
In the current state, the rate limiter is very hard to reason about.
There's no reason to keep it in the code base. I'll explain more about
that later.
Besides the maintainability, another clear problem in the current rate
limiter is the heavy usage of synchronization and the already
mentioned unnecessary use of the scheduler.

> This part is a bit unclear to me. When are you suggesting to enable back
> the auto-read on the channel? Ideally, it should open back up the next
> second. Time to the next second could vary from 1ms to 999ms.. I do not
> think that starting an on demand scheduler of delta ms would be accurate
> enough to achieve this.. A newly created scheduled task is anyways not this
> accurate.. With Java virtual threads coming up, I believe having one, or N,
> constantly running scheduled tasks of 1 second interval would not be heavy
> at all.
> In fact, even in Java 8, as per my benchmarks, it doesn't seem to take up
> much frames at all.

Optimally, the pause would last just long enough for there to be
enough tokens to send the next message.
One possible way to estimate the size of the next message is to use
the average size or a rolling average size as the basis of the
calculation.
It might be as short as the minimum time to pause for the timer.
Instead of using a generic scheduler for pausing, Netty's scheduler
should be used.
It is very efficient for high volumes. If there's a bottleneck, it's
possible to have some minimum duration to pause.
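
Here is a minimal Java sketch of the pause calculation described
above. The names are hypothetical, and the use of an exponentially
weighted rolling average for the message size is just one possible
choice.

```java
/** Exponentially weighted rolling average of message sizes (one possible estimator). */
final class RollingAverage {
    private final double alpha; // weight of the newest sample, between 0 and 1
    private double value;
    private boolean initialized;

    RollingAverage(double alpha) {
        this.alpha = alpha;
    }

    void add(long sample) {
        value = initialized ? alpha * sample + (1 - alpha) * value : sample;
        initialized = true;
    }

    long get() {
        return Math.round(value);
    }
}

/** Hypothetical helper for calculating how long reads should be paused. */
final class PauseCalculator {

    /**
     * Returns the pause in nanoseconds until there are enough tokens for a
     * message of the estimated size, or 0 when no pause is needed.
     * A non-zero pause is never shorter than minPauseNanos.
     * Assumes that the token deficit is small enough for deficit * 1e9 to fit in a long.
     */
    static long pauseNanos(long availableTokens, long estimatedNextMessageSize,
                           long ratePerSecond, long minPauseNanos) {
        long deficit = estimatedNextMessageSize - availableTokens;
        if (deficit <= 0) {
            return 0;
        }
        // Time needed to accumulate the missing tokens, rounded up.
        long nanos = (deficit * 1_000_000_000L + ratePerSecond - 1) / ratePerSecond;
        return Math.max(minPauseNanos, nanos);
    }
}
```

The resulting duration could then be used for scheduling a task on the
channel's Netty event loop that re-enables auto read, for example with
the schedule(Runnable, long, TimeUnit) method that the event loop
inherits from ScheduledExecutorService.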

> "10 minutes" was just an example. Part of the problem is explained above..
> If a topic is allowed to just burst forever (irrespective of the fact that
> it has earned the tokens or not) - it will start competing with other
> topics that are also bursting. This is a problem from a capacity planning
> point of view. Generally this is how capacity planning works
>
>    1. We will first do redline testing to determine how much MBps a set of
>    single broker and an ensemble of bookies can achieve given a hardware
>    configuration of broker and bookies.
>    2. Assuming linear horizontal scaling, we then setup enough brokers and
>    bookies in order to sustain a required throughput in the cluster. Suppose,
>    for this example, the cluster can now do 300MBps
>
> Now, assuming 50% bursting is allowed in a topic - does that mean the sum
> of fixed rate of all topics in the cluster should not exceed 200MBps? - If
> yes - then there is literally no difference in this situation vs just
> having a 50% elevated fixed rate of all the topic with 0 bursting support.
> To actually take advantage of this bursting feature, the capacity reserved
> to handle topic bursts would not be equal to the allowed bursting, but much
> less. For example, in this cluster that can do 300MBps, we can let the sum
> of fixed rate of all topics go up to 270MBps and keep just 10% of the
> hardware (30MBps) for bursting needs. This way, we are able to sustain more
> topics in the same cluster, with support of burst. This would only work if
> all topics do not burst together at the same time - and that's where the
> need to restrict duration and frequency of burst comes into play.
>
> Ofcourse, there can be other ways to model this problem and requirement..
> and additional checks in place, but I believe this approach is the most
> deterministic approach with a good balance between efficient use of
> hardware and not being completely stringent wrt throughput on the topics.

I commented on these capacity management aspects in one of my previous
emails as well as in an earlier comment in this email.
Based on what you are describing as the scenario, I think you are
looking for system wide capacity management.
The Pulsar Resource Groups (PIP-82) [1] already has solutions in this
area and could be developed further.
AWS DynamoDB's Global Admission Control is a good example from the
industry of how this problem was solved there [2].
It might be worth improving and updating the PIP-82 design based on
the learnings from Confluent Kora paper and AWS DynamoDB Admission
control paper and presentation.
PIP-82 focuses a lot on rate limiting. However, there's also a need
for something like what the Confluent Kora paper explains in "5.2.1
Back Pressure and Auto-Tuning": "The tenant quotas are auto-tuned
proportionally to their total quota allocation on the broker. This
mechanism ensures fair sharing of resources among tenants during
temporary overload and re-uses the quota enforcement mechanism for
backpressure."

1 - https://github.com/apache/pulsar/wiki/PIP-82:-Tenant-and-namespace-level-rate-limiting
2 - https://www.youtube.com/watch?v=9AkgiEJ_dA4&t=13m54s

> > The TCP/IP channel wouldn't be put on pause in that solution as the
> > primary way to handle flow control.
> > Yes, there are alternatives. It's the permit-based flow control. The
> > server would send out manageable amount of permits at a time. There
> > are challenges in making this work efficiently with low amounts of
> > memory. That's one of the tradeoffs.
> >
>
> Moreover, isn't this based on the assumption that the clients respect the
> tokens and actually behave in a good manner?
> If there is an implementation of a client that is not respecting the tokens
> (or, in this case, the lack of remaining tokens with the client), (very
> much possible as it can be an older java client itself) - then wouldn't the
> only way to actually throttle the client is to pause the TCP/IP channel?

Yes, it requires well behaving clients. With multiplexing, pausing the
TCP/IP channel cannot be the primary way to handle flow control, as we
have discussed. Pausing reads on the TCP/IP channel remains a way to
apply backpressure when it's really necessary.

> > I have described the benefits in some former comments. The code is not
> > maintainable at the moment other than by the people that have worked
> > with the code. Another point is getting rid of the separate "precise"
> > option. That leaks implementation details all the way to Pulsar users
> > that don't need to know what a "precise rate limiter" is. The default
> > Pulsar core rate limiter should do what it promises to do. That would
> >
>
> Is this even achievable? i.e. to completely remove the poller one and take
> away the knowledge about the precise one from users? This basically would
> then be a breaking change and would probably wait for pulsar 4.0.0 release?

It's not a breaking change to replace the existing rate limiters.
The rate limiter implementation is an internal implementation detail.
It would become a breaking change if the feature "contract" would be
changed drastically.
In this case, it's possible to make the "precise" option "precise" and
perhaps simulate the way the default rate limiter works.
I think that besides CPU consumption, the problem with the default
rate limiter is simply that it
contains a bug. The bug is in the tryAcquire method [1]. The default
handling should simply match what there is for
isDispatchOrPrecisePublishRateLimiter.
After this, the only difference between precise and default is in the
renew method where there's this calculation:

    acquiredPermits = isDispatchOrPrecisePublishRateLimiter
            ? Math.max(0, acquiredPermits - permits) : 0;

Again, setting to 0 doesn't make much sense to me. We could possibly
replace this with a solution that allows some bursting, since I think
that's what happens with the default implementation. It's possible to
check this in a test case or simulated scenario to see that there are
no breaking changes when the rate limiter gets replaced. The new
implementation can be compared to the old one.

1 - https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java#L177-L204
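
To show what that single expression in the renew method does, here is
an isolated Java model of it. This is not the actual RateLimiter
class: the RenewModel name and the static method are made up for the
example, and only the expression itself comes from the Pulsar source.

```java
/** Isolated model of the acquiredPermits calculation in the renew method. */
final class RenewModel {

    /**
     * @param acquiredPermits permits acquired during the period that just ended
     * @param permits permits configured for one period
     * @param precise stands in for isDispatchOrPrecisePublishRateLimiter
     * @return the acquiredPermits value that the next period starts with
     */
    static long renew(long acquiredPermits, long permits, boolean precise) {
        return precise ? Math.max(0, acquiredPermits - permits) : 0;
    }

    public static void main(String[] args) {
        // 25 permits were acquired in a period that allows 10.
        System.out.println("precise: " + renew(25, 10, true));
        System.out.println("default: " + renew(25, 10, false));
    }
}
```

With the precise setting, the 15 permits of overuse carry over to the
next period. With the default setting, the overuse is forgotten on
every renew, which is why I think the default implementation ends up
allowing some bursting.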

>
>
> > be the starting point to match the existing feature set. After that
> > it's possible to make it the dual token bucket internally so that it's
> > possible to set a separate max rate since when the bucket is larger,
> > the bursts don't have a limit in a single token bucket (where the
> > bucket has a large capacity, therefore allows bursts when traffic
> > temporarily drops below configured rate). Yes, the precise rate
> > limiter is something like having a bucket capacity for 1 second and
> > that's why it doesn't allow bursts.
> >
>
> I believe here I (my organization) would have a difference in opinion due
> to the difference in goal and priority. I guess it's kind of a clash
> between your vision of the rate limiter in pulsar vs our goal right now and
> I am not sure how to proceed at this point.
> What I can promise is that I (my org) will take this to completion with
> respect to making the internal rate limiter supporting our use cases, but
> the way you envision it might not sit well with what we want and when we
> want it.

> I know and almost all OSS projects around the messaging world have very
> very basic rate limiters. That is exactly why I am originally requesting a
> pluggable rate limiter to support our use case as that's not a common thing
> to have (the level of complexity) in the OSS project itself. Although, I
> also acknowledge the need of a lot of refactoring and changes in the based
> design as we have been discussing here.

I appreciate your efforts on initiating PIP-310 and getting the ball
rolling. Since you are operating Pulsar at scale, your contributions
and feedback are very valuable in improving Pulsar's capacity management.
I happen to have a different view of how a custom rate limiter
implemented with the possible pluggable interface could help with
overall capacity management in Pulsar.
We need to go beyond PIP-310 in solving multi-tenant capacity
management/SOP/SLA challenges with Pulsar. The resource groups work
started with PIP-82 is a good starting point, but there's a need to
improve and revisit the design to be able to meet the competition, the
closed source Confluent Kora.

Thanks for providing such detailed and useful feedback! I think that
this has already been a valuable interaction.

The improvements happen one step at a time. We can make things happen
when we work together. I'm looking forward to that!

-Lari
