Hi Lari/Girish,

I am sorry for jumping into the discussion late, but I would like to
acknowledge the need for a pluggable publish rate limiter; I had also
asked for it during the implementation of the publish rate limiter. There
are trade-offs between different rate-limiter implementations in terms of
accuracy, network usage, and simplicity, and users should be able to choose
one based on their requirements. However, we don't have a correct and
extensible publish rate limiter interface right now, and before making it
pluggable we have to make sure that it can support any type of
implementation, for example token-based or sliding-window-based throttling,
support for various decaying functions (e.g. exponential decay:
https://en.wikipedia.org/wiki/Exponential_decay), etc. I haven't seen such
interface details and design in the PIP:
https://github.com/apache/pulsar/pull/21399/. So, I would encourage working
towards building a pluggable rate limiter, but the current PIP is not ready, as
it doesn't cover such generic interfaces that can support different types of


On Tue, Nov 7, 2023 at 10:02 AM Lari Hotari <lhot...@apache.org> wrote:

> Hi Girish,
> Replies inline.
> On Tue, 7 Nov 2023 at 15:26, Girish Sharma <scrapmachi...@gmail.com>
> wrote:
> >
> > Hello Lari, replies inline.
> >
> > I will also be going through some textbook rate limiters (the one you
> shared, plus others) and propose the one that at least suits our needs in
> the next reply.
> Sounds good. I've also been trying to find more rate limiter resources
> that could be useful for our design.
> The Bucket4J documentation gives some good ideas and it shows how the
> token bucket algorithm could be varied. For example, the "Refill
> styles" section [1] is useful to read as inspiration.
> In network routers, there's a concept of "dual token bucket"
> algorithms and by googling you can find both Cisco and Juniper
> documentation referencing this.
> I also asked ChatGPT-4 to explain "dual token bucket" algorithm [2].
> 1 - https://bucket4j.com/8.6.0/toc.html#refill-types
> 2 - https://chat.openai.com/share/d4f4f740-f675-4233-964e-2910a7c8ed24
> >>
> >> It is bi-weekly on Thursdays. The meeting calendar, zoom link and
> >> meeting notes can be found at
> >> https://github.com/apache/pulsar/wiki/Community-Meetings .
> >>
> >
> > Would it make sense for me to join this time given that you are skipping
> it?
> Yes, it's worth joining regularly when one is participating in Pulsar
> core development. There's usually a chance to discuss all topics that
> Pulsar community members bring up for discussion. A few times there
> haven't been any participants and in that case, it's good to ask on
> the #dev channel on Pulsar Slack whether others are joining the
> meeting.
> >>
> >> ok. btw. "metrics" doesn't necessarily mean providing the rate limiter
> >> metrics via Prometheus. There might be other ways to provide this
> >> information for components that could react to this.
> >> For example, it could be a system topic where these rate limiters emit
> events.
> >>
> >
> > Are there any other system topics than
> `tenant/namespace/__change_events`? While it's an improvement over
> querying metrics, it would still mean one consumer per namespace and would
> form a cyclic dependency - for example, in case a broker is degrading due
> to mis-use of bursting, it might lead to delays in the consumption of the
> event from the __change_events topic.
> The events would be fairly low in volume, so the possible
> degradation wouldn't necessarily impact the event communications for
> this purpose. I'm not sure if there are currently any public event
> topics in Pulsar that are supported in a way that an application could
> directly read from the topic. However, since this is an advanced case,
> I would assume that we don't need to focus on this in the first phase
> of improving rate limiters.
> >
> >
> >> I agree. I just brought up this example to ensure that your
> >> expectation about bursting isn't about controlling the rate limits
> >> based on situational information, such as end-to-end latency
> >> information.
> >> Such a feature could be useful, but it does complicate things.
> >> However, I think it's good to keep this on the radar since this might
> >> be needed to solve some advanced use cases.
> >>
> >
> > I still envision auto-scaling to be admin API driven rather than produce
> throughput driven. That way, it remains deterministic in nature. But it
> probably doesn't make sense to even talk about it until (partition)
> scale-down is possible.
> I agree. It's better to exclude this from the discussion at the moment
> so that it doesn't cause confusion. Modifying the rate limits up and
> down automatically with some component could be considered out of
> scope in the first phase. However, that might be controversial since
> the externally observable behavior of rate limiting with bursting
> support seems to behave in a way where the rate limit changes
> automatically. The "auto scaling" aspect of rate limiting, modifying
> the rate limits up and down, might be necessary as part of a rate
> limiter implementation eventually. More of that later.
> >
> > In all of the 3 cases that I listed, the current behavior, with precise
> rate limiting enabled, is to pause the Netty channel in case the throughput
> breaches the set limits. This eventually leads to timeouts at the client
> side in case the burst lasts significantly longer than the configured timeout
> on the producer side.
> Makes sense.
> >
> > The desired behavior in all three situations is to have a multiplier
> based bursting capability for a fixed duration. For example, it could be
> that a pulsar topic would be able to support 1.5x of the set quota for a
> burst duration of up to 5 minutes. There also needs to be a cooldown period
> in such a case that it would only accept one such burst every X minutes,
> say every 1 hour.
> Thanks for sharing this concrete example. It will help a lot when
> starting to design a solution which achieves this. I think that a
> token bucket algorithm based design can achieve something very close
> to what you are describing. In the first part I shared the references
> to Bucket4J's "refill styles" and the concept of "dual token bucket"
> algorithms.
> One possible solution is a dual token bucket where the first bucket
> takes care of the burst of up to 5 minutes. The second token bucket
> can cover the maximum rate of 1.5x. The cooldown period could be
> handled in a way where the token bucket fills up the bucket once in
> every 60 minutes. There could be implementation challenges related to
> adjusting the enforced rate up and down since as you had also
> observed, the token bucket algorithm will let all buffered tokens be
> used.
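
To make the dual token bucket idea above a bit more tangible, here is a
rough Java sketch of one possible reading of it. Everything in it is an
assumption for illustration: the class name, the whole-second time
resolution, and the way the buckets are combined are not taken from
Pulsar or Bucket4J. A continuously refilled "peak" bucket caps the rate
at the burst ceiling (e.g. 1.5x), a "base" bucket covers the configured
quota, and a "burst" bucket holding the extra allowance is refilled in
one go once per cooldown period.

```java
// Hypothetical sketch; none of these names are Pulsar or Bucket4J APIs.
final class DualTokenBucketSketch {
    private final long baseRate;        // configured quota, tokens per second
    private final long peakRate;        // hard ceiling, e.g. 1.5x the quota
    private final long burstCapacity;   // extra tokens granted per cooldown period
    private final long cooldownSeconds; // burst allowance is refilled once per period
    private long baseTokens;
    private long peakTokens;
    private long burstTokens;
    private long lastSecond;
    private long lastBurstRefillSecond;

    DualTokenBucketSketch(long baseRate, long peakRate, long burstSeconds,
                          long cooldownSeconds, long startSecond) {
        this.baseRate = baseRate;
        this.peakRate = peakRate;
        // Extra tokens needed to run at peakRate instead of baseRate for burstSeconds.
        this.burstCapacity = (peakRate - baseRate) * burstSeconds;
        this.cooldownSeconds = cooldownSeconds;
        this.baseTokens = baseRate;
        this.peakTokens = peakRate;
        this.burstTokens = burstCapacity;
        this.lastSecond = startSecond;
        this.lastBurstRefillSecond = startSecond;
    }

    /** Returns true if n tokens may be used at nowSecond (whole seconds, for simplicity). */
    boolean tryConsume(long n, long nowSecond) {
        long elapsed = nowSecond - lastSecond;
        if (elapsed > 0) {
            // Both continuously refilled buckets hold at most one second's worth of tokens.
            baseTokens = Math.min(baseRate, baseTokens + elapsed * baseRate);
            peakTokens = Math.min(peakRate, peakTokens + elapsed * peakRate);
            lastSecond = nowSecond;
        }
        if (nowSecond - lastBurstRefillSecond >= cooldownSeconds) {
            // Interval refill: the whole burst allowance comes back at once.
            burstTokens = burstCapacity;
            lastBurstRefillSecond = nowSecond;
        }
        long fromBase = Math.min(n, baseTokens);
        long fromBurst = n - fromBase; // whatever the quota cannot cover
        if (n > peakTokens || fromBurst > burstTokens) {
            return false; // above the ceiling, or the burst allowance is exhausted
        }
        peakTokens -= n;
        baseTokens -= fromBase;
        burstTokens -= fromBurst;
        return true;
    }
}
```

With baseRate=10, peakRate=15, burstSeconds=5 and cooldownSeconds=60, this
sketch accepts 15 tokens per second for five seconds, then only the base
10 per second until the burst bucket is refilled at second 60. It rejects
instead of backpressuring, which a real broker-side limiter would likely
do differently.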
> >
> >
> >>
> >>
> >> >    - In a visitor based produce rate (where produce rate goes up in
> the day
> >> >    and goes down in the night, think in terms of popular website
> hourly view
> >> >    counts pattern) , there are cases when, due to certain
> external/internal
> >> >    triggers, the views - and thus - the produce rate spikes for a few
> minutes.
> >>
> >> Again, please explain the current behavior and desired behavior.
> >> Explicit example values of number of messages, bandwidth, etc. would
> >> also be helpful details.
> >
> >
> > Adding to what I wrote above, think of this pattern like the following:
> the produce rate slowly increases from ~2MBps at around 4 AM to a known
> peak of about 30MBps by 4 PM and stays around that peak until 9 PM, after
> which it again starts decreasing until it reaches ~2MBps around 2 AM.
> > Now, due to some external triggers, maybe a scheduled sale event, at
> 10PM, the produce rate may spike up to 40MBps for 4-5 minutes and then again
> go back down to the usual ~20MBps. Here is a rough image showcasing the trend.
> >
> >
> >>
> >>
> >> >    It is also important to keep this in check so as to not allow bots
> to do
> >> >    DDOS into your system, while that might be a responsibility of an
> upstream
> >> >    system like API gateway, but we cannot be ignorant about that
> completely.
> >>
> >> what would be the desired behavior?
> >
> >
> > The desired behavior is that the burst support should be short lived
> (5-10 minutes) and limited to a fixed number of bursts in a duration (say -
> 1 burst per hour). Obviously, all of these should be configurable, maybe at
> a broker level and not a topic level.
> That can be handled with a dual token bucket algorithm based rate limiter.
> >
> > I have yet to go in depth on both the textbook rate limiter models, but
> if token bucket works on buffered tokens, it may not be very useful.
> Consider a situation where the throughput is always at its absolute
> configured peak, thus not reserving tokens at all. Again - this is the
> understanding based on the above text. I am yet to go through the wiki page
> with full details. In any case, I am very sure that an absolute maximum
> bursting limit is needed per topic, since at the end of the day, we are
> limited by hardware resources like network bandwidth, disk bandwidth etc.
> Yes, if the actual throughput is always at the rate limiter's maximum
> average rate, the tokens wouldn't be reserved at all in the default
> token bucket algorithm. Is that a problem?
> It might not be a problem at all. Spare tokens would be buffered
> whenever the throughput is lower and therefore bursting could help
> catch up to the average rate over a longer period of time.
> Btw. The challenge of no bursting when the actual throughput is
> already at the peak rate is the original reason why I introduced the
> concept of automatically adjusting the rate limit up and down based on
> end-to-end latency information. If the rate limiter had a feedback
> loop, it could know whether it needs to do bursting to prevent the
> latencies from increasing. That was one of the reasons why I was
> thinking about the automatic adjustments of the rate limit. However,
> it's better to keep that out of the first phase for now.
> I believe that we will find many ways to apply the token bucket
> algorithm to meet our requirements. A lot of flexibility can be added
> with dual token buckets and by introducing new ways of adding/filling
> tokens to the buckets. It's also possible to tweak a lot of small
> details if that is necessary, such as smoothing out burst rates and
> so on. However, it's better to keep things simple when possible.
> > Suppose the SOP for bursting support is to support a burst of up to 1.5x
> produce rate for up to 10 minutes, once in a window of 1 hour.
> >
> > Now if a particular producer is trying to burst to or beyond 1.5x for
> more than 5 minutes - while we are not accepting the requests and the
> producer client is continuously going into timeouts, there should be a way
> to completely blacklist that producer so that even the client stops
> attempting to produce and we unload the topics altogether - thus,
> releasing open connections and broker compute.
> It seems that this part of the requirements is more challenging to
> cover. We don't have to give up on trying to make progress on this
> side of your requirements. It could be helpful to do thought
> experiments on how some of the conditions could be detected. For
> example, how would you detect if a particular producer is going beyond
> the configured policy? It does seem intriguing to start adding the
> support to the rate limiting area. The rate limiter has the
> information about when bursting is actively happening. I would favor a
> solution where the rate limiter could only emit events and not make
> decisions about blocking a producer. However, I'm open to changing my
> opinion on that detail of the design if it turns out to be a common
> requirement, and it fits the overall architecture of the broker.
> The information about timeouts is on the client side. Perhaps there
> are ways to pass the timeout configuration information along with the
> producer creation, for example. The Pulsar binary protocol could have
> some sort of clock sync so that client and broker clock difference
> could be calculated with sufficient accuracy. However, the broker
> cannot know when messages that never reached the broker time out. If
> the end-to-end latency of sent messages starts approaching the timeout
> value, it's a good indication on the broker side that the client side
> is approaching the state where messages will start timing out.
> In general, I think we should keep these requirements covered in the
> design so that the rate limiter implementation could be extended to
> support the use case in one way or another in the later phases.
> >> > It's only async if the producers use `sendAsync` . Moreover, even if
> it is
> >> > async in nature, practically it ends up being quite linear and
> >> > well-homogenised. I am speaking from experience of running 1000s of
> >> > partitioned topics in production
> >>
> >> I'd assume that most high traffic use cases would use asynchronous
> >> sending in one way or another. If you'd be sending synchronously, it
> >> would be extremely inefficient if messages are sent one-by-one. The
> >> asynchronicity could also come from multiple threads in an
> >> application.
> >>
> >> You are right that a well homogenised workload reduces the severity of
> >> the multiplexing problem. That's also why the problem of multiplexing
> >> hasn't been fixed, since the problem isn't visible and it's also very
> >> hard to observe.
> >> In a case where 2 producers with very different rate limiting options
> >> share a connection, it definitely is a problem in the current
> >
> >
> > Yes actually, I was talking to the team and we did observe a case where
> there was a client app with 2 producer objects writing to two different
> topics. When one of their topics was breaching quota, the other one was
> also observing rate limiting even though it was under quota. So eventually,
> this surely needs to be solved. One short term solution is to at least not
> share connections across producer objects. But I still feel it's out of
> scope of this discussion here.
> Thanks for sharing. The real stories from operating Pulsar at scale
> are always interesting to hear!
> > What do you think about the quick solution of not sharing connections
> across producer objects? I can raise a github issue explaining the
> situation.
> It should be very straightforward to add a flag to producer and
> consumer builder to use an isolated (non-shared) broker connection.
> It's not optimal to add such flags, but I haven't heard of other ways
> to solve this challenge where the Pulsar binary protocol doesn't have
> to be modified or the backpressure solution revisited in the broker.
> The proper solution requires doing both, adding a permit-based flow
> control for producers and revisiting the broker side backpressure/flow
> control and therefore it won't happen quickly.
> Please go ahead and create a GH issue and share your context. That
> will be very helpful.
> Some thoughts about how to proceed so that this actually gets
> implemented.
> One important part of development will be how we could test or
> simulate a scenario and validate that the rate limiting algorithm
> makes sense.
> On the lowest level of unit tests, it will be helpful to have a
> solution where the clock source used in unit tests can be advanced
> quickly so that simulated scenarios don't take a long time to run, but
> could still cover the essential features.
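
One way to get such a fast-running clock source in unit tests is a
manually advanced clock that the rate limiter reads instead of
System.nanoTime(). The sketch below is only an illustration; the class
name ManualNanoClock is made up here and is not an existing Pulsar test
utility.

```java
// Hypothetical test helper; not an existing Pulsar class.
final class ManualNanoClock implements java.util.function.LongSupplier {
    private long nanos; // current simulated time in nanoseconds

    @Override
    public long getAsLong() {
        return nanos;
    }

    /** Moves simulated time forward instantly, without sleeping. */
    void advance(java.time.Duration duration) {
        nanos += duration.toNanos();
    }
}
```

A rate limiter that takes a LongSupplier in its constructor can then be
driven through a simulated hour in a tight loop, e.g. by calling
clock.advance(Duration.ofSeconds(1)) 3600 times, while production code
passes System::nanoTime.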
> It might be hard to get started on the rate limiter implementation by
> looking at the existing code. The reason for the double methods in the
> existing interface is that it covers 2 completely different ways
> of handling the rate limiting and tries to handle that in a single
> concept. What is actually desired, at least for the producing rate
> limiting, is asynchronous rate limiting where any messages
> that have already arrived at the broker will be handled. The token
> count could go to a negative value when implementing this in the token
> bucket algorithm. If it goes below 0, the producers for the topic
> should be backpressured by toggling the auto-read state, and the rate
> limiter should schedule a job to resume the auto-reading once at least
> a configurable number of tokens is available. There should be no need to
> use the scheduler to add tokens to the bucket. Whenever tokens are
> used, the new token count can be calculated based on the time since
> tokens were last updated; by default, the average max rate defines how
> many tokens are added. The token count cannot grow beyond the
> token bucket capacity. This is how simple a plain token bucket
> algorithm is. That could be a starting point until we start covering
> the advanced cases which require a dual token bucket algorithm.
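
The plain token bucket described above could look roughly like the
following Java sketch. It is only a starting point under assumptions of
my own: the class and method names are hypothetical (not existing Pulsar
APIs), the clock is passed in as a LongSupplier so that tests can control
time, and toggling auto-read and scheduling the resume job are left to
the caller.

```java
// Hypothetical sketch: class and method names are illustrative, not Pulsar APIs.
final class AsyncTokenBucket {
    private static final long NANOS_PER_SECOND = 1_000_000_000L;
    private final long capacity;      // maximum number of buffered tokens
    private final long ratePerSecond; // average max rate, tokens added per second
    private final java.util.function.LongSupplier clock; // nanosecond clock, injectable for tests
    private long tokens;              // may become negative
    private long lastRefillNanos;

    AsyncTokenBucket(long capacity, long ratePerSecond, java.util.function.LongSupplier clock) {
        if (capacity <= 0 || ratePerSecond <= 0) {
            throw new IllegalArgumentException("capacity and rate must be positive");
        }
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.clock = clock;
        this.tokens = capacity;
        this.lastRefillNanos = clock.getAsLong();
    }

    // Lazily adds the tokens earned since the last update; no scheduler is involved.
    private void refill() {
        long now = clock.getAsLong();
        long elapsed = now - lastRefillNanos;
        // Split into whole seconds and remainder to avoid overflowing a long.
        long earned = (elapsed / NANOS_PER_SECOND) * ratePerSecond
                + (elapsed % NANOS_PER_SECOND) * ratePerSecond / NANOS_PER_SECOND;
        if (earned > 0) {
            tokens = Math.min(capacity, tokens + earned);
            lastRefillNanos = now; // simplification: drops the sub-token remainder
        }
    }

    /** Always consumes (the message has already arrived); true means "pause reads". */
    boolean consume(long n) {
        refill();
        tokens -= n;
        return tokens < 0;
    }

    /** True once at least minTokens are available, i.e. auto-read may be resumed. */
    boolean canResume(long minTokens) {
        refill();
        return tokens >= minTokens;
    }

    /** Estimated delay for the resume job, measured from the last refill. */
    long nanosUntilAvailable(long minTokens) {
        refill();
        long deficit = minTokens - tokens;
        if (deficit <= 0) {
            return 0;
        }
        return (deficit / ratePerSecond) * NANOS_PER_SECOND
                + ((deficit % ratePerSecond) * NANOS_PER_SECOND + ratePerSecond - 1) / ratePerSecond;
    }

    long availableTokens() {
        refill();
        return tokens;
    }
}
```

The intended usage is that the broker calls consume() for every handled
message; when it returns true, the caller disables auto-read and
schedules a single job after nanosUntilAvailable(minTokens) that checks
canResume(minTokens) before re-enabling reads. The sketch is not
thread-safe, which a real implementation would have to address.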
> If someone has the bandwidth, it would be fine to start experimenting
> in this area with code to learn more.
> After trying something out, it's easier to be more confident about the
> design of a dual token bucket algorithm. One of the assumptions
> I'm making is that a dual token bucket would be sufficient.
> This has been a great discussion so far and it feels like we are
> getting to the necessary level of detail.
> Thanks for sharing many details of your requirements and what you have
> experienced and learned when operating Pulsar at scale.
> I feel excited about the upcoming improvements for rate limiting in
> Pulsar. From my side I have limited availability for participating in
> this work for the next 2-3 weeks. After that, I'll be ready to help in
> making the improved rate limiting a reality.
> -Lari
