lhotari commented on code in PR #25706:
URL: https://github.com/apache/pulsar/pull/25706#discussion_r3199689047

##########
pip/pip-474.md:
##########
@@ -0,0 +1,349 @@
+# PIP-474: Key_Shared Hot Key Overflow Mechanism
+
+# Background Knowledge
+
+## Key_Shared Subscription
+
+Key_Shared is a subscription type provided by Pulsar. It distributes messages in a Topic to multiple Consumers based on the Key dimension: messages with the same Key are always routed to the same Consumer, ensuring per-key ordering; messages with different Keys can be distributed to different Consumers, enabling parallel consumption.
+
+Specifically, the Broker computes a Murmur3 hash over each message's sticky key, maps it to the range `[0, 65535]`, and then uses `StickyKeyConsumerSelector` to bind each hash value to a Consumer. All Consumers under a Key_Shared subscription share a single `ManagedCursor`, which maintains the global consumption progress (the `mark-delete` position).
+
+## What Problem It Solves
+
+In traditional sequential consumption scenarios, achieving per-key ordering typically requires routing messages of the same Key to the same partition, which is then processed by that partition's sole consumer in order. This means **consumption parallelism is limited by the number of partitions**—to add consumers you must add partitions, but the partition count is a fundamental Topic configuration that cannot be adjusted freely or frequently.
+
+Key_Shared subscriptions break this constraint: within the same partition, messages with different Keys can be distributed to different Consumers. Consumption parallelism depends only on the number of Keys and the number of Consumers, and is independent of the partition count.
+
+## What It Achieves
+
+1. **Multiple Key-Consumer Binding Strategies**: Provides three Selector implementations—`HashRangeAutoSplit` (evenly split hash ranges automatically), `HashRangeExclusive` (manually specify ranges), and `ConsistentHashing` (consistent hashing)—to suit different scenarios.
+
+2. **Ordering Guarantee During Consumer Scaling**: In AUTO_SPLIT mode (and when out-of-order delivery is not enabled), the Draining Hashes mechanism ([PIP-379](https://github.com/apache/pulsar/issues/21199)) ensures ordering when Consumers join or leave and hash ranges are reassigned. Affected hashes enter a "draining" state—the new Consumer must wait until the old Consumer has finished processing all pending messages for that hash before it starts receiving, ensuring no reordering during scaling.
+
+## But It Has a Significant Problem
+
+**A hot Key (or a subset of Keys stuck in processing) will block consumption of *all other Keys*.**
+
+This problem stems from the interaction of three mechanisms within the Key_Shared Dispatcher:
+
+**Replay Queue**: When a message cannot be dispatched (Consumer has no permits, hash is blocked, etc.), its position is placed into the Replay queue (`MessageRedeliveryController`). Replay reads typically take priority over Normal reads—as long as the Replay queue is non-empty and has dispatchable messages, no new messages are read from the Topic. However, there is a look-ahead exception: when all messages in a replay round are filtered out (e.g., due to consumers lacking permits), the dispatcher skips the next replay and proceeds directly to a Normal Read to fetch new messages forward (bounded by `effectiveLookAheadLimit`).
+
+**`containsStickyKeyHash` Ordering Check**: If a hash has unprocessed messages in the Replay queue, new messages for that hash read via Normal Read are also intercepted and redirected into Replay—new messages cannot be dispatched before old ones are processed. This is the core mechanism for guaranteeing per-key ordering.
+
+**Look-Ahead Limit**: When the Replay queue size reaches `effectiveLookAheadLimit` (default: `min(perConsumer × consumerCount, perSubscription)`), Normal Read is completely disabled and the Cursor stops reading new messages forward.
+
+## A Concrete Scenario
+
+Consider a typical consumption scenario:
+
+**Configuration**:
+- Partitioned Topic with 200 partitions, 100 new Keys per second, each Key sends at 50 TPS for 100 seconds then stops, with a maximum of `100 × 100 = 10,000` active Keys concurrently
+- Steady-state total throughput: `10,000 × 50 = 500,000 msg/s`
+- Keys are evenly distributed by hash across 200 partitions, ~50 active Keys per partition, `50 × 50 = 2,500 msg/s`
+- 20 Consumers, each Consumer connects to all 200 partitions (under Key_Shared subscription, the client automatically creates an internal consumer per partition)
+- Each partition's Dispatcher sees 20 Consumers, each responsible for ~2–3 Keys in that partition
+- Each message takes 15ms to process (normal business processing speed)
+- Default configuration: `receiverQueueSize = 1000`, `keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000`, `keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000`
+- Per-partition `effectiveLookAheadLimit = min(2000 × 20, 20000) = 20,000`
+
+### Normal Operation
+
+Consumption capacity covers production rate (`66.7 msg/s > 50 TPS`). Each partition has 20 Consumers processing ~50 Keys in parallel, yielding per-partition throughput of `50 × 66.7 ≈ 3,335 msg/s > 2,500 msg/s`.
+
+- Replay queues across all 200 partitions remain essentially empty; Normal Read advances normally
+- **All 10,000 active Keys are consumed in real time; the system is healthy**
+
+### Single-Key Hot Spot / Partial Key Processing Stall
+
+Assume Consumer-1 hangs due to downstream service timeout (cannot ack any messages). Consumer-1 connects to all 200 partitions, responsible for ~2–3 Keys in each partition.
+
+**Within each partition**:
+
+**T = 0s onwards**: After Consumer-1's permits are exhausted, messages for its 2–3 Keys can no longer be dispatched and start flooding the partition's Replay queue. Each partition injects messages into Replay at ~125 msg/s (2.5 keys × 50 TPS)—messages that will never be acked.
+
+**T = 0–3 minutes**: Replay queues across all 200 partitions grow monotonically, **simultaneously** and **independently**, at ~125 msg/s. The `containsStickyKeyHash` check ensures that new messages for these hashes read via Normal Read are also intercepted into Replay. As Replay grows, `getMaxEntriesReadLimit()` returns progressively smaller values, Normal Read batches are compressed, and throughput for other Keys gradually degrades.
+
+**T ≈ 2.7 minutes — the fatal tipping point**: Replay queues across all 200 partitions fill to the 20,000-entry limit almost simultaneously (`20000 / 125 ≈ 160s ≈ 2.7min`). `isNormalReadAllowed()` returns false. Normal Read is permanently disabled.
+
+**T > 3 minutes — global blackout**: Dispatchers across all 200 partitions enter a pure Replay loop—but Replay is dominated by messages destined for Consumer-1, which is already stuck. **The remaining 19 Consumers have abundant free permits but cannot receive new messages on any partition. The vast majority of the 10,000 active Keys are starved.**
+
+```
+A single Consumer's failure spreads to all partitions in under 3 minutes,
+turning "500K msg/s real-time consumption" into "global blackout."
+```
+
+This is the core problem this PIP aims to solve.
+
+# Motivation
+
+As illustrated above, the `containsStickyKeyHash` ordering check forms a **self-reinforcing positive feedback loop**:
+
+```
+hash in replay → Normal Read messages for same hash are blocked → enter replay → replay grows
+→ Normal Read batch shrinks → replay reaches limit faster → Normal Read stops → global blackout
+```
+
+The original design intent of this check is entirely correct (M11 must not be dispatched before M10 is consumed). But under traffic skew or when some Consumers stall, this ordering mechanism becomes an avalanche accelerator. **A few Keys' problem escalates into a catastrophe for all Keys.**
+
+## Why Not Simply Increase the Replay Limit
+
+Increasing `keySharedLookAheadMsgInReplayThresholdPerConsumer` and `keySharedLookAheadMsgInReplayThresholdPerSubscription` may seem like the most straightforward mitigation, but there are three structural issues:
+
+1. **Amplified I/O waste**: A larger replay queue means more stuck-Key messages are read from BK each Replay Read cycle, yet all of them are returned unchanged because the target Consumer has no permits—pure I/O waste.

Review Comment:
After checking this again, the existing implementation already skips reading from the cache / BK. This happens in ReplayPositionFilter:
https://github.com/apache/pulsar/blob/138595f6256c301956f9d77fde8534699e992536/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L612-L620

This means that there's no "amplified I/O waste" in the way it has been described. There are most likely some corner cases that can be optimized further, but PIP-430 (since 4.1.0) mitigates most of the issues with unnecessary re-reads. Please take a look at https://github.com/apache/pulsar/blob/master/pip/pip-430.md. The PIP-430 broker cache should be sufficiently tuned for high-scale use cases to avoid unnecessary BK reads when using Key_Shared subscriptions.
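
To make the point about ReplayPositionFilter concrete, here is a minimal Java sketch of a pre-read filter over replay positions. Entries whose sticky key hash maps to a consumer without permits are skipped, so no cache or BookKeeper read is issued for them in that round. `ReplayFilterSketch`, `ReplayEntry`, `ToyConsumer`, `positionsToRead` and the toy selector are hypothetical names for illustration only. The conditions the real filter checks are at the linked lines in `PersistentStickyKeyDispatcherMultipleConsumers` and may include more than the permit check shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/**
 * Hypothetical sketch: decide which replay positions are worth reading BEFORE any read is issued.
 * None of these names are Pulsar's API; the permit check is the only condition modeled.
 */
public class ReplayFilterSketch {

    /** A replay-queue entry: ledger/entry position plus the sticky key hash kept alongside it. */
    public record ReplayEntry(long ledgerId, long entryId, int stickyKeyHash) {}

    /** Minimal consumer stand-in that only tracks available permits. */
    public record ToyConsumer(String name, int availablePermits) {}

    /**
     * Returns the replay entries to read now. Entries whose hash maps to a consumer with no
     * permits (or to no consumer) are skipped and simply stay in the replay queue.
     */
    public static List<ReplayEntry> positionsToRead(List<ReplayEntry> replayQueue,
                                                    IntFunction<ToyConsumer> selector) {
        List<ReplayEntry> toRead = new ArrayList<>();
        for (ReplayEntry entry : replayQueue) {
            ToyConsumer target = selector.apply(entry.stickyKeyHash());
            if (target != null && target.availablePermits() > 0) {
                toRead.add(entry);
            }
            // Otherwise: no cache/BK read is issued for this position in this round.
        }
        return toRead;
    }

    public static void main(String[] args) {
        ToyConsumer stuck = new ToyConsumer("consumer-1", 0);
        ToyConsumer healthy = new ToyConsumer("consumer-2", 500);
        // Toy selector (hypothetical): lower half of the hash space goes to the stuck consumer.
        IntFunction<ToyConsumer> selector = hash -> hash < 32768 ? stuck : healthy;

        List<ReplayEntry> replay = List.of(
                new ReplayEntry(10, 1, 100),     // key owned by the stuck consumer
                new ReplayEntry(10, 2, 40000),   // key owned by the healthy consumer
                new ReplayEntry(10, 3, 200));    // key owned by the stuck consumer

        List<ReplayEntry> toRead = positionsToRead(replay, selector);
        System.out.println(toRead.size() + " of " + replay.size() + " replay positions read");
    }
}
```

In this sketch the skip decision uses only the position and hash already held in the replay queue, so a stuck consumer's backlog costs a loop over positions rather than an entry read.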

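The tipping-point arithmetic in the draft's concrete scenario can be rechecked with a short Java sketch. All inputs are copied from the diff. The class and method names are hypothetical, and the per-partition capacity follows the draft in treating the ~50 keys of a partition as processed in parallel at 15 ms per message.

```java
/**
 * Rechecks the arithmetic of the PIP-474 draft's concrete scenario.
 * Inputs are copied from the diff; class and method names are hypothetical.
 */
public class HotKeyScenarioMath {

    /** effectiveLookAheadLimit = min(perConsumer × consumerCount, perSubscription), as stated in the draft. */
    public static int effectiveLookAheadLimit(int perConsumer, int consumerCount, int perSubscription) {
        return Math.min(perConsumer * consumerCount, perSubscription);
    }

    /** Seconds until a replay queue growing at a constant rate reaches the limit. */
    public static double secondsToFillReplay(int limit, double injectRatePerSec) {
        return limit / injectRatePerSec;
    }

    public static void main(String[] args) {
        int partitions = 200;
        int activeKeys = 100 * 100;                                     // 100 new keys/s, each active for 100 s
        int tpsPerKey = 50;
        int consumers = 20;
        double processingMs = 15.0;

        int totalRate = activeKeys * tpsPerKey;                         // 500,000 msg/s
        int keysPerPartition = activeKeys / partitions;                 // 50
        int ratePerPartition = keysPerPartition * tpsPerKey;            // 2,500 msg/s
        double perKeyCapacity = 1000.0 / processingMs;                  // ~66.7 msg/s
        double partitionCapacity = keysPerPartition * perKeyCapacity;   // ~3,333 msg/s
        double stuckKeysPerPartition = (double) keysPerPartition / consumers; // 2.5
        double replayInjectRate = stuckKeysPerPartition * tpsPerKey;    // 125 msg/s
        int limit = effectiveLookAheadLimit(2000, consumers, 20000);    // 20,000
        double secondsToLimit = secondsToFillReplay(limit, replayInjectRate); // 160 s

        System.out.printf("total=%d msg/s, per partition=%d msg/s (capacity %.0f msg/s)%n",
                totalRate, ratePerPartition, partitionCapacity);
        System.out.printf("replay grows at %.0f msg/s and hits %d entries after %.0f s%n",
                replayInjectRate, limit, secondsToLimit);
    }
}
```

With these inputs the replay queue reaches the 20,000-entry limit after 160 s (about 2.7 minutes), matching the draft. The per-partition capacity comes out at about 3,333 msg/s rather than 3,335 because the draft rounds 66.7 before multiplying.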