This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52c382a7e1e [improve][doc] Added docs for key shared subscription hashing schemes (#18878)
52c382a7e1e is described below

commit 52c382a7e1e6f44ec52ba3280d84d67bb4f70e1d
Author: Asaf Mesika <[email protected]>
AuthorDate: Wed Dec 14 09:07:43 2022 +0200

    [improve][doc] Added docs for key shared subscription hashing schemes (#18878)
---
 site2/docs/concepts-messaging.md | 185 +++++++++++++++++++++++++++++++++++----
 1 file changed, 166 insertions(+), 19 deletions(-)

diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 92db99e33ca..24782cac3d0 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -22,18 +22,18 @@ If the consumption of a message fails and you want this message to be consumed a
 
 Messages are the basic "unit" of Pulsar. The following table lists the components of messages.
 
-Component | Description
-:---------|:-------
-Value / data payload | The data carried by the message. All Pulsar messages contain raw bytes, although message data can also conform to data [schemas](schema-get-started.md).
-Key | The key (string type) of the message. It is a short name of message key or partition key. Messages are optionally tagged with keys, which is useful for features like [topic compaction](concepts-topic-compaction.md).
-Properties | An optional key/value map of user-defined properties.
-Producer name | The name of the producer who produces the message. If you do not specify a producer name, the default name is used.
-Topic name | The name of the topic that the message is published to.
-Schema version | The version number of the schema that the message is produced with.
-Sequence ID | Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID of a message is initially assigned by its producer, indicating its order in that sequence, and can also be customized.<br />Sequence ID can be used for message deduplication. If `brokerDeduplicationEnabled` is set to `true`, the sequence ID of each message is unique within a producer of a topic (non-partitioned) or a partition.
-Message ID | The message ID of a message is assigned by bookies as soon as the message is persistently stored. Message ID indicates a message’s specific position in a ledger and is unique within a Pulsar cluster.
-Publish time | The timestamp of when the message is published. The timestamp is automatically applied by the producer.
-Event time | An optional timestamp attached to a message by applications. For example, applications attach a timestamp on when the message is processed. If nothing is set to event time, the value is `0`.
+| Component            | Description |
+|:---------------------|:------------|
+| Value / data payload | The data carried by the message. All Pulsar messages contain raw bytes, although message data can also conform to data [schemas](schema-get-started.md). |
+| Key                  | The key (string type) of the message. It is a short name of message key or partition key. Messages are optionally tagged with keys, which is useful for features like [topic compaction](concepts-topic-compaction.md). |
+| Properties           | An optional key/value map of user-defined properties. |
+| Producer name        | The name of the producer who produces the message. If you do not specify a producer name, the default name is used. |
+| Topic name           | The name of the topic that the message is published to. |
+| Schema version       | The version number of the schema that the message is produced with. |
+| Sequence ID          | Each Pulsar message belongs to an ordered sequence on its topic. The sequence ID of a message is initially assigned by its producer, indicating its order in that sequence, and can also be customized.<br />Sequence ID can be used for message deduplication. If `brokerDeduplicationEnabled` is set to `true`, the sequence ID of each message is unique within a producer of a topic (non-partitioned) or a partition. |
+| Message ID           | The message ID of a message is assigned by bookies as soon as the message is persistently stored. Message ID indicates a message’s specific position in a ledger and is unique within a Pulsar cluster. |
+| Publish time         | The timestamp of when the message is published. The timestamp is automatically applied by the producer. |
+| Event time           | An optional timestamp attached to a message by applications. For example, applications attach a timestamp on when the message is processed. If nothing is set to event time, the value is `0`. |
 
 The default size of a message is 5 MB. You can configure the max size of a message with the following configurations.
 
@@ -70,12 +70,12 @@ Producers send messages to brokers synchronously (sync) or asynchronously (async
 
 You can have different types of access modes on topics for producers.
 
-Access mode | Description
-:-----------|------------
-`Shared`           | Multiple producers can publish on a topic. <br /><br />This is the **default** setting.
-`Exclusive`        | Only one producer can publish on a topic. <br /><br />If there is already a producer connected, other producers trying to publish on this topic get errors immediately.<br /><br />The "old" producer is evicted and a "new" producer is selected to be the next exclusive producer if the "old" producer experiences a network partition with the broker.
-`ExclusiveWithFencing`|Only one producer can publish on a topic. <br /><br />If there is already a producer connected, it will be removed and invalidated immediately.
-`WaitForExclusive` | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the `Exclusive` access.<br /><br />The producer that succeeds in becoming the exclusive one is treated as the leader. Consequently, if you want to implement a leader election scheme for your application, you can use this access mode. Note that the leader pattern scheme mentioned refers to using Pulsar as a Write-Ahead Log (WAL) which means the l [...]
+| Access mode            | Description |
+|:-----------------------|-------------|
+| `Shared`               | Multiple producers can publish on a topic. <br /><br />This is the **default** setting. |
+| `Exclusive`            | Only one producer can publish on a topic. <br /><br />If there is already a producer connected, other producers trying to publish on this topic get errors immediately.<br /><br />The "old" producer is evicted and a "new" producer is selected to be the next exclusive producer if the "old" producer experiences a network partition with the broker. |
+| `ExclusiveWithFencing` | Only one producer can publish on a topic. <br /><br />If there is already a producer connected, it will be removed and invalidated immediately. |
+| `WaitForExclusive`     | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the `Exclusive` access.<br /><br />The producer that succeeds in becoming the exclusive one is treated as the leader. Consequently, if you want to implement a leader election scheme for your application, you can use this access mode. Note that the leader pattern scheme mentioned refers to using Pulsar as a Write-Ahead Log (WAL) which means [...]
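To make the four access modes concrete, here is a hypothetical in-memory model of a topic's producer admission rules as the table describes them. It is a sketch under assumptions, not broker code: `AccessMode`, `ConnectResult`, and `TopicProducers` are illustrative names that only mirror the table, a `Shared` producer is assumed to be rejected while an exclusive producer holds the topic, the network-partition eviction described for `Exclusive` is not modeled, and waiting producers are assumed to be promoted in arrival order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical mirror of the access modes in the table above.
enum AccessMode { Shared, Exclusive, ExclusiveWithFencing, WaitForExclusive }

enum ConnectResult { CONNECTED, REJECTED, PENDING }

// Illustrative model of producer admission on a single topic; not broker code.
class TopicProducers {
    private final List<String> connected = new ArrayList<>();
    private final List<String> fenced = new ArrayList<>();
    private final Deque<String> waiting = new ArrayDeque<>();
    private boolean exclusiveHeld = false;

    ConnectResult connect(String producer, AccessMode mode) {
        switch (mode) {
            case Shared:
                // Assumption: Shared producers cannot join while an exclusive producer holds the topic.
                if (exclusiveHeld) {
                    return ConnectResult.REJECTED;
                }
                connected.add(producer);
                return ConnectResult.CONNECTED;
            case Exclusive:
                if (!connected.isEmpty()) {
                    return ConnectResult.REJECTED; // fail immediately
                }
                break;
            case ExclusiveWithFencing:
                fenced.addAll(connected); // existing producers are removed and invalidated
                connected.clear();
                break;
            case WaitForExclusive:
                if (!connected.isEmpty()) {
                    waiting.add(producer); // creation stays pending instead of failing
                    return ConnectResult.PENDING;
                }
                break;
        }
        connected.add(producer);
        exclusiveHeld = true;
        return ConnectResult.CONNECTED;
    }

    void disconnect(String producer) {
        connected.remove(producer);
        if (connected.isEmpty()) {
            exclusiveHeld = false;
            String next = waiting.poll();
            if (next != null) {
                connected.add(next); // the next waiter becomes the exclusive producer (leader)
                exclusiveHeld = true;
            }
        }
    }

    boolean isConnected(String producer) { return connected.contains(producer); }

    boolean isFenced(String producer) { return fenced.contains(producer); }
}
```

In this model, `WaitForExclusive` is what makes the leader-election pattern work: the pending producer is promoted only once the current holder disconnects.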
 
 :::note
 
@@ -598,10 +598,157 @@ In the diagram below, **Consumer A**, **Consumer B** and **Consumer C** are all
 
 #### Key_Shared
 
-In the *Key_Shared* type, multiple consumers can attach to the same subscription. Messages are delivered in distribution across consumers and messages with the same key or same ordering key are delivered to only one consumer. No matter how many times the message is re-delivered, it is delivered to the same consumer. When a consumer connects or disconnects, it causes the served consumer to change some message keys.
+In the *Key_Shared* type, multiple consumers can attach to the same subscription. Messages are distributed across consumers, and messages with the same key or the same ordering key are delivered to only one consumer. As long as the set of connected consumers does not change, a message is delivered to the same consumer no matter how many times it is redelivered.
 
 ![Key_Shared subscriptions](/assets/pulsar-key-shared-subscriptions.svg)
 
+There are three mapping algorithms that determine how a consumer is selected for a given message key (or ordering key): Sticky, Auto-split Hash Range, and Auto-split Consistent Hashing. All of them follow the same steps:
+1. The message key (or ordering key) is passed to a hash function (for example, Murmur3 32-bit), yielding a 32-bit integer hash.
+2. The hash is fed to the algorithm, which selects a consumer from the currently connected consumers.
+
+```
+                   +---------------+                              +-----------+
+Message Key -----> / Hash Function / ----- hash (32-bit) -------> / Algorithm / ----> Consumer
+                   +---------------+                              +-----------+
+```
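The hashing step can be sketched in Java with a plain implementation of the public Murmur3 x86 32-bit algorithm. This is an illustrative helper, not Pulsar's own hash class: the `Murmur3` class and its method names are hypothetical, and the seed of 0 and the UTF-8 encoding of the key are assumptions. Pulsar may also post-process the hash (for example, by clearing the sign bit) in ways this sketch does not show.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: standard Murmur3 x86_32, used here for step 1 (key -> hash).
final class Murmur3 {
    private Murmur3() {}

    static int hash32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h = seed;
        int i = 0;
        // Body: mix each full 4-byte little-endian block into the state.
        for (; i + 4 <= data.length; i += 4) {
            int k = (data[i] & 0xff) | (data[i + 1] & 0xff) << 8
                    | (data[i + 2] & 0xff) << 16 | (data[i + 3] & 0xff) << 24;
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h ^= k;
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
        }
        // Tail: mix the remaining 1 to 3 bytes, if any (the cases fall through on purpose).
        int k = 0;
        switch (data.length & 3) {
            case 3: k ^= (data[i + 2] & 0xff) << 16;
            case 2: k ^= (data[i + 1] & 0xff) << 8;
            case 1: k ^= data[i] & 0xff;
                k *= c1;
                k = Integer.rotateLeft(k, 15);
                k *= c2;
                h ^= k;
        }
        // Finalization: fold in the length, then avalanche the bits.
        h ^= data.length;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Step 1: message key -> 32-bit hash (seed 0 and UTF-8 are assumptions).
    static int keyHash(String key) {
        return hash32(key.getBytes(StandardCharsets.UTF_8), 0);
    }
}
```

The returned `int` is what step 2 feeds to the selection algorithm. Java's `int` is signed, so code that treats the hash as an unsigned 32-bit value can use `Integer.toUnsignedLong(hash)` or `Integer.remainderUnsigned(hash, n)`.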
+
+
+When a new consumer connects and is added to the list of connected consumers, the algorithm readjusts the mapping so that some keys currently mapped to existing consumers are mapped to the new consumer. When a consumer disconnects and is removed from the list of connected consumers, the keys mapped to it are remapped to other consumers. The sections below explain how a consumer is selected given the message hash and how the mapping is adjusted given a new consu [...]
+
+##### Auto-split Hash Range
+
+The algorithm assumes a range of numbers from 0 to 2^16 (65,536). Each consumer is mapped to a single region in this range; together the regions cover the entire range, and no two regions overlap. A consumer is selected for a given key by taking the message hash modulo the range size (65,536). The result `i` (0 <= i < 65,536) falls within exactly one region, and the consumer mapped to that region is selected.
+
+Example:
+
+Suppose there are 4 consumers (C1, C2, C3, and C4) that connected in that order:
+
+```
+ 0               16,384            32,768           49,152             65,536
+ |------- C3 ------|------- C2 ------|------- C4 ------|------- C1 ------|
+```
+
+Given a message key `Order-3459134`, its hash would be `murmur32("Order-3459134") = 3112179635`, and its index in the range would be `3112179635 mod 65536 = 6067`. That index is contained within region `[0, 16384)`, so consumer C3 is selected for this message key.
+
+When a new consumer connects, the largest region is split in half (if several regions are equally large, the lowest one is chosen): the lower half is mapped to the newly added consumer, and the upper half stays with the consumer that owned the region. Here is how the mapping evolves from 1 to 4 consumers:
+
+```
+C1 connected:
+|---------------------------------- C1 ---------------------------------|
+
+C2 connected:
+|--------------- C2 ----------------|---------------- C1 ---------------|
+
+C3 connected:
+|------- C3 ------|------- C2 ------|---------------- C1 ---------------|
+
+C4 connected:
+|------- C3 ------|------- C2 ------|------- C4 ------|------- C1 ------|
+```
+
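+When a consumer disconnects, its region is merged into the region on its right. If the consumer owns the rightmost region, there is no region on its right, so its region is merged into the region on its left instead. Examples: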
+
+C4 is disconnected:
+
+```
+|------- C3 ------|------- C2 ------|---------------- C1 ---------------|
+```
+
+C1 is disconnected:
+
+```
+|------- C3 ------|-------------------------- C2 -----------------------|
+```
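The split and merge rules of Auto-split Hash Range can be sketched in Java as follows. This is a hypothetical illustration, not Pulsar's implementation: it assumes a range size of 65,536, stores each half-open region `[start, end)` under its exclusive end in a `TreeMap`, chooses the lowest region when several are equally large, and treats the 32-bit hash as unsigned before taking the modulo. Removing the consumer that owns the rightmost region merges that region into its left neighbor, matching the disconnect examples above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the Auto-split Hash Range selector; not Pulsar's class.
class AutoSplitHashRangeSelector {
    static final int RANGE_SIZE = 1 << 16; // 65,536 slots

    // Exclusive end of each region -> consumer owning [previous end, this end).
    private final TreeMap<Integer, String> regions = new TreeMap<>();

    void addConsumer(String consumer) {
        if (regions.isEmpty()) {
            regions.put(RANGE_SIZE, consumer); // the first consumer owns the whole range
            return;
        }
        // Pick the largest region; the strict ">" keeps the lowest one on ties.
        int bestEnd = -1;
        int bestSize = -1;
        for (int end : regions.keySet()) {
            int size = end - start(end);
            if (size > bestSize) {
                bestSize = size;
                bestEnd = end;
            }
        }
        if (bestSize < 2) {
            throw new IllegalStateException("no region left to split");
        }
        // The new consumer takes the lower half; the current owner keeps the upper half.
        regions.put(start(bestEnd) + bestSize / 2, consumer);
    }

    void removeConsumer(String consumer) {
        Integer end = null;
        for (Map.Entry<Integer, String> e : regions.entrySet()) {
            if (e.getValue().equals(consumer)) {
                end = e.getKey();
                break;
            }
        }
        if (end == null) {
            return;
        }
        if (end == RANGE_SIZE && regions.size() > 1) {
            // Rightmost region: nothing on its right, so the left neighbor absorbs it.
            Map.Entry<Integer, String> left = regions.lowerEntry(end);
            String leftOwner = left.getValue();
            regions.remove(left.getKey());
            regions.put(end, leftOwner);
        } else {
            // Dropping this end makes the region on the right start where this one started.
            regions.remove(end);
        }
    }

    String select(int hash) {
        // Treat the hash as unsigned 32-bit, then reduce it modulo the range size.
        int slot = Integer.remainderUnsigned(hash, RANGE_SIZE);
        Map.Entry<Integer, String> owner = regions.higherEntry(slot);
        return owner == null ? null : owner.getValue();
    }

    private int start(int end) {
        Integer lower = regions.lowerKey(end);
        return lower == null ? 0 : lower;
    }
}
```

For the worked example's hash, `select((int) 3112179635L)` computes slot 6067 and returns the owner of region `[0, 16384)`.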
+
+The advantage of this algorithm is that adding or removing a consumer affects only a single existing consumer, at the expense of unevenly sized regions: some consumers get more keys than others. The next algorithm makes the opposite trade-off.
+
+##### Auto-split Consistent Hashing
+
+This algorithm uses a hash ring: a range of numbers from 0 to MAX_INT (32-bit) that wraps around, so that the number after MAX_INT is 0. It is as if you took a line starting at 0 and ending at MAX_INT and bent it into a circle so that its end meets its start:
+
+```
+ MAX_INT -----++--------- 0
+              ||
+         , - ~ ~ ~ - ,
+     , '               ' ,
+   ,                       ,
+  ,                         ,
+ ,                           ,
+ ,                           ,
+ ,                           ,
+  ,                         ,
+   ,                       ,
+     ,                  , '
+       ' - , _ _ _ ,  '
+```
+
+When a consumer is added, we mark 100 points on the circle and associate them with the new consumer. For each number from 1 to 100, we append the number to the consumer name and hash the result; the hash is the location of the point on the circle. For example, if the consumer name is "orders-aggregator-pod-2345-consumer", we mark the following 100 points:
+```
+    murmur32("orders-aggregator-pod-2345-consumer1") = 1003084738
+    murmur32("orders-aggregator-pod-2345-consumer2") = 373317202
+    ...
+    murmur32("orders-aggregator-pod-2345-consumer100") = 320276078
+```
+
+Because the hash function distributes its outputs uniformly, those points are spread roughly evenly across the circle.
+
+```
+    C1-100                 
+         , - ~ ~ ~ - ,   C1-1
+     , '               ' ,
+   ,                       ,
+  ,                         , C1-2
+ ,                           ,
+ ,                           ,
+ ,                           ,
+  ,                         ,  C1-3
+   ,                       ,
+     ,                  , '
+       ' - , _ _ _ ,  '      ...
+ 
+```
+
+A consumer is selected for a given message key by placing the key's hash on the circle and moving clockwise until the first marked point is reached. Because the hash function can produce collisions, a point may have more than one consumer on it. Therefore, the consumer is picked from that point's consumer list by computing `index = hash % consumer_list_size` and taking the consumer at that index.
+
+When a consumer is added, 100 marking points are added to the circle as explained above. Because the hash function distributes uniformly, those 100 points act as if the new consumer takes a small slice of keys from each existing consumer. This keeps the distribution even, at the cost of affecting all existing consumers. [This video](https://www.youtube.com/watch?v=zaRkONvyGr8) explains the concept of Consistent Hashing quite well (the only difference is that in Pulsar's ca [...]
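The ring can be sketched in Java with a `TreeMap` from ring position to the consumers whose points landed there. This is a hypothetical illustration, not Pulsar's implementation: the hash function is passed in (Murmur3 in the description above; any `ToIntFunction<String>` works for experimenting), hashes are masked with `Integer.MAX_VALUE` so that positions fall in [0, MAX_INT], and each consumer gets `pointsPerConsumer` points named by appending 1 to N to the consumer name, as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

// Hypothetical sketch of Auto-split Consistent Hashing; not Pulsar's class.
class ConsistentHashingSelector {
    private final int pointsPerConsumer;        // 100 in the description above
    private final ToIntFunction<String> hashFn; // e.g. Murmur3 in practice
    // Ring position -> consumers whose points landed there (collisions share a list).
    private final TreeMap<Integer, List<String>> ring = new TreeMap<>();

    ConsistentHashingSelector(int pointsPerConsumer, ToIntFunction<String> hashFn) {
        this.pointsPerConsumer = pointsPerConsumer;
        this.hashFn = hashFn;
    }

    void addConsumer(String name) {
        // Mark one point per "name + i", for i = 1..pointsPerConsumer.
        for (int i = 1; i <= pointsPerConsumer; i++) {
            ring.computeIfAbsent(position(name + i), p -> new ArrayList<>()).add(name);
        }
    }

    void removeConsumer(String name) {
        for (int i = 1; i <= pointsPerConsumer; i++) {
            int point = position(name + i);
            List<String> owners = ring.get(point);
            if (owners != null) {
                owners.remove(name);
                if (owners.isEmpty()) {
                    ring.remove(point);
                }
            }
        }
    }

    String select(int keyHash) {
        if (ring.isEmpty()) {
            return null;
        }
        int h = keyHash & Integer.MAX_VALUE;
        // Walk clockwise to the first point at or after h, wrapping around past MAX_INT.
        Map.Entry<Integer, List<String>> point = ring.ceilingEntry(h);
        if (point == null) {
            point = ring.firstEntry();
        }
        List<String> owners = point.getValue();
        return owners.get(h % owners.size()); // index = hash % consumer_list_size
    }

    private int position(String s) {
        return hashFn.applyAsInt(s) & Integer.MAX_VALUE; // keep points in [0, MAX_INT]
    }
}
```

Because removing a consumer deletes only its own points, keys that were not mapped to it keep their previous consumer, except for keys that land on a point shared with it through a hash collision.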
+
+##### Sticky
+
+The algorithm assumes a range of numbers from 0 to 2^16 (65,536). Each consumer is mapped to one or more regions in this range, and no two regions overlap. A consumer is selected by taking the message hash modulo the range size (65,536); the result `i` (0 <= i < 65,536) falls within exactly one region, and the consumer mapped to that region is selected.
+
+In this algorithm you have full control: every newly added consumer specifies, through the Consumer API, the ranges it wishes to be mapped to when the consumer object is constructed. It is your responsibility to make sure the ranges do not overlap and that together they cover the entire range.
+
+Example:
+
+Suppose there are 2 consumers (C1 and C2), each of which specified its ranges:
+
+```
+C1 = [0, 16384), [32768, 49152)
+C2 = [16384, 32768), [49152, 65536)
+
+ 0               16,384            32,768           49,152             65,536
+ |------- C1 ------|------- C2 ------|------- C1 ------|------- C2 ------|
+```
+
+Given a message key `Order-3459134`, its hash would be `murmur32("Order-3459134") = 3112179635`, and its index in the range would be `3112179635 mod 65536 = 6067`. That index is contained within `[0, 16384)`, so consumer C1 is selected for this message key.
+
+If a newly connected consumer does not supply its ranges, or its ranges overlap with those of existing consumers, it is disconnected and removed from the consumer list, as if it had never tried to connect.
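The Sticky rules can be sketched in Java as below. Names and representation are assumptions for illustration, not the Pulsar API (real consumers supply their ranges through the Consumer API): ranges are half-open `{start, end}` pairs over 0 to 65,536, a consumer whose ranges are missing, malformed, or overlapping is rejected without changing the mapping, and a slot not covered by any range yields `null`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the Sticky selector; names and representation are assumptions.
class StickySelector {
    static final int RANGE_SIZE = 1 << 16; // 65,536 slots

    private static final class Region {
        final int end; // exclusive
        final String consumer;

        Region(int end, String consumer) {
            this.end = end;
            this.consumer = consumer;
        }
    }

    // Region start (inclusive) -> region end and owner.
    private final TreeMap<Integer, Region> regions = new TreeMap<>();

    /** Each range is {start, endExclusive}; returns false and changes nothing if rejected. */
    boolean addConsumer(String consumer, int[][] ranges) {
        if (ranges.length == 0) {
            return false; // no ranges supplied
        }
        // Validate against a copy so a rejected consumer leaves no trace.
        TreeMap<Integer, Region> candidate = new TreeMap<>(regions);
        for (int[] r : ranges) {
            int start = r[0];
            int end = r[1];
            if (start < 0 || end > RANGE_SIZE || start >= end) {
                return false; // malformed range
            }
            Map.Entry<Integer, Region> below = candidate.floorEntry(start);
            Map.Entry<Integer, Region> above = candidate.ceilingEntry(start);
            if ((below != null && below.getValue().end > start)
                    || (above != null && above.getKey() < end)) {
                return false; // overlaps an existing range or another range in this request
            }
            candidate.put(start, new Region(end, consumer));
        }
        regions.putAll(candidate);
        return true;
    }

    void removeConsumer(String consumer) {
        regions.values().removeIf(r -> r.consumer.equals(consumer));
    }

    String select(int hash) {
        int slot = Integer.remainderUnsigned(hash, RANGE_SIZE);
        Map.Entry<Integer, Region> e = regions.floorEntry(slot);
        // null when no consumer's range covers this slot
        return (e != null && slot < e.getValue().end) ? e.getValue().consumer : null;
    }
}
```

In this sketch the mapping changes only when a consumer adds or removes its own ranges, so the keys of other consumers never move.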
+
+##### How to use them?
+
+When building the consumer, you can specify the Key Shared mode:
+* `AUTO_SPLIT`: Auto-split Hash Range
+* `STICKY`: Sticky
+
+Consistent Hashing is used instead of Hash Range for `AUTO_SPLIT` if the broker configuration `subscriptionKeySharedUseConsistentHashing` is enabled.
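The choice between the three algorithms in this section can be summarized as a small decision function. The enum and method below are hypothetical: they only mirror the mode names and the broker setting mentioned above and are not the Pulsar client or broker API.

```java
// Hypothetical mirror of the two Key Shared modes named above.
enum KeySharedMode { AUTO_SPLIT, STICKY }

final class KeySharedAlgorithm {
    private KeySharedAlgorithm() {}

    // Which mapping algorithm applies, given the consumer's mode and the broker flag.
    static String forMode(KeySharedMode mode, boolean subscriptionKeySharedUseConsistentHashing) {
        if (mode == KeySharedMode.STICKY) {
            return "Sticky"; // consumer-supplied ranges; the broker flag only affects AUTO_SPLIT
        }
        return subscriptionKeySharedUseConsistentHashing
                ? "Auto-split Consistent Hashing"
                : "Auto-split Hash Range";
    }
}
```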
+
+##### Preserving order of processing
+
+The Key_Shared subscription type guarantees that a key is processed by a *single* consumer at any given time. When a new consumer connects, some keys are remapped to it from existing consumers. The broker does not deliver messages to the new consumer until all messages delivered up to the time it connected have been acknowledged, which ensures that a key is never processed by two consumers at once. The trade-off is that if one of the existing consumers is stuck and no time-ou [...]
+
+This requirement can be relaxed by enabling `allowOutOfOrderDelivery` through the Consumer API. If it is set on the new consumer, the broker lets the consumer receive messages as soon as it connects, even though some messages with the same key may still be in process on other consumers. Ordering may therefore be affected during the short period in which the new consumer is added.
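The ordering rule and its relaxation can be modeled as a simple delivery gate. This is a simplified, hypothetical sketch, not the broker's implementation: `NewConsumerGate` is an illustrative name, messages are assumed to carry increasing positions (plain `long` values here), the gate records the highest position already delivered to other consumers when the new consumer joins, and `ackedUpTo` is assumed to be the position up to which every message has been acknowledged.

```java
// Hypothetical model of when a newly joined Key_Shared consumer may receive messages.
final class NewConsumerGate {
    private final long deliveredUpToAtJoin;        // highest position delivered before the join
    private final boolean allowOutOfOrderDelivery; // mirrors the consumer option described above

    NewConsumerGate(long deliveredUpToAtJoin, boolean allowOutOfOrderDelivery) {
        this.deliveredUpToAtJoin = deliveredUpToAtJoin;
        this.allowOutOfOrderDelivery = allowOutOfOrderDelivery;
    }

    /**
     * @param ackedUpTo every message at or below this position has been acknowledged
     * @return true if delivery to the new consumer may start
     */
    boolean canDeliver(long ackedUpTo) {
        // Relaxed mode: start at once; a key may briefly be processed by two consumers.
        if (allowOutOfOrderDelivery) {
            return true;
        }
        // Strict mode: wait until everything delivered before the join is acknowledged.
        return ackedUpTo >= deliveredUpToAtJoin;
    }
}
```

A stuck consumer that never acknowledges keeps `ackedUpTo` below the join position, which is why the strict gate can hold back the new consumer indefinitely.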
+
+##### Batching for Key Shared Subscriptions
+
 :::note
 
 When the consumers are using the Key_Shared subscription type, you need to **disable batching** or **use key-based batching** for the producers.
