shibd commented on code in PR #23945:
URL: https://github.com/apache/pulsar/pull/23945#discussion_r1993214735


##########
pip/pip-406.md:
##########
@@ -0,0 +1,152 @@
+# PIP-406: Introduce metrics related to dispatch_throttled_count
+
+# Background knowledge
+
+## Motivation
+
+Currently, users can monitor subscription backlogs using the 
`pulsar_subscription_back_log_no_delayed` metric. 
+However, if [dispatch 
throttling](https://pulsar.apache.org/docs/next/concepts-throttling/) is 
configured at the broker/topic/subscription level,
+this metric may not accurately reflect whether the backlog is due to 
insufficient consumer capacity, as it could be caused by dispatch throttling.
+
+## Goals
+
+Introduce metrics to indicate the count of `messages/bytes throttled` for 
**broker/topic/subscription** level rate limit. 
+This allows users to write PromQL queries to identify subscriptions with high 
backlogs but low or no throttling, pinpointing backlogs caused by insufficient 
consumer capacity.
+
+## In Scope
+
+Broker Level:
+- Introduce the metric `pulsar_broker_dispatch_throttled_msg_count` to 
represent the total count of messages throttled for a broker.
+- Introduce the metric `pulsar_broker_dispatch_throttled_bytes_count` to 
represent the total count of bytes throttled for a broker.
+
+Topic Level:
+- Introduce the metric `pulsar_dispatch_throttled_msg_count` to represent the 
total count of messages throttled for a topic.
+- Introduce the metric `pulsar_dispatch_throttled_bytes_count` to represent 
the total count of bytes throttled for a topic.
+ 
+Subscription Level:
+- Introduce the metric `pulsar_subscription_dispatch_throttled_msg_count` to 
represent the total count of messages throttled for a subscription.
+- Introduce the metric `pulsar_subscription_dispatch_throttled_bytes_count` to 
represent the total count of bytes throttled for a subscription.
+ 
+
+## Out of Scope
+- These states are not persistent and will reset upon broker restart/ topic 
re-load / subscription reconnected.
+
+# High Level Design
+1. Maintain `dispatchThrottleMsgCount` and `dispatchThrottleBytesCount` in 
`DispatchRateLimiter`. Increase these values in the `consumeDispatchQuota` 
method when the TokenBucket for messages or bytes is insufficient.
+2. Output these fields when retrieving metrics.
+
+
+# Detailed Design
+
+## Design & Implementation Details
+1. Maintain `dispatchThrottleMsgCount` and `dispatchThrottleBytesCount` in 
`DispatchRateLimiter`:
+```java
+    private final LongAdder dispatchThrottleMsgCount = new LongAdder();
+    private final LongAdder dispatchThrottleBytesCount = new LongAdder();
+```
+
+2. During each 
[consumeDispatchQuota](https://github.com/apache/pulsar/blob/c4cff0ab3dac169c0a1418ef2f63f61604f6278e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java#L97-L104),
+if token bucket is insufficient, increase these fields accordingly.
+```diff
+     public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
+         AsyncTokenBucket localDispatchRateLimiterOnMessage = 
dispatchRateLimiterOnMessage;
+         if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != 
null) {
+-            localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
++            if 
(!localDispatchRateLimiterOnMessage.consumeTokensAndCheckIfContainsTokens(numberOfMessages))
 {
++                dispatchThrottleMsgCount.increment();
++            }
+         }
+         AsyncTokenBucket localDispatchRateLimiterOnByte = 
dispatchRateLimiterOnByte;
+         if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
+-            localDispatchRateLimiterOnByte.consumeTokens(byteSize);
++            if 
(!localDispatchRateLimiterOnByte.consumeTokensAndCheckIfContainsTokens(byteSize))
 {
++                dispatchThrottleBytesCount.increment();
++            }

Review Comment:
   Hi, Lari.
   Thanks for reviewing. It seems like adding labels is a good idea. We can add 
a label to distinguish whether throttling is caused by a subscription, broker, 
or topic.
   
   By doing this, we actually only need two metrics:
   - `pulsar_subscription_dispatch_throttled_msg_events`
     - Labels:
       - topic
       - subscription
       - `reason`: broker/topic/subscription
   - `pulsar_subscription_dispatch_throttled_bytes_events`
     - Labels:
       - topic
       - subscription
       - `reason`: broker/topic/subscription
   
   When a user wants to view all throttling occurrences for a subscription, 
they just need to use a query like the one below (without the reason label).
   ```java
           double subscriptionAllDispatchThrottledMsgCount = 
subscriptionDispatchThrottledMsgCountMetrics.stream()
                   .filter(m -> m.tags.get("subscription").equals(subName) && 
m.tags.get("topic").equals(topic))
                   .mapToDouble(m-> m.value).sum();
   ```
   
   if the user want get a subscription throttle events by topic rate limit:
   ```java
           double subscriptionDispatchThrottledBytesCount = 
subscriptionDispatchThrottledBytesCountMetrics.stream()
                   .filter(m -> m.tags.get("subscription").equals(subName)
                           && m.tags.get("topic").equals(topic) && 
m.tags.get("reason").equals("topic"))
                   .mapToDouble(m-> m.value).sum();
   ```
   
   This way, we don’t need to add extra throttle metrics for `topics` and 
`brokers`; using this one metric can handle all scenarios.
   
   For example, If the user wants to obtain the metric for the total number of 
times the broker rate limit was subscription throttled.
   ```java
           double brokerAllDispatchThrottledMsgCount = 
subscriptionDispatchThrottledMsgCountMetrics.stream()
                   .filter(m -> m.tags.get("reason").equals("broker"))
                   .mapToDouble(m-> m.value).sum();
   ```
   
   
   What do you think? I’ve drafted a PR, and you can take a look at the 
implementation(You can review user scenarios through unit tests)
   
   https://github.com/shibd/pulsar/pull/56
   
   If there are no issues, I'll start modifying the PIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to