lhotari commented on code in PR #25706:
URL: https://github.com/apache/pulsar/pull/25706#discussion_r3199689047


##########
pip/pip-474.md:
##########
@@ -0,0 +1,349 @@
+# PIP-474: Key_Shared Hot Key Overflow Mechanism
+
+# Background Knowledge
+
+## Key_Shared Subscription
+
+Key_Shared is a subscription type provided by Pulsar. It distributes messages 
in a Topic to multiple Consumers based on the Key dimension: messages with the 
same Key are always routed to the same Consumer, ensuring per-key ordering; 
messages with different Keys can be distributed to different Consumers, 
enabling parallel consumption.
+
+Specifically, the Broker computes a Murmur3 hash over each message's sticky 
key, maps it to the range `[0, 65535]`, and then uses 
`StickyKeyConsumerSelector` to bind each hash value to a Consumer. All 
Consumers under a Key_Shared subscription share a single `ManagedCursor`, which 
maintains the global consumption progress (the `mark-delete` position).
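The hashing step can be sketched as follows. This is a minimal, self-contained illustration. It assumes the standard MurmurHash3 x86_32 variant with seed 0 and a plain modulo-65536 reduction, and Pulsar's own hashing helper and reduction may differ in detail. `StickyKeyHashSketch`, `stickyKeyHash` and `selectConsumer` are hypothetical names. The contiguous even split stands in for a Selector but is not any of the real `StickyKeyConsumerSelector` implementations.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch only (hypothetical class): MurmurHash3 x86_32 with seed 0,
// reduced to a sticky-key hash in [0, 65535], plus a simplified even-split
// selector that is NOT one of Pulsar's real StickyKeyConsumerSelector types.
final class StickyKeyHashSketch {
    static final int RANGE_SIZE = 65536; // hash values fall in [0, 65535]

    static int murmur3x86_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int nblocks = data.length / 4;
        // Body: mix each little-endian 4-byte block into the state.
        for (int i = 0; i < nblocks; i++) {
            int base = i * 4;
            int k = (data[base] & 0xff)
                    | ((data[base + 1] & 0xff) << 8)
                    | ((data[base + 2] & 0xff) << 16)
                    | ((data[base + 3] & 0xff) << 24);
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h1 ^= k;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // Tail: the remaining 0-3 bytes.
        int tailStart = nblocks * 4;
        int k1 = 0;
        switch (data.length & 3) {
            case 3:
                k1 ^= (data[tailStart + 2] & 0xff) << 16;
                // fall through
            case 2:
                k1 ^= (data[tailStart + 1] & 0xff) << 8;
                // fall through
            case 1:
                k1 ^= data[tailStart] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }
        // Finalization: mix in the length, then apply the fmix32 avalanche.
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Assumption: clear the sign bit, then reduce modulo 65536.
    static int stickyKeyHash(String key) {
        int h = murmur3x86_32(key.getBytes(StandardCharsets.UTF_8), 0);
        return (h & Integer.MAX_VALUE) % RANGE_SIZE;
    }

    // Hypothetical even split: consumer i owns the i-th contiguous slice of the range.
    static int selectConsumer(int hash, int consumerCount) {
        return (int) ((long) hash * consumerCount / RANGE_SIZE);
    }
}
```

Because `stickyKeyHash` is deterministic, `selectConsumer(stickyKeyHash("order-42"), 20)` returns the same index every time for the same key. That stable binding is what gives per-key ordering.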
+
+## What Problem It Solves
+
+In traditional sequential consumption scenarios, achieving per-key ordering 
typically requires routing messages of the same Key to the same partition, 
which is then processed by that partition's sole consumer in order. This means 
**consumption parallelism is limited by the number of partitions**—to add 
consumers you must add partitions, but the partition count is a fundamental 
Topic configuration that cannot be adjusted freely or frequently.
+
+Key_Shared subscriptions break this constraint: within the same partition, 
messages with different Keys can be distributed to different Consumers. 
Consumption parallelism depends only on the number of Keys and the number of 
Consumers, and is independent of the partition count.
+
+## What It Achieves
+
+1. **Multiple Key-Consumer Binding Strategies**: Provides three Selector 
implementations—`HashRangeAutoSplit` (evenly split hash ranges automatically), 
`HashRangeExclusive` (manually specify ranges), and `ConsistentHashing` 
(consistent hashing)—to suit different scenarios.
+
+2. **Ordering Guarantee During Consumer Scaling**: In AUTO_SPLIT mode (and 
when out-of-order delivery is not enabled), the Draining Hashes mechanism 
([PIP-379](https://github.com/apache/pulsar/issues/21199)) ensures ordering 
when Consumers join or leave and hash ranges are reassigned. Affected hashes 
enter a "draining" state—the new Consumer must wait until the old Consumer has 
finished processing all pending messages for that hash before it starts 
receiving, ensuring no reordering during scaling.
+
+## But It Has a Significant Problem
+
+**A hot Key (or a subset of Keys stuck in processing) will block consumption 
of *all other Keys*.**
+
+This problem stems from the interaction of three mechanisms within the 
Key_Shared Dispatcher:
+
+**Replay Queue**: When a message cannot be dispatched (Consumer has no 
permits, hash is blocked, etc.), its position is placed into the Replay queue 
(`MessageRedeliveryController`). Replay reads typically take priority over 
Normal reads—as long as the Replay queue is non-empty and has dispatchable 
messages, no new messages are read from the Topic. However, there is a 
look-ahead exception: when all messages in a replay round are filtered out 
(e.g., due to consumers lacking permits), the dispatcher skips the next replay 
and proceeds directly to a Normal Read to fetch new messages forward (bounded 
by `effectiveLookAheadLimit`).
+
+**`containsStickyKeyHash` Ordering Check**: If a hash has unprocessed messages 
in the Replay queue, new messages for that hash read via Normal Read are also 
intercepted and redirected into Replay—new messages cannot be dispatched before 
old ones are processed. This is the core mechanism for guaranteeing per-key 
ordering.
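A toy model of the `containsStickyKeyHash` check follows. It shows why a newer message (M11) for a hash is diverted into Replay while an older one (M10) is still pending, even when the target Consumer has permits again. The class, its fields and `dispatchOrDivert` are hypothetical and do not mirror the real `MessageRedeliveryController` API. Positions are simplified to `long` values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical names) of the per-hash ordering check: while any
// message for a hash sits in replay, newer messages for that hash are
// diverted into replay instead of being dispatched.
final class ReplayOrderingSketch {
    private final TreeMap<Long, Integer> replay = new TreeMap<>();       // position -> hash, in position order
    private final Map<Integer, Integer> pendingPerHash = new HashMap<>(); // hash -> pending count

    boolean containsStickyKeyHash(int hash) {
        return pendingPerHash.containsKey(hash);
    }

    void addToReplay(long position, int hash) {
        if (replay.putIfAbsent(position, hash) == null) {
            pendingPerHash.merge(hash, 1, Integer::sum);
        }
    }

    // Called once a replayed message has been dispatched successfully.
    void removeFromReplay(long position) {
        Integer hash = replay.remove(position);
        if (hash != null) {
            // Returning null from the remapping function removes the entry.
            pendingPerHash.computeIfPresent(hash, (h, count) -> count > 1 ? count - 1 : null);
        }
    }

    int replaySize() {
        return replay.size();
    }

    // Normal Read path: true if the message can be dispatched now, false if it
    // was diverted into replay (no permits, or an older message for the same
    // hash is still pending).
    boolean dispatchOrDivert(long position, int hash, boolean consumerHasPermits) {
        if (!consumerHasPermits || containsStickyKeyHash(hash)) {
            addToReplay(position, hash);
            return false;
        }
        return true;
    }
}
```

In this model, M10 for hash 7 is diverted because its Consumer has no permits. M11 for hash 7 is then diverted as well, while a message for hash 9 passes straight through.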
+
+**Look-Ahead Limit**: When the Replay queue size reaches 
`effectiveLookAheadLimit` (default: `min(perConsumer × consumerCount, 
perSubscription)`), Normal Read is completely disabled and the Cursor stops 
reading new messages forward.
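The read-scheduling rules above (Replay priority, the look-ahead exception, and the limit) can be sketched as follows. The names `effectiveLookAheadLimit` and `isNormalReadAllowed` come from the text, but the method bodies here are simplified assumptions rather than broker code. `nextRead` and `ReadType` are hypothetical. The sketch assumes both thresholds are positive, and the broker's handling of zero or unset thresholds is not modeled.

```java
// Simplified model (hypothetical class) of the look-ahead limit and the
// replay-vs-normal read choice. Assumes limit > 0.
final class LookAheadSketch {
    enum ReadType { REPLAY, NORMAL }

    // Default rule from the text: min(perConsumer x consumerCount, perSubscription).
    static int effectiveLookAheadLimit(int perConsumer, int consumerCount, int perSubscription) {
        long scaled = (long) perConsumer * consumerCount; // widen to avoid int overflow
        return (int) Math.min(scaled, (long) perSubscription);
    }

    static boolean isNormalReadAllowed(int replaySize, int limit) {
        return replaySize < limit;
    }

    // Replay has priority. The look-ahead exception allows a Normal Read after a
    // replay round in which every message was filtered out, but only while the
    // replay queue is still below the limit.
    static ReadType nextRead(int replaySize, int limit, boolean lastReplayRoundAllFiltered) {
        boolean replayPending = replaySize > 0;
        if (replayPending && !lastReplayRoundAllFiltered) {
            return ReadType.REPLAY;
        }
        if (isNormalReadAllowed(replaySize, limit)) {
            return ReadType.NORMAL;
        }
        return ReadType.REPLAY; // at the limit: the dispatcher is stuck in a pure replay loop
    }
}
```

The last branch is the failure mode described later: once the queue reaches the limit, the look-ahead exception can no longer fire.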
+
+## A Concrete Scenario
+
+Consider a typical consumption scenario:
+
+**Configuration**:
+- Partitioned Topic with 200 partitions; 100 new Keys appear per second, and 
each Key sends at 50 TPS for 100 seconds then stops, so about `100 × 100 = 
10,000` Keys are active concurrently in steady state
+- Steady-state total throughput: `10,000 × 50 = 500,000 msg/s`
+- Keys are evenly distributed by hash across 200 partitions, ~50 active Keys 
per partition, `50 × 50 = 2,500 msg/s`
+- 20 Consumers, each Consumer connects to all 200 partitions (under Key_Shared 
subscription, the client automatically creates an internal consumer per 
partition)
+- Each partition's Dispatcher sees 20 Consumers, each responsible for ~2–3 
Keys in that partition
+- Each message takes 15ms to process (normal business processing speed)
+- Default configuration: `receiverQueueSize = 1000`, 
`keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000`, 
`keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000`
+- Per-partition `effectiveLookAheadLimit = min(2000 × 20, 20000) = 20,000`
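The derived figures in the configuration above can be re-checked mechanically. The sketch below uses only the inputs stated in the scenario. The class and constant names are illustrative, not Pulsar configuration keys.

```java
// Recomputes the scenario's derived figures from its stated inputs.
// Class and constant names are illustrative only.
final class ScenarioMath {
    static final int PARTITIONS = 200;
    static final int NEW_KEYS_PER_SEC = 100;
    static final int KEY_LIFETIME_SEC = 100;
    static final int TPS_PER_KEY = 50;
    static final int CONSUMERS = 20;
    static final int PER_CONSUMER_THRESHOLD = 2000;      // keySharedLookAheadMsgInReplayThresholdPerConsumer
    static final int PER_SUBSCRIPTION_THRESHOLD = 20000; // keySharedLookAheadMsgInReplayThresholdPerSubscription

    // Steady state: arrival rate x lifetime (Little's law).
    static int activeKeys() { return NEW_KEYS_PER_SEC * KEY_LIFETIME_SEC; }

    static int totalRate() { return activeKeys() * TPS_PER_KEY; }

    static int keysPerPartition() { return activeKeys() / PARTITIONS; }

    static int ratePerPartition() { return keysPerPartition() * TPS_PER_KEY; }

    static double keysPerConsumerPerPartition() { return (double) keysPerPartition() / CONSUMERS; }

    static int lookAheadLimitPerPartition() {
        return Math.min(PER_CONSUMER_THRESHOLD * CONSUMERS, PER_SUBSCRIPTION_THRESHOLD);
    }
}
```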
+
+### Normal Operation
+
+Per-Key processing capacity covers the per-Key production rate (`1000 ms / 15 
ms ≈ 66.7 msg/s > 50 TPS`). Each partition has 20 Consumers processing its ~50 
Keys in parallel, yielding a per-partition capacity of `50 × 66.7 ≈ 3,335 
msg/s > 2,500 msg/s`.
+
+- Replay queues across all 200 partitions remain essentially empty; Normal 
Read advances normally
+- **All 10,000 active Keys are consumed in real time; the system is healthy**
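The headroom in normal operation can be expressed as a utilization ratio. This sketch makes the same assumption as the text: messages of one Key are processed sequentially, while different Keys are processed in parallel. The class and method names are hypothetical.

```java
// Headroom check for normal operation. Assumes each Key's messages are
// processed sequentially, but different Keys in parallel.
final class NormalOperationCheck {
    static final double PROCESSING_MS = 15.0;

    static double perKeyCapacity() {
        return 1000.0 / PROCESSING_MS; // ~66.7 msg/s per Key
    }

    static double perPartitionCapacity(int keysPerPartition) {
        return keysPerPartition * perKeyCapacity();
    }

    // Fraction of capacity in use; below 1.0 means consumption keeps up with production.
    static double utilization(int keysPerPartition, int tpsPerKey) {
        return (keysPerPartition * (double) tpsPerKey) / perPartitionCapacity(keysPerPartition);
    }
}
```

With 50 Keys at 50 TPS each, utilization is 0.75. About a quarter of the capacity is spare, which is why the Replay queues stay essentially empty.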
+
+### Single-Key Hot Spot / Partial Key Processing Stall
+
+Assume Consumer-1 hangs due to a downstream service timeout and cannot ack any 
messages. Consumer-1 connects to all 200 partitions and is responsible for ~2–3 
Keys in each partition.
+
+**Within each partition**:
+
+**T = 0s onwards**: After Consumer-1's permits are exhausted, messages for its 
2–3 Keys can no longer be dispatched and start flooding the partition's Replay 
queue. Each partition injects messages into Replay at ~125 msg/s (2.5 keys × 50 
TPS)—messages that will never be acked.
+
+**T = 0 to ~2.7 minutes**: Replay queues across all 200 partitions grow 
monotonically, **simultaneously** and **independently**, at ~125 msg/s. The 
`containsStickyKeyHash` check ensures that new messages for these hashes read 
via Normal Read are also intercepted into Replay. As Replay grows, 
`getMaxEntriesReadLimit()` returns progressively smaller values, Normal Read 
batches shrink, and throughput for other Keys gradually degrades.
+
+**T ≈ 2.7 minutes (the fatal tipping point)**: Replay queues across all 200 
partitions fill to the 20,000-entry limit almost simultaneously (`20000 / 125 = 
160s ≈ 2.7min`). `isNormalReadAllowed()` returns false, and Normal Read stays 
disabled for as long as Consumer-1 remains stuck.
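The timeline for a single partition can be reproduced with a time-stepped model. Its assumptions are that stuck-Key messages enter Replay at a constant 125 msg/s (2.5 Keys × 50 TPS) and that nothing leaves Replay. `getMaxEntriesReadLimit()` is named in the text, but its real body is not shown there. The `maxEntriesReadLimit` below is a hypothetical stand-in that returns the remaining headroom under the limit.

```java
// Time-stepped model (hypothetical class) of one partition after Consumer-1
// stalls. Assumes a constant replay inflow with no outflow.
final class ReplayGrowthSketch {
    static final int LIMIT = 20_000;       // effectiveLookAheadLimit per partition
    static final int INFLOW_PER_SEC = 125; // stuck-Key messages entering replay

    // First whole second at which the replay queue reaches the limit.
    static int secondsUntilNormalReadStops() {
        int replay = 0;
        int seconds = 0;
        while (replay < LIMIT) {
            replay += INFLOW_PER_SEC;
            seconds++;
        }
        return seconds;
    }

    // Hypothetical stand-in for getMaxEntriesReadLimit(): the headroom left
    // under the limit, which shrinks as replay grows and hits 0 at the limit.
    static int maxEntriesReadLimit(int replaySize) {
        return Math.max(0, LIMIT - replaySize);
    }
}
```

The model yields 160 seconds, which matches the ≈ 2.7-minute tipping point.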
+
+**T > ~2.7 minutes (global blackout)**: Dispatchers across all 200 partitions 
enter a pure Replay loop, but Replay is dominated by messages destined for 
Consumer-1, which is already stuck. **The remaining 19 Consumers have abundant 
free permits but cannot receive new messages on any partition. The vast 
majority of the 10,000 active Keys are starved.**
+
+```
+A single Consumer's failure spreads to all partitions in under 3 minutes,
+turning "500K msg/s real-time consumption" into "global blackout."
+```
+
+This is the core problem this PIP aims to solve.
+
+# Motivation
+
+As illustrated above, the `containsStickyKeyHash` ordering check forms a 
**self-reinforcing positive feedback loop**:
+
+```
+hash in replay → Normal Read messages for same hash are blocked → enter replay 
→ replay grows
+→ Normal Read batch shrinks → replay reaches limit faster → Normal Read stops 
→ global blackout
+```
+
+The original design intent of this check is entirely correct: for a given Key, 
message M11 must not be dispatched before the earlier message M10 is consumed. 
But under traffic skew, or when some Consumers stall, this ordering mechanism 
becomes an avalanche accelerator. **A few Keys' problem escalates into a 
catastrophe for all Keys.**
+
+## Why Not Simply Increase the Replay Limit
+
+Increasing `keySharedLookAheadMsgInReplayThresholdPerConsumer` and 
`keySharedLookAheadMsgInReplayThresholdPerSubscription` may seem like the most 
straightforward mitigation, but there are three structural issues:
+
+1. **Amplified I/O waste**: A larger replay queue means more stuck-Key 
messages are read from BK each Replay Read cycle, yet all of them are returned 
unchanged because the target Consumer has no permits—pure I/O waste.

Review Comment:
   After checking this again, I found that the existing implementation already 
skips reading such entries from the cache / BK.
   This happens in ReplayPositionFilter:
   
https://github.com/apache/pulsar/blob/138595f6256c301956f9d77fde8534699e992536/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L612-L620
   
   This means that there's no "amplified I/O waste" in the way it has been 
described. There are most likely some corner cases that could be optimized 
further, but PIP-430 (since 4.1.0) mitigates most of the issues caused by 
unnecessary re-reads. Please take a look at 
https://github.com/apache/pulsar/blob/master/pip/pip-430.md. 
   
   For high-scale use cases, the PIP-430 broker cache should be tuned 
sufficiently to avoid unnecessary BK reads when using Key_Shared subscriptions.


