Hello Lari,
I've now gone through a bunch of rate limiting algorithms, along with dual
rate, dual bucket algorithm.
Reply inline.

On Tue, Nov 7, 2023 at 11:32 PM Lari Hotari <lhot...@apache.org> wrote:

>
> Bucket4J documentation gives some good ideas and it's shows how the
> token bucket algorithm could be varied. For example, the "Refill
> styles" section [1] is useful to read as an inspiration.
> In network routers, there's a concept of "dual token bucket"
> algorithms and by googling you can find both Cisco and Juniper
> documentation referencing this.
> I also asked ChatGPT-4 to explain "dual token bucket" algorithm [2].
>
> 1 - https://bucket4j.com/8.6.0/toc.html#refill-types
> 2 - https://chat.openai.com/share/d4f4f740-f675-4233-964e-2910a7c8ed24
>
>
While dual-rate dual token bucket looks promising, there is still some
challenge with respect to allowing a certain peak burst for/up to a bigger
duration. I am explaining it below:

Assume a 10MBps topic. Bursting support of 1.5x upto 2 minutes, once every
10 minute interval.

The first bucket has the capacity of the consistent, long-term peak that
the topic would observe, so basically -
`limit.capacity(10_000_000).refillGreedy(1_000_000, ofMillis(100))` . Here,
I am not refilling every milli because the topic will never receive uniform
homogenous traffic at a millisecond level. The pattern would always be a
batch of one or more messages in an instant, then nothing for next few
instants, then another batch of one or more messages and so on.
This first bucket is good enough to handle rate limiting in normal cases
without bursting support. This is also better than the current precise rate
limiter in pulsar which allows for hotspotting of produce within a second
as this one spreads the quota over 100ms time windows.

The second bucket would have to have a slower refill rate and also a less
granular refill rate - think of something like
`limit.capacity(5_000_000).refillGreedy(5_000_000, ofMinutes(10))`.
Now the problem with this approach is that it would allow only 1 second
worth of additional 5MBps on the topic every 10 minutes.

Suppose I change the refill rate to
`limit.capacity(5_000_000).refillGreedy(5_000_000, ofSeconds(1))` - then it
does allow additional 5MBps burst every second, but now there is no cap
(the 2 minute cap).

I will have to think this through and do some more math to figure out if
this is possible at all using dual-rate double token method. Inputs
welcomed.

>
> >>
> >> ok. btw. "metrics" doesn't necessarily mean providing the rate limiter
> >> metrics via Prometheus. There might be other ways to provide this
> >> information for components that could react to this.
> >> For example, it could a be system topic where these rate limiters emit
> events.
> >>
> >
> > Are there any other system topics than
> `tenent/namespace/__change_events` . While it's an improvement over
> querying metrics, it would still mean one consumer per namespace and would
> form a cyclic dependency - for example, in case a broker is degrading due
> to mis-use of bursting, it might lead to delays in the consumption of the
> event from the __change_events topic.
>
> The number of events is a fairly low volume so the possible
> degradation wouldn't necessarily impact the event communications for
> this purpose. I'm not sure if there are currently any public event
> topics in Pulsar that are supported in a way that an application could
> directly read from the topic.  However since this is an advanced case,
> I would assume that we don't need to focus on this in the first phase
> of improving rate limiters.
>
>
While the number of events in the system topic would be fairly low per
namespace, the issue is that this system topic lies on the same broker
where the actual topic/partitions exist and those partitions are leading to
degradation of this particular broker.



> I agree. It's better to exclude this from the discussion at the moment
> so that it doesn't cause confusion. Modifying the rate limits up and
> down automatically with some component could be considered out of
> scope in the first phase. However, that might be controversial since
> the externally observable behavior of rate limiting with bursting
> support seems to be behave in a way where the rate limit changes
> automatically. The "auto scaling" aspect of rate limiting, modifying
> the rate limits up and down, might be necessary as part of a rate
> limiter implementation eventually. More of that later.
>
Not to drag on this topic in this discussion, but doesn't this basically
mean that there is no "rate limiting" at all? Basically as good as setting
it `-1` and just let the hardware handle it to its best extent.


> >
> > The desired behavior in all three situations is to have a multiplier
> based bursting capability for a fixed duration. For example, it could be
> that a pulsar topic would be able to support 1.5x of the set quota for a
> burst duration of up to 5 minutes. There also needs to be a cooldown period
> in such a case that it would only accept one such burst every X minutes,
> say every 1 hour.
>
>
> Thanks for sharing this concrete example. It will help a lot when
> starting to design a solution which achieves this. I think that a
> token bucket algorithm based design can achieve something very close
> to what you are describing. In the first part I shared the references
> to Bucket4J's "refill styles" and the concept of "dual token bucket"
> algorithms.
> One possible solution is a dual token bucket where the first bucket
> takes care of the burst of up to 5 minutes. The second token bucket
> can cover the maximum rate of 1.5x. The cooldown period could be
> handled in a way where the token bucket fills up the bucket once in
> every 60 minutes. There could be implementation challenges related to
> adjusting the enforced rate up and down since as you had also
> observed, the token bucket algorithm will let all buffered tokens be
> used.
>

Agreed, there is a challenge, as it's not as straightforward as I've
demonstrated in the example above.


> > Suppose the SOP for bursting support is to support a burst of upto 1.5x
> produce rate for upto 10 minutes, once in a window of 1 hour.
> >
> > Now if a particular producer is trying to burst to or beyond 1.5x for
> more than 5 minutes - while we are not accepting the requests and the
> producer client is continuously going into timeouts, there should be a way
> to completely blacklist that producer so that even the client stops
> attempting to produce and we unload the topics all together - thus,
> releasing open connections and broker compute.
>
>
> It seems that this part of the requirements is more challenging to
> cover. We don't have to give up on trying to make progress on this
> side of your requirements. It could be helpful to do thought
> experiments on how some of the conditions could be detected. For
> example, how would you detect if a particular producer is going beyond
> the configured policy? It does seem intriguing to start adding the
> support to the rate limiting area. The rate limiter has the
> information about when bursting is actively happening. I would favor a
> solution where the rate limiter could only emit events and not make
> decisions about blocking a producer. However, I'm open to changing my
> opinion on that detail of the design if it turns out to be a common
> requirement, and it fits the overall architecture of the broker.
>

There is clearly a gap with respect to understanding what the common
requirements are. I am here speaking only of my requirements. Some of these
may contradict requirements of others who haven't spoken up so far. Would
we be changing the rate limiter once again when someone else comes up with
new requirements? For instance, the "exponential decay" mentioned by Rajan
is completely different from any token based approach.


>
> The information about timeouts is on the client side. Perhaps there
> are ways to pass the timeout configuration information along with the
> producer creation, for example. The Pulsar binary protocol could have
> some sort of clock sync so that client and broker clock difference
> could be calculated with sufficient accuracy. However, the broker
> cannot know when messages timeout that aren't at the broker at all. If
> the end-to-end latency of sent messages start approaching the timeout
> value, it's a good indication on the broker side that the client side
> is approaching the state where messages will start timing out.
>

I think there is a need for a new protocol message from server to client
indicating rate-limiting (say, a 429 response, with an optional
retry-after). That should handle a lot of these things. For instance, we
can then build configurable logic in the client to stop producing all
together after a sufficient number of 429 received from the server. There
can also be a "BLOCKED" protocol that the server could indicate to the
client.

>From our experience, inferring these things via client-broker clock sync
assumption would not hold true. We regularly notice clock differences
between clients and brokers in our setup.


> > What do you think about the quick solution of not sharing connections
> across producer objects? I can raise a github issue explaining the
> situation.
>
> It should be very straightforward to add a flag to producer and
> consumer builder to use an isolated (non-shared) broker connection.
> It's not optimal to add such flags, but I haven't heard of other ways
> to solve this challenge where the Pulsar binary protocol doesn't have
> to be modified or the backpressure solution revisited in the broker.
> The proper solution requires doing both, adding a permit-based flow
>

When you say - adding a permit-based flow control - even if this is
implemented, multiplexing is still an issue as the tcp/ip channel itself is
put on pause at the netty level. Is there any other way of rate limiting
and rejecting packets from a channel selectively so as to contain the rate
limiting effect only to the specific partition out of all the partitions
being shared in the channel?

When I was doing poller vs precise testing wrt CPU, network, broker
latencies etc, one of the reason precise was much more CPU efficient and
had minimal impact on broker latencies was due to the fact that the netty
channel was being paused precisely.


> control for producers and revisiting the broker side backpressure/flow
> control and therefore it won't happen quickly.
> Please go ahead and create a GH issue and share your context. That
> will be very helpful.
>
>
I will do so. Does this need a PIP? To reiterate, I will be opening a GH
issue on the lines of "don't share connection to the broker across producer
objects"



> On the lowest level of unit tests, it will be helpful to have a
> solution where the clock source used in unit test can be run quickly
> so that simulated scenarios don't take a long time to run, but could
> cover the essential features.
>
>
Agreed.


> It might be hard to get started on the rate limiter implementation by
> looking at the existing code. The reason of the double methods in the
> existing interface is due to it covering 2 completely different ways
> of handling the rate limiting and trying to handle that in a single
> concept. What is actually desired for at least the producing rate
> limiting is that it's an asynchronous rate limiting where any messages
> that have already arrived to the broker will be handled. The token
> count could go to a negative value when implementing this in the token
> bucket algorithm. If it goes below 0, the producers for the topic
> should be backpressured by toggling the auto-read state and it should
> schedule a job to resume the auto-reading after there are at least a
> configurable amount of tokens available. There should be no need to
> use the scheduler to add tokens to the bucket. Whenever tokens are
> used, the new token count can be calculated based on the time since
> tokens were last updated, by default the average max rate defines how
> many tokens are added. The token count cannot increase larger than the
> token bucket capacity. This is how simple a plain token bucket
>

Are you suggesting that we first go ahead and convert the rate limiter in
pulsar to a simple, single-token based approach?
I personally do not see any benefit in this apart from code refactoring.
The precise rate limiter is basically doing that already- all be it
refilling only every 1 second, rather than distributing the tokens across
that second.



> algorithm is. That could be a starting point until we start covering
> the advanced cases which require a dual token bucket algorithm.
> If someone has the bandwidth, it would be fine to start experimenting
> in this area with code to learn more.
>
>
I think this is a big assumption. Since this is a critical use case in my
organisation, I will have to contribute everything here myself. Now I do
understand that reviews can take time in the OSS world, but this can't be
left at "simple token based approach and then letting anyone pick and
explore to extend/enhance it to dual token approach". This probably is one
of the main reasons why pluggability is important here :)


-- 
Girish Sharma

Reply via email to