shibd commented on code in PR #23945: URL: https://github.com/apache/pulsar/pull/23945#discussion_r1993214735
########## pip/pip-406.md: ########## @@ -0,0 +1,152 @@ +# PIP-406: Introduce metrics related to dispatch_throttled_count + +# Background knowledge + +## Motivation + +Currently, users can monitor subscription backlogs using the `pulsar_subscription_back_log_no_delayed` metric. +However, if [dispatch throttling](https://pulsar.apache.org/docs/next/concepts-throttling/) is configured at the broker/topic/subscription level, +this metric may not accurately reflect whether the backlog is due to insufficient consumer capacity, as it could be caused by dispatch throttling. + +## Goals + +Introduce metrics to indicate the count of `messages/bytes throttled` for **broker/topic/subscription** level rate limit. +This allows users to write PromQL queries to identify subscriptions with high backlogs but low or no throttling, pinpointing backlogs caused by insufficient consumer capacity. + +## In Scope + +Broker Level: +- Introduce the metric `pulsar_broker_dispatch_throttled_msg_count` to represent the total count of messages throttled for a broker. +- Introduce the metric `pulsar_broker_dispatch_throttled_bytes_count` to represent the total count of bytes throttled for a broker. + +Topic Level: +- Introduce the metric `pulsar_dispatch_throttled_msg_count` to represent the total count of messages throttled for a topic. +- Introduce the metric `pulsar_dispatch_throttled_bytes_count` to represent the total count of bytes throttled for a topic. + +Subscription Level: +- Introduce the metric `pulsar_subscription_dispatch_throttled_msg_count` to represent the total count of messages throttled for a subscription. +- Introduce the metric `pulsar_subscription_dispatch_throttled_bytes_count` to represent the total count of bytes throttled for a subscription. + + +## Out of Scope +- These states are not persistent and will reset upon broker restart/ topic re-load / subscription reconnected. + +# High Level Design +1. Maintain `dispatchThrottleMsgCount` and `dispatchThrottleBytesCount` in `DispatchRateLimiter`. Increase these values in the `consumeDispatchQuota` method when the TokenBucket for messages or bytes is insufficient. +2. Output these fields when retrieving metrics. + + +# Detailed Design + +## Design & Implementation Details +1. Maintain `dispatchThrottleMsgCount` and `dispatchThrottleBytesCount` in `DispatchRateLimiter`: +```java + private final LongAdder dispatchThrottleMsgCount = new LongAdder(); + private final LongAdder dispatchThrottleBytesCount = new LongAdder(); +``` + +2. During each [consumeDispatchQuota](https://github.com/apache/pulsar/blob/c4cff0ab3dac169c0a1418ef2f63f61604f6278e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java#L97-L104), +if token bucket is insufficient, increase these fields accordingly. +```diff + public void consumeDispatchQuota(long numberOfMessages, long byteSize) { + AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage; + if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) { +- localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages); ++ if (!localDispatchRateLimiterOnMessage.consumeTokensAndCheckIfContainsTokens(numberOfMessages)) { ++ dispatchThrottleMsgCount.increment(); ++ } + } + AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte; + if (byteSize > 0 && localDispatchRateLimiterOnByte != null) { +- localDispatchRateLimiterOnByte.consumeTokens(byteSize); ++ if (!localDispatchRateLimiterOnByte.consumeTokensAndCheckIfContainsTokens(byteSize)) { ++ dispatchThrottleBytesCount.increment(); ++ } Review Comment: Hi, Lari. Thanks for reviewing. It seems like adding labels is a good idea. We can add a label to distinguish whether throttling is caused by a subscription, broker, or topic. By doing this, we actually only need two metrics: - `pulsar_subscription_dispatch_throttled_msg_events` - Labels: - topic - subscription - `reason`: broker/topic/subscription - `pulsar_subscription_dispatch_throttled_bytes_events` - Labels: - topic - subscription - `reason`: broker/topic/subscription When a user wants to view all throttling occurrences for a subscription, they just need to use a query like the one below (without the reason label). ```java double subscriptionAllDispatchThrottledMsgCount = subscriptionDispatchThrottledMsgCountMetrics.stream() .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic)) .mapToDouble(m-> m.value).sum(); ``` if the user want get a subscription throttle events by topic rate limit: ```java double subscriptionDispatchThrottledBytesCount = subscriptionDispatchThrottledBytesCountMetrics.stream() .filter(m -> m.tags.get("subscription").equals(subName) && m.tags.get("topic").equals(topic) && m.tags.get("reason").equals("topic")) .mapToDouble(m-> m.value).sum(); ``` This way, we don’t need to add extra throttle metrics for `topics` and `brokers`; using this one metric can handle all scenarios. For example, If the user wants to obtain the metric for the total number of times the broker rate limit was subscription throttled. ```java double brokerAllDispatchThrottledMsgCount = subscriptionDispatchThrottledMsgCountMetrics.stream() .filter(m -> m.tags.get("reason").equals("broker")) .mapToDouble(m-> m.value).sum(); ``` What do you think? I’ve drafted a PR, and you can take a look at the implementation(You can review user scenarios through unit tests) https://github.com/shibd/pulsar/pull/56 If there are no issues, I'll start modifying the PIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
