lhotari opened a new pull request, #23429:
URL: https://github.com/apache/pulsar/pull/23429

   ### Motivation
   
   [PIP-379: Key_Shared Draining Hashes for Improved Message Ordering](https://github.com/apache/pulsar/blob/master/pip/pip-379.md) was implemented in #23352.
   
   One of the major benefits of PIP-379 is its easy-to-understand model of when a hash is blocked.
   
   When a new consumer is added, hash range assignments move from existing consumers to the new consumer. (In some cases, hash range assignments can also move between existing consumers after a consumer is added or removed.)
   
   The PIP-379 implementation ensures that no new messages for hashes in the moved ranges are delivered until all unacknowledged messages for that specific hash have been cleared, either by acknowledgement or by the consumer holding them disconnecting.
   This applies to the ordered AUTO_SPLIT mode of the Key_Shared subscription type.
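
   The blocking rule can be illustrated with a small amount of per-hash state. The following Java sketch is a simplified, hypothetical model and not the broker's actual implementation. The class and method names (`DrainingHashesSketch`, `startDraining`, `canDispatch`, `acknowledge`, `consumerRemoved`) are made up for illustration. The sketch also assumes that a hash enters the draining state when its range moves away from a consumer that still holds unacknowledged messages for that hash.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, simplified model of the PIP-379 blocking rule (not Pulsar's broker code).
   final class DrainingHashesSketch {

       // Draining state for one hash: the previous owner and its outstanding messages.
       private static final class Entry {
           final String consumer;
           int unackMsgs;
           int blockedAttempts;

           Entry(String consumer, int unackMsgs) {
               this.consumer = consumer;
               this.unackMsgs = unackMsgs;
           }
       }

       private final Map<Integer, Entry> draining = new HashMap<>();

       // Assumed hook: the range containing `hash` moved away from `previousOwner`,
       // which still holds `unacked` unacknowledged messages for that hash.
       void startDraining(int hash, String previousOwner, int unacked) {
           if (unacked > 0) {
               draining.put(hash, new Entry(previousOwner, unacked));
           }
       }

       // Returns true if a message with `hash` may be dispatched to `consumer`.
       boolean canDispatch(int hash, String consumer) {
           Entry e = draining.get(hash);
           if (e == null || e.consumer.equals(consumer)) {
               return true;
           }
           e.blockedAttempts++; // counts skipped deliveries, like `blockedAttempts` in the stats
           return false;
       }

       // The previous owner acknowledged one message with `hash`;
       // the hash is cleared once no unacknowledged messages remain.
       void acknowledge(int hash, String consumer) {
           Entry e = draining.get(hash);
           if (e != null && e.consumer.equals(consumer) && --e.unackMsgs == 0) {
               draining.remove(hash);
           }
       }

       // A disconnecting consumer's draining hashes stop blocking delivery.
       void consumerRemoved(String consumer) {
           draining.values().removeIf(e -> e.consumer.equals(consumer));
       }

       boolean isDraining(int hash) {
           return draining.containsKey(hash);
       }

       int blockedAttempts(int hash) {
           Entry e = draining.get(hash);
           return e == null ? 0 : e.blockedAttempts;
       }
   }
   ```

   In this model, a hash stops blocking either when its last unacknowledged message is acknowledged or when the consumer holding it disconnects. These are the two conditions described above.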
   
   There's a concept of "draining hashes" in PIP-379, which is now reflected in the consumer stats. This internal detail is intentionally exposed, since users need this information to understand why messages don't get delivered.
   
   Because the stats expose the same concept that the implementation uses, no mapping between external and internal concepts is needed, and the abstraction isn't leaky. The user doesn't need to know how draining hashes are implemented, but they do need to know that the consumer is blocked on unacknowledged messages for specific hashes. This information is all relevant and contains no unnecessary implementation details.
   
   This PR contains the consumer stats changes that expose this information clearly.
   
   ### Modifications
   
   Added consumer-level stats:
   
   - `drainingHashesCount` - the current number of hashes in the draining state for this consumer
   - `drainingHashesClearedTotal` - the total number of hashes cleared from the draining state since the consumer connected
   - `drainingHashesUnackedMessages` - the total number of unacknowledged messages for all draining hashes of this consumer
   - `drainingHashes` - details for each draining hash of this consumer
     - `hash` - the sticky key hash that is draining
     - `unackMsgs` - the number of unacknowledged messages for this hash
     - `blockedAttempts` - the number of times the hash has blocked an attempted delivery of a message
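
   The aggregate fields appear to follow directly from the `drainingHashes` list, based on their descriptions. If so, a client can reproduce them and rank hashes by `blockedAttempts`. The Java sketch below uses a hypothetical `DrainingHash` record and `DrainingSummary` helper that mirror the JSON shape. Neither is the Pulsar admin client's stats type.

   ```java
   import java.util.Comparator;
   import java.util.List;

   // Hypothetical record mirroring one item of the `drainingHashes` stats array.
   record DrainingHash(int hash, int unackMsgs, int blockedAttempts) {}

   final class DrainingSummary {

       // `drainingHashesCount`: the number of hashes currently draining for the consumer.
       static int drainingHashesCount(List<DrainingHash> hashes) {
           return hashes.size();
       }

       // `drainingHashesUnackedMessages`: unacknowledged messages summed over all draining hashes.
       static int drainingHashesUnackedMessages(List<DrainingHash> hashes) {
           return hashes.stream().mapToInt(DrainingHash::unackMsgs).sum();
       }

       // Hashes ordered by `blockedAttempts`, highest first, as a starting point for troubleshooting.
       static List<DrainingHash> mostBlocking(List<DrainingHash> hashes, int limit) {
           return hashes.stream()
                   .sorted(Comparator.comparingInt(DrainingHash::blockedAttempts).reversed())
                   .limit(limit)
                   .toList();
       }
   }
   ```

   Consider consumer c1's entries in the subscription example later in this description. They give a count of 5 and an unacknowledged total of 10, which matches the reported `drainingHashesCount` and `drainingHashesUnackedMessages`.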
   
   In addition:
   
   - `keyHashRangeArrays` - the consumer's hash range assignments as a list of lists, where each inner list contains the start and end of a range as elements
     - example: `[ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ]`
   
   It was necessary to add this field under a new name, `keyHashRangeArrays`, since there's already an existing `keyHashRanges` field. Changing that field isn't possible because it would break compatibility: a newer admin client couldn't read stats from an older broker, and vice versa.
   The previous `keyHashRanges` field is now deprecated. It uses a different format.
   
   Example of both fields where the difference is visible:
   
   ```json
   {
           "keyHashRangeArrays" : [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ],
           "keyHashRanges" : [ "[2960, 5968]", "[22258, 43033]", "[49261, 54464]", "[55155, 61273]" ]
   }
   ```
   
   The `keyHashRanges` field contains the information as a list of strings, which isn't very usable for most use cases since each value must be parsed before it can be used.
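
   The parsing step a client needs for the deprecated string form can be sketched in Java. This sketch assumes every value looks like `"[start, end]"`, as in the example above. `LegacyKeyHashRanges` is a hypothetical helper name, not part of the Pulsar client.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Hypothetical helper converting deprecated `keyHashRanges` strings into start/end pairs.
   final class LegacyKeyHashRanges {

       // Matches one legacy range string such as "[2960, 5968]" (format assumed from the example).
       private static final Pattern RANGE =
               Pattern.compile("\\[\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\]");

       // Returns int[]{start, end} pairs, the same shape as the `keyHashRangeArrays` items.
       static List<int[]> toArrays(List<String> keyHashRanges) {
           List<int[]> result = new ArrayList<>();
           for (String s : keyHashRanges) {
               Matcher m = RANGE.matcher(s.trim());
               if (!m.matches()) {
                   throw new IllegalArgumentException("Unexpected range format: " + s);
               }
               result.add(new int[] {Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2))});
           }
           return result;
       }
   }
   ```

   With `keyHashRangeArrays`, this parsing step, and its dependence on the exact string format, is not needed.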
   
   The stats will continue to contain `keyHashRanges` and `readPositionWhenJoining` when the "classic" (3.3.x) implementation of Key_Shared is used by configuring `subscriptionKeySharedUseClassicPersistentImplementation=true` ("classic" support was added in #23424).
   In the default configuration, these fields are removed from the topic stats output, but the client continues to support them for backward and forward compatibility.
   
   ### Example of consumer stats for a subscription
   
   ```json
   {      
         "consumers" : [ {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 1560,
           "msgOutCounter" : 30,
           "msgRateRedeliver" : 0.0,
           "messageAckRate" : 0.0,
           "chunkedMessageRate" : 0.0,
           "consumerName" : "c1",
           "availablePermits" : 70,
           "unackedMessages" : 30,
           "avgMessagesPerEntry" : 1,
           "blockedConsumerOnUnackedMsgs" : false,
           "drainingHashesCount" : 5,
           "drainingHashesClearedTotal" : 0,
           "drainingHashesUnackedMessages" : 10,
           "drainingHashes" : [ {
             "hash" : 2862,
             "unackMsgs" : 2,
             "blockedAttempts" : 5
           }, {
             "hash" : 11707,
             "unackMsgs" : 2,
             "blockedAttempts" : 9
           }, {
             "hash" : 15786,
             "unackMsgs" : 2,
             "blockedAttempts" : 6
           }, {
             "hash" : 43539,
             "unackMsgs" : 2,
             "blockedAttempts" : 6
           }, {
             "hash" : 45436,
             "unackMsgs" : 2,
             "blockedAttempts" : 9
           } ],
           "address" : "/127.0.0.1:55829",
           "connectedSince" : "2024-10-10T05:39:39.077284+03:00",
           "clientVersion" : "Pulsar-Java-v4.0.0-SNAPSHOT",
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 1728527979411,
           "lastConsumedFlowTimestamp" : 1728527979106,
           "keyHashRangeArrays" : [ [ 2960, 5968 ], [ 22258, 43033 ], [ 49261, 54464 ], [ 55155, 61273 ] ],
           "metadata" : { },
           "lastAckedTime" : "1970-01-01T02:00:00+02:00",
           "lastConsumedTime" : "2024-10-10T05:39:39.411+03:00"
         }, {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 0,
           "msgOutCounter" : 0,
           "msgRateRedeliver" : 0.0,
           "messageAckRate" : 0.0,
           "chunkedMessageRate" : 0.0,
           "consumerName" : "c2",
           "availablePermits" : 1000,
           "unackedMessages" : 0,
           "avgMessagesPerEntry" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "drainingHashesCount" : 0,
           "drainingHashesClearedTotal" : 0,
           "drainingHashesUnackedMessages" : 0,
           "drainingHashes" : [ ],
           "address" : "/127.0.0.1:55829",
           "connectedSince" : "2024-10-10T05:39:39.294216+03:00",
           "clientVersion" : "Pulsar-Java-v4.0.0-SNAPSHOT",
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 0,
           "lastConsumedFlowTimestamp" : 1728527979297,
           "keyHashRangeArrays" : [ [ 1, 2959 ], [ 5969, 22257 ], [ 43034, 49260 ], [ 54465, 55154 ], [ 61274, 65535 ] ],
           "metadata" : { },
           "lastAckedTime" : "1970-01-01T02:00:00+02:00",
           "lastConsumedTime" : "1970-01-01T02:00:00+02:00"
         } ]
   }
   ```
   
   Relevant information for consumer c1:
   
   ```json
   {
           "drainingHashesCount" : 5,
           "drainingHashesClearedTotal" : 0,
           "drainingHashesUnackedMessages" : 10,
           "drainingHashes" : [ {
             "hash" : 2862,
             "unackMsgs" : 2,
             "blockedAttempts" : 5
           }, {
             "hash" : 11707,
             "unackMsgs" : 2,
             "blockedAttempts" : 9
           }, {
             "hash" : 15786,
             "unackMsgs" : 2,
             "blockedAttempts" : 6
           }, {
             "hash" : 43539,
             "unackMsgs" : 2,
             "blockedAttempts" : 6
           }, {
             "hash" : 45436,
             "unackMsgs" : 2,
             "blockedAttempts" : 9
           } ]
   }
   ```
   
   Relevant information about consumer c2 in this case:
   
   ```json
   {
           "keyHashRangeArrays" : [ [ 1, 2959 ], [ 5969, 22257 ], [ 43034, 49260 ], [ 54465, 55154 ], [ 61274, 65535 ] ]
   }
   ```
   
   The PIP-379 implementation blocks only the hashes that need to be blocked. For each blocked hash, detailed information is available to find out why delivery is blocked.
   The major difference from the previous `readPositionWhenJoining` solution is that this analysis can be automated. CLI and web user interface tools can be built to assist users, making it much easier to troubleshoot cases where message delivery is blocked by unacknowledged messages in Key_Shared subscriptions.
   
   Client-side tooling could already use the information provided in this PR to determine which consumer is blocked by a hash when there are multiple consumers.
   In the above example, the hash `2862` is contained in c2's hash range `[1, 2959]`. This means that the 2 unacknowledged messages that consumer `c1` holds for that hash are preventing further messages with hash `2862` from being delivered to consumer `c2`.
   The `blockedAttempts` field is a counter that increments each time the dispatcher skips delivery to a consumer because of this hash. Using this information alone, it's easy to observe Key_Shared AUTO_SPLIT subscriptions and find out why delivery is blocked.
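
   The lookup described above can be sketched in Java. The sketch assumes that the `keyHashRangeArrays` values have already been read into a map from consumer name to `int[]{start, end}` pairs. It also assumes that both ends of each range are inclusive, which the example data suggests, since c1's and c2's ranges are adjacent without gaps. `HashOwnerLookup` is a hypothetical name.

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical tooling helper: finds the consumer whose assigned ranges contain a hash.
   final class HashOwnerLookup {

       // Ranges are treated as inclusive [start, end] pairs taken from `keyHashRangeArrays`.
       static Optional<String> ownerOf(int hash, Map<String, List<int[]>> rangesByConsumer) {
           for (Map.Entry<String, List<int[]>> e : rangesByConsumer.entrySet()) {
               for (int[] range : e.getValue()) {
                   if (hash >= range[0] && hash <= range[1]) {
                       return Optional.of(e.getKey());
                   }
               }
           }
           return Optional.empty(); // no consumer is currently assigned this hash
       }
   }
   ```

   Applied to the example data, all five of c1's draining hashes (2862, 11707, 15786, 43539 and 45436) fall within c2's ranges. So c2 is the consumer waiting on c1's acknowledgements.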
   
   A future improvement will be to add a REST API for looking up the message IDs of the unacknowledged messages for a hash. Using this information, it's possible to find out the details of the messages that are blocking a particular hash.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [x] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   

