I just want to add one thing to the mix here.

You can see by the amount of plugin interfaces Pulsar has, somebody "left
the door open" for too long.
You can agree with me that the number of those interfaces is not normal for
any open source software. I know HBase for example, or Kafka - never seen
so many in them.

You can also see the lack of attention to code quality and high level
overview by the poor implementation of current rate limiter.

The feeling is: I just need this tiny little thing and I don't have time -
so over time Pulsar got into this unmaintainable mess of public APIs and
some parts are simply unreadable - such as the rate limiters. I *still*
don't understand how rate limiting works in Pulsar, even when I read the
background  and browsed quickly through the code.

I can see the people on this thread are highly talented - let's use this to
make Pulsar better, both from a bird's-eye view and your own
personal requirement.


On Tue, Nov 7, 2023 at 3:26 PM Girish Sharma <scrapmachi...@gmail.com>
wrote:

> Hello Lari, replies inline.
>
> I will also be going through some textbook rate limiters (the one you
> shared, plus others) and propose the one that at least suits our needs in
> the next reply.
>
> On Tue, Nov 7, 2023 at 2:49 PM Lari Hotari <lhot...@apache.org> wrote:
>
>>
>> It is bi-weekly on Thursdays. The meeting calendar, zoom link and
>> meeting notes can be found at
>> https://github.com/apache/pulsar/wiki/Community-Meetings .
>>
>>
> Would it make sense for me to join this time given that you are skipping
> it?
>
>
>>
>> ok. btw. "metrics" doesn't necessarily mean providing the rate limiter
>> metrics via Prometheus. There might be other ways to provide this
>> information for components that could react to this.
>> For example, it could a be system topic where these rate limiters emit
>> events.
>>
>>
> Are there any other system topics than `tenent/namespace/__change_events`
> . While it's an improvement over querying metrics, it would still mean one
> consumer per namespace and would form a cyclic dependency - for example, in
> case a broker is degrading due to mis-use of bursting, it might lead to
> delays in the consumption of the event from the __change_events topic.
>
> I agree. I just brought up this example to ensure that your
>> expectation about bursting isn't about controlling the rate limits
>> based on situational information, such as end-to-end latency
>> information.
>> Such a feature could be useful, but it does complicate things.
>> However, I think it's good to keep this on the radar since this might
>> be needed to solve some advanced use cases.
>>
>>
> I still envision auto-scaling to be admin API driven rather than produce
> throughput driven. That way, it remains deterministic in nature. But it
> probably doesn't make sense to even talk about it until (partition)
> scale-down is possible.
>
>
>>
>> >    - A producer(s) is producing at a near constant rate into a topic,
>> with
>> >    equal distribution among partitions. Due to a hiccup in their
>> downstream
>> >    component, the produce rate goes to 0 for a few seconds, and thus, to
>> >    compensate, in the next few seconds, the produce rate tries to
>> double up.
>>
>> Could you also elaborate on details such as what is the current
>> behavior of Pulsar rate limiting / throttling solution and what would
>> be the desired behavior?
>> Just guessing that you mean that the desired behavior would be to
>> allow the produce rate to double up for some time (configurable)?
>> Compared to what rate is it doubled?
>> Please explain in detail what the current and desired behaviors would
>> be so that it's easier to understand the gap.
>>
>
> In all of the 3 cases that I listed, the current behavior, with precise
> rate limiting enabled, is to pause the netty channel in case the throughput
> breaches the set limits. This eventually leads to timeout at the client
> side in case the burst is significantly greater than the configured timeout
> on the producer side.
>
> The desired behavior in all three situations is to have a multiplier based
> bursting capability for a fixed duration. For example, it could be that a
> pulsar topic would be able to support 1.5x of the set quota for a burst
> duration of up to 5 minutes. There also needs to be a cooldown period in
> such a case that it would only accept one such burst every X minutes, say
> every 1 hour.
>
>
>>
>> >    - In a visitor based produce rate (where produce rate goes up in the
>> day
>> >    and goes down in the night, think in terms of popular website hourly
>> view
>> >    counts pattern) , there are cases when, due to certain
>> external/internal
>> >    triggers, the views - and thus - the produce rate spikes for a few
>> minutes.
>>
>> Again, please explain the current behavior and desired behavior.
>> Explicit example values of number of messages, bandwidth, etc. would
>> also be helpful details.
>>
>
> Adding to what I wrote above, think of this pattern like the following:
> the produce rate slowly increases from ~2MBps at around 4 AM to a known
> peak of about 30MBps by 4 PM and stays around that peak until 9 PM after
> which is again starts decreasing until it reaches ~2MBps around 2 AM.
> Now, due to some external triggers, maybe a scheduled sale event, at 10PM,
> the quota may spike up to 40MBps for 4-5 minutes and then again go back
> down to the usual ~20MBps . Here is a rough image showcasing the trend.
> [image: image.png]
>
>
>>
>> >    It is also important to keep this in check so as to not allow bots
>> to do
>> >    DDOS into your system, while that might be a responsibility of an
>> upstream
>> >    system like API gateway, but we cannot be ignorant about that
>> completely.
>>
>> what would be the desired behavior?
>>
>
> The desired behavior is that the burst support should be short lived (5-10
> minutes) and limited to a fixed number of bursts in a duration (say - 1
> burst per hour). Obviously, all of these should be configurable, maybe at a
> broker level and not a topic level.
>
>
>>
>> >    - In streaming systems, where there are micro batches, there might be
>> >    constant fluctuations in produce rate from time to time, based on
>> batch
>> >    failure or retries.
>>
>> could you share and examples with numbers about this use case too?
>> explaining current behavior and desired behavior?
>>
>
> It is very common in Direct Input Stream spark jobs to consume messages in
> micro batches, process the messages together and then move on to the next
> batch. Thus, it is not a uniform consume at a second or sub-minute level.
> Now, in this situation itself, if a few micro-batches fail due to hardware
> issues, the spark scheduler may schedule more than usual batches together
> after the issue is resolved, leading to increased consume.
> While I am talking mostly about consume, the same job's sink may also be a
> pulsar topic, leading to same produce pattern.
>
>
>
>>
>>
>> >
>> > In all of these situations, setting the throughput of the topic to be
>> the
>> > absolute maximum of the various spikes observed during the day is very
>> > suboptimal.
>>
>> btw. A plain token bucket algorithm implementation doesn't have an
>> absolute maximum. The maximum average rate is controlled with the rate
>> of tokens added to the bucket. The capacity of the bucket controls how
>> much buffer there is to spend on spikes. If there's a need to also set
>> an absolute maximum rate, 2 token buckets could be chained to handle
>> that case. The second rate limiter could have an average rate of the
>> absolute maximum with a relatively small buffer (token bucket
>> capacity). There might also be more sophisticated algorithms which
>> vary the maximum rate in some way to smoothen spikes, but that might
>> just be completely unnecessary in Pulsar rate limiters.
>> Switching to use a conceptual model of token buckets will make it a
>> lot easier to reason about the implementation when we come to that
>> point. In the design we can try to "meet in the middle" where the
>> requirements and the design and implementation meet so that the
>> implementation is as simple as possible.
>>
>
> I have yet to go in depth on both the textbook rate limiter models, but if
> token bucket works on buffered tokens, it may not be much useful. Consider
> a situation where the throughput is always at its absolute configured peak,
> thus not reserving tokens at all. Again - this is the understanding based
> on the above text. I am yet to go through the wiki page with full details.
> In any case, I am very sure that an absolute maximum bursting limit is
> needed per topic, since at the end of the day, we are limited by hardware
> resources like network bandwidth, disk bandwidth etc.
>
>
>> > Moreover, in each of these situations, once bursting support is present
>> in
>> > the system, it would also need to have proper checks in place to
>> penalize
>> > the producers from trying to mis-use the system. In a true multi-tenant
>> > platform, this is very critical. Thus, blacklisting actually goes hand
>> in
>> > hand here. Explained more below.
>>
>> I don't yet fully understand this part. What would be an example of a
>> case where a producer should be penalized?
>> When would an attempt to mis-use happen and why? How would you detect
>> mis-use and how would you penalize it?
>>
>> Suppose the SOP for bursting support is to support a burst of upto 1.5x
> produce rate for upto 10 minutes, once in a window of 1 hour.
>
> Now if a particular producer is trying to burst to or beyond 1.5x for more
> than 5 minutes - while we are not accepting the requests and the producer
> client is continuously going into timeouts, there should be a way to
> completely blacklist that producer so that even the client stops attempting
> to produce and we unload the topics all together - thus, releasing open
> connections and broker compute.
>
>
> > This would be a much simpler and efficient model if it were reactive,
>> > based/triggered directly from within the rate limiter component. It can
>> be
>> > fast and responsive, which is very critical when trying to prevent the
>> > system from abuse.
>>
>> I agree that it could be simpler and efficient from some perspectives.
>> Do you have examples of what type of behaviors would you like to have
>> for handling this?
>> A concrete example with numbers could be very useful.
>>
>
> I believe the above example is a good one in terms of expectation, with
> numbers.
>
>
>> > It's only async if the producers use `sendAsync` . Moreover, even if it
>> is
>> > async in nature, practically it ends up being quite linear and
>> > well-homogenised. I am speaking from experience of running 1000s of
>> > partitioned topics in production
>>
>> I'd assume that most high traffic use cases would use asynchronous
>> sending in a way or another. If you'd be sending synchronously, it
>> would be extremely inefficient if messages are sent one-by-one. The
>> asyncronouscity could also come from multiple threads in an
>> application.
>>
>> You are right that a well homogenised workload reduces the severity of
>> the multiplexing problem. That's also why the problem of multiplexing
>> hasn't been fixed since the problem isn't visibile and it's also very
>> hard to observe.
>> In a case where 2 producers with very different rate limiting options
>> share a connection, it definetely is a problem in the current
>>
>
> Yes actually, I was talking to the team and we did observe a case where
> there was a client app with 2 producer objects writing to two different
> topics. When one of their topics was breaching quota, the other one was
> also observing rate limiting even though it was under quota. So eventually,
> this surely needs to be solved. One short term solution is to at least not
> share connections across producer objects. But I still feel it's out of
> scope of this discussion here.
>
> What do you think about the quick solution of not sharing connections
> across producer objects? I can raise a github issue explaining the
> situation.
>
>
>
> --
> Girish Sharma
>

Reply via email to