Hi Girish,

Replies inline.

On Tue, 7 Nov 2023 at 15:26, Girish Sharma <scrapmachi...@gmail.com> wrote:
>
> Hello Lari, replies inline.
>
> I will also be going through some textbook rate limiters (the one you shared, 
> plus others) and propose the one that at least suits our needs in the next 
> reply.


Sounds good. I've also been trying to find more rate limiter resources
that could be useful for our design.

The Bucket4J documentation gives some good ideas and shows how the
token bucket algorithm can be varied. For example, the "Refill
styles" section [1] is useful reading for inspiration.
In network routers, there's a concept of "dual token bucket"
algorithms, and you can find both Cisco and Juniper documentation
referencing it by googling.
I also asked ChatGPT-4 to explain the "dual token bucket" algorithm [2].

1 - https://bucket4j.com/8.6.0/toc.html#refill-types
2 - https://chat.openai.com/share/d4f4f740-f675-4233-964e-2910a7c8ed24

>>
>> It is bi-weekly on Thursdays. The meeting calendar, zoom link and
>> meeting notes can be found at
>> https://github.com/apache/pulsar/wiki/Community-Meetings .
>>
>
> Would it make sense for me to join this time given that you are skipping it?

Yes, it's worth joining regularly when you're participating in Pulsar
core development. There's usually a chance to discuss any topics that
Pulsar community members bring up for discussion. A few times there
haven't been any participants; in that case, it's good to ask on the
#dev channel on Pulsar Slack whether others are joining the meeting.

>>
>> ok. btw. "metrics" doesn't necessarily mean providing the rate limiter
>> metrics via Prometheus. There might be other ways to provide this
>> information for components that could react to this.
>> For example, it could be a system topic where these rate limiters emit 
>> events.
>>
>
> Are there any other system topics than `tenant/namespace/__change_events` . 
> While it's an improvement over querying metrics, it would still mean one 
> consumer per namespace and would form a cyclic dependency - for example, in 
> case a broker is degrading due to mis-use of bursting, it might lead to 
> delays in the consumption of the event from the __change_events topic.

The volume of events would be fairly low, so possible broker
degradation wouldn't necessarily impact the event communication for
this purpose. I'm not sure whether Pulsar currently has any public
event topics that are supported in a way that an application could
read from directly. However, since this is an advanced case, I'd
assume we don't need to focus on it in the first phase of improving
rate limiters.

>
>
>> I agree. I just brought up this example to ensure that your
>> expectation about bursting isn't about controlling the rate limits
>> based on situational information, such as end-to-end latency
>> information.
>> Such a feature could be useful, but it does complicate things.
>> However, I think it's good to keep this on the radar since this might
>> be needed to solve some advanced use cases.
>>
>
> I still envision auto-scaling to be admin API driven rather than produce 
> throughput driven. That way, it remains deterministic in nature. But it 
> probably doesn't make sense to even talk about it until (partition) 
> scale-down is possible.


I agree. It's better to exclude this from the discussion at the moment
so that it doesn't cause confusion. Having some component modify the
rate limits up and down automatically could be considered out of scope
in the first phase. However, that might be controversial, since the
externally observable behavior of rate limiting with bursting support
seems to behave as if the rate limit changes automatically. The "auto
scaling" aspect of rate limiting, adjusting the rate limits up and
down, might eventually be necessary as part of a rate limiter
implementation. More on that later.

>
> In all of the 3 cases that I listed, the current behavior, with precise rate 
> limiting enabled, is to pause the netty channel in case the throughput 
> breaches the set limits. This eventually leads to timeout at the client side 
> in case the burst is significantly greater than the configured timeout on the 
> producer side.


Makes sense.

>
> The desired behavior in all three situations is to have a multiplier based 
> bursting capability for a fixed duration. For example, it could be that a 
> pulsar topic would be able to support 1.5x of the set quota for a burst 
> duration of up to 5 minutes. There also needs to be a cooldown period in such 
> a case that it would only accept one such burst every X minutes, say every 1 
> hour.


Thanks for sharing this concrete example. It will help a lot when
starting to design a solution that achieves this. I think a design
based on the token bucket algorithm can get very close to what you
are describing. In the first part I shared the references to
Bucket4J's "refill styles" and the concept of "dual token bucket"
algorithms.
One possible solution is a dual token bucket where the first bucket
takes care of the burst budget of up to 5 minutes and the second
bucket caps the maximum rate at 1.5x. The cooldown period could be
handled by refilling the first bucket only once every 60 minutes.
There could be implementation challenges related to adjusting the
enforced rate up and down since, as you have also observed, the token
bucket algorithm will let all buffered tokens be used.
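
To make this more concrete, here's a rough sketch in plain Java of
what such a dual token bucket could look like. It only illustrates the
idea and is not existing Pulsar code; the class and field names, the
bytes-per-second unit and the sizing of the buckets are all
assumptions:

```java
// Rough sketch of the dual token bucket idea described above, not existing
// Pulsar code. The first bucket enforces the long-term average rate and its
// capacity holds the burst budget; the second bucket caps the instantaneous
// rate at 1.5x the configured rate.
class DualTokenBucketSketch {
    private final long avgRatePerSecond;    // configured average rate (bytes/s)
    private final long avgCapacity;         // burst budget of the first bucket
    private final long burstRatePerSecond;  // 1.5x the configured rate
    private final long burstCapacity;       // ~1 second worth of tokens at the burst rate

    private long avgTokens;
    private long burstTokens;
    private long lastRefillNanos = System.nanoTime();

    DualTokenBucketSketch(long avgRatePerSecond) {
        this.avgRatePerSecond = avgRatePerSecond;
        // At 1.5x the budget drains 0.5x faster than it refills, so this
        // budget lasts roughly 5 minutes of continuous bursting.
        this.avgCapacity = avgRatePerSecond / 2 * 300;
        this.burstRatePerSecond = (long) (avgRatePerSecond * 1.5);
        this.burstCapacity = burstRatePerSecond;
        this.avgTokens = avgCapacity;
        this.burstTokens = burstCapacity;
    }

    /** Returns true if the produce fits within both buckets, false if it should be throttled. */
    synchronized boolean tryConsume(long messageSizeInBytes) {
        refill();
        if (avgTokens >= messageSizeInBytes && burstTokens >= messageSizeInBytes) {
            avgTokens -= messageSizeInBytes;
            burstTokens -= messageSizeInBytes;
            return true;
        }
        return false;
    }

    // Tokens are added lazily based on the elapsed time; no scheduler thread is
    // needed. The "one burst per hour" cooldown could be modeled by refilling
    // the first bucket's budget only once every 60 minutes instead of
    // continuously (similar to Bucket4J's "intervally" refill style).
    private void refill() {
        long now = System.nanoTime();
        long elapsedNanos = now - lastRefillNanos;
        lastRefillNanos = now;
        avgTokens = Math.min(avgCapacity,
                avgTokens + (long) (elapsedNanos / 1e9 * avgRatePerSecond));
        burstTokens = Math.min(burstCapacity,
                burstTokens + (long) (elapsedNanos / 1e9 * burstRatePerSecond));
    }
}
```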

>
>
>>
>>
>> >    - In a visitor based produce rate (where produce rate goes up in the day
>> >    and goes down in the night, think in terms of popular website hourly 
>> > view
>> >    counts pattern) , there are cases when, due to certain external/internal
>> >    triggers, the views - and thus - the produce rate spikes for a few 
>> > minutes.
>>
>> Again, please explain the current behavior and desired behavior.
>> Explicit example values of number of messages, bandwidth, etc. would
>> also be helpful details.
>
>
> Adding to what I wrote above, think of this pattern like the following: the 
> produce rate slowly increases from ~2MBps at around 4 AM to a known peak of 
> about 30MBps by 4 PM and stays around that peak until 9 PM after which it 
> again starts decreasing until it reaches ~2MBps around 2 AM.
> Now, due to some external triggers, maybe a scheduled sale event, at 10PM, 
> the quota may spike up to 40MBps for 4-5 minutes and then again go back down 
> to the usual ~20MBps . Here is a rough image showcasing the trend.
>
>
>>
>>
>> >    It is also important to keep this in check so as to not allow bots to do
>> >    DDOS into your system, while that might be a responsibility of an 
>> > upstream
>> >    system like API gateway, but we cannot be ignorant about that 
>> > completely.
>>
>> what would be the desired behavior?
>
>
> The desired behavior is that the burst support should be short lived (5-10 
> minutes) and limited to a fixed number of bursts in a duration (say - 1 burst 
> per hour). Obviously, all of these should be configurable, maybe at a broker 
> level and not a topic level.


That can be handled with a dual token bucket algorithm based rate limiter.

>
> I have yet to go in depth on both the textbook rate limiter models, but if 
> token bucket works on buffered tokens, it may not be much useful. Consider a 
> situation where the throughput is always at its absolute configured peak, 
> thus not reserving tokens at all. Again - this is the understanding based on 
> the above text. I am yet to go through the wiki page with full details. In 
> any case, I am very sure that an absolute maximum bursting limit is needed 
> per topic, since at the end of the day, we are limited by hardware resources 
> like network bandwidth, disk bandwidth etc.


Yes, if the actual throughput is always at the rate limiter's maximum
average rate, no spare tokens would accumulate in the default token
bucket algorithm. Is that a problem? It might not be a problem at all:
spare tokens would be buffered whenever the throughput is lower, and
that buffer is what allows bursting to catch up to the average rate
over a longer period of time.

Btw, the challenge of having no room to burst when the actual
throughput is already at the peak rate is the original reason why I
introduced the concept of automatically adjusting the rate limit up
and down based on end-to-end latency information. With a feedback
loop, the rate limiter could know whether it needs to allow bursting
to prevent latencies from increasing. However, it's better to keep
that out of the first phase for now.

I believe that we will find many ways to apply the token bucket
algorithm to meet our requirements. A lot of flexibility can be added
with dual token buckets and by introducing new ways of adding/filling
tokens to the buckets. It's also possible to tweak many small details
if necessary, such as smoothing out burst rates. However, it's better
to keep things simple when possible.


> Suppose the SOP for bursting support is to support a burst of upto 1.5x 
> produce rate for upto 10 minutes, once in a window of 1 hour.
>
> Now if a particular producer is trying to burst to or beyond 1.5x for more 
> than 5 minutes - while we are not accepting the requests and the producer 
> client is continuously going into timeouts, there should be a way to 
> completely blacklist that producer so that even the client stops attempting 
> to produce and we unload the topics all together - thus, releasing open 
> connections and broker compute.


It seems that this part of the requirements is more challenging to
cover. That doesn't mean we should give up on making progress on it.
It could be helpful to do thought experiments on how some of the
conditions could be detected. For example, how would you detect that a
particular producer is going beyond the configured policy? The rate
limiter is a natural place to add such detection since it has the
information about when bursting is actively happening. I would favor a
solution where the rate limiter only emits events and doesn't make
decisions about blocking a producer. However, I'm open to changing my
opinion on that detail of the design if it turns out to be a common
requirement and it fits the overall architecture of the broker.

The information about timeouts is on the client side. Perhaps there
are ways to pass the timeout configuration along with producer
creation, for example. The Pulsar binary protocol could have some sort
of clock sync so that the client and broker clock difference could be
calculated with sufficient accuracy. However, the broker cannot know
about messages that time out before they ever reach the broker. If the
end-to-end latency of sent messages starts approaching the timeout
value, that's a good indication on the broker side that the client is
approaching the state where messages will start timing out.

In general, I think we should keep these requirements covered in the
design so that the rate limiter implementation can be extended to
support this use case in one way or another in later phases.


>> > It's only async if the producers use `sendAsync` . Moreover, even if it is
>> > async in nature, practically it ends up being quite linear and
>> > well-homogenised. I am speaking from experience of running 1000s of
>> > partitioned topics in production
>>
>> I'd assume that most high traffic use cases would use asynchronous
>> sending in a way or another. If you'd be sending synchronously, it
>> would be extremely inefficient if messages are sent one-by-one. The
>> asynchronicity could also come from multiple threads in an
>> application.
>>
>> You are right that a well homogenised workload reduces the severity of
>> the multiplexing problem. That's also why the problem of multiplexing
>> hasn't been fixed since the problem isn't visible and it's also very
>> hard to observe.
>> In a case where 2 producers with very different rate limiting options
>> share a connection, it definitely is a problem in the current
>
>
> Yes actually, I was talking to the team and we did observe a case where there 
> was a client app with 2 producer objects writing to two different topics. 
> When one of their topics was breaching quota, the other one was also 
> observing rate limiting even though it was under quota. So eventually, this 
> surely needs to be solved. One short term solution is to at least not share 
> connections across producer objects. But I still feel it's out of scope of 
> this discussion here.

Thanks for sharing. The real stories from operating Pulsar at scale
are always interesting to hear!

> What do you think about the quick solution of not sharing connections across 
> producer objects? I can raise a github issue explaining the situation.

It should be very straightforward to add a flag to the producer and
consumer builders to use an isolated (non-shared) broker connection.
It's not optimal to add such flags, but I haven't heard of other ways
to solve this challenge where the Pulsar binary protocol doesn't have
to be modified or the backpressure solution revisited in the broker.
The proper solution requires doing both: adding permit-based flow
control for producers and revisiting the broker-side backpressure/flow
control, and therefore it won't happen quickly.
Please go ahead and create a GH issue and share your context. That
will be very helpful.
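
To illustrate what I mean by such a flag, the addition to the client
API could look roughly like the sketch below. The `isolatedConnection`
method name and its placement are hypothetical; nothing like it exists
in the current builder API:

```java
// Hypothetical addition to the client API, not existing Pulsar code: a builder
// flag asking the client to open a dedicated, non-shared broker connection for
// this producer. The interface and method names are illustrative only.
public interface ProducerConnectionOptions<T> {
    /**
     * When true, the producer gets its own broker connection instead of the
     * shared pooled connection, so that backpressure applied to one producer's
     * connection does not stall unrelated producers in the same process.
     */
    ProducerConnectionOptions<T> isolatedConnection(boolean isolated);
}
```

Usage would then be along the lines of
`client.newProducer().topic(...).isolatedConnection(true).create()`.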

Some thoughts about how to proceed so that this actually gets
implemented.

One important part of the development will be how we can test or
simulate a scenario and validate that the rate limiting algorithm
makes sense.
At the lowest level of unit tests, it will be helpful to have a
solution where the clock source used in the tests can be advanced
quickly, so that simulated scenarios don't take a long time to run but
still cover the essential features.
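
As an illustration, something along these lines (the names are made
up, not existing Pulsar code) would let a test advance time manually
instead of sleeping:

```java
// Sketch of a pluggable clock source for the rate limiter so that unit tests
// can advance time manually instead of sleeping. Illustrative names only.
interface MonotonicClock {
    long nanoTime();
}

class ManualClock implements MonotonicClock {
    private long nanos;

    @Override
    public long nanoTime() {
        return nanos;
    }

    // In a test: manualClock.advanceMillis(60_000) simulates a minute passing instantly.
    void advanceMillis(long millis) {
        nanos += millis * 1_000_000L;
    }
}
```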

It might be hard to get started on the rate limiter implementation by
looking at the existing code. The reason for the double methods in the
existing interface is that it covers 2 completely different ways of
handling rate limiting within a single concept. What is actually
desired, at least for produce rate limiting, is asynchronous rate
limiting where any messages that have already arrived at the broker
will be handled. When implementing this with the token bucket
algorithm, the token count can go negative. If it goes below 0, the
producers for the topic should be backpressured by toggling the
auto-read state, and a job should be scheduled to resume auto-reading
once at least a configurable number of tokens is available. There
should be no need to use a scheduler to add tokens to the bucket.
Whenever tokens are used, the new token count can be calculated from
the time elapsed since the tokens were last updated; by default, the
maximum average rate defines how many tokens are added. The token
count cannot grow beyond the token bucket capacity. That's how simple
a plain token bucket algorithm is. It could be a starting point until
we start covering the advanced cases which require a dual token bucket
algorithm.
If someone has the bandwidth, it would be fine to start experimenting
in this area with code to learn more.
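
To spell that out, here's a minimal sketch of such a plain token
bucket. It is illustrative only, not the existing rate limiter code,
and it takes the current time as a parameter so that a test could
drive it with a manual clock like the one sketched above:

```java
// Minimal sketch of the plain token bucket described above; illustrative names,
// not the existing Pulsar rate limiter code. Messages that have already arrived
// are always accounted for, so the token count may go negative.
class AsyncTokenBucketSketch {
    private final long ratePerSecond;    // tokens added per second (bytes or messages)
    private final long capacity;         // maximum number of buffered tokens
    private final long resumeThreshold;  // tokens required before auto-read is resumed

    private long tokens;
    private long lastUpdateNanos;

    AsyncTokenBucketSketch(long ratePerSecond, long capacity, long resumeThreshold, long nowNanos) {
        this.ratePerSecond = ratePerSecond;
        this.capacity = capacity;
        this.resumeThreshold = resumeThreshold;
        this.tokens = capacity;
        this.lastUpdateNanos = nowNanos;
    }

    /**
     * Accounts for a message that has already arrived at the broker. Returns true
     * if producers should be backpressured by toggling auto-read off and
     * scheduling a job that resumes auto-reading later.
     */
    synchronized boolean consumeAndCheckIfPauseNeeded(long permits, long nowNanos) {
        updateTokens(nowNanos);
        tokens -= permits;   // may go below zero, the message is handled anyway
        return tokens < 0;
    }

    /** Used by the scheduled resume job to decide whether auto-read can be re-enabled. */
    synchronized boolean canResume(long nowNanos) {
        updateTokens(nowNanos);
        return tokens >= resumeThreshold;
    }

    // No scheduler is needed for adding tokens: whenever tokens are used or
    // checked, the new count is calculated from the time elapsed since the last
    // update, capped at the bucket capacity.
    private void updateTokens(long nowNanos) {
        long elapsedNanos = nowNanos - lastUpdateNanos;
        lastUpdateNanos = nowNanos;
        tokens = Math.min(capacity, tokens + (long) (elapsedNanos / 1e9 * ratePerSecond));
    }
}
```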

After trying something out, it will be easier to be confident about
the design of the dual token bucket algorithm. One of the assumptions
I'm making is that a dual token bucket will be sufficient.

This has been a great discussion so far and it feels like we are
getting to the necessary level of detail.
Thanks for sharing many details of your requirements and what you have
experienced and learned when operating Pulsar at scale.
I feel excited about the upcoming improvements for rate limiting in
Pulsar. From my side I have limited availability for participating in
this work for the next 2-3 weeks. After that, I'll be ready to help in
making the improved rate limiting a reality.

-Lari
