lhotari commented on code in PR #25706:
URL: https://github.com/apache/pulsar/pull/25706#discussion_r3199370516
########## pip/pip-474.md: ##########
@@ -0,0 +1,349 @@
+# PIP-474: Key_Shared Hot Key Overflow Mechanism
+
+# Background Knowledge
+
+## Key_Shared Subscription
+
+Key_Shared is a subscription type provided by Pulsar. It distributes messages in a Topic to multiple Consumers based on the Key dimension: messages with the same Key are always routed to the same Consumer, ensuring per-key ordering; messages with different Keys can be distributed to different Consumers, enabling parallel consumption.
+
+Specifically, the Broker computes a Murmur3 hash over each message's sticky key, maps it to the range `[0, 65535]`, and then uses `StickyKeyConsumerSelector` to bind each hash value to a Consumer. All Consumers under a Key_Shared subscription share a single `ManagedCursor`, which maintains the global consumption progress (the `mark-delete` position).
+
+## What Problem It Solves
+
+In traditional sequential consumption scenarios, achieving per-key ordering typically requires routing messages of the same Key to the same partition, which is then processed by that partition's sole consumer in order. This means **consumption parallelism is limited by the number of partitions**—to add consumers you must add partitions, but the partition count is a fundamental Topic configuration that cannot be adjusted freely or frequently.
+
+Key_Shared subscriptions break this constraint: within the same partition, messages with different Keys can be distributed to different Consumers. Consumption parallelism depends only on the number of Keys and the number of Consumers, and is independent of the partition count.
+
+## What It Achieves
+
+1. **Multiple Key-Consumer Binding Strategies**: Provides three Selector implementations—`HashRangeAutoSplit` (evenly split hash ranges automatically), `HashRangeExclusive` (manually specify ranges), and `ConsistentHashing` (consistent hashing)—to suit different scenarios.
+
+2. **Ordering Guarantee During Consumer Scaling**: In AUTO_SPLIT mode (and when out-of-order delivery is not enabled), the Draining Hashes mechanism ([PIP-379](https://github.com/apache/pulsar/issues/21199)) ensures ordering when Consumers join or leave and hash ranges are reassigned. Affected hashes enter a "draining" state—the new Consumer must wait until the old Consumer has finished processing all pending messages for that hash before it starts receiving, ensuring no reordering during scaling.
+
+## But It Has a Significant Problem
+
+**A hot Key (or a subset of Keys stuck in processing) will block consumption of *all other Keys*.**
+
+This problem stems from the interaction of three mechanisms within the Key_Shared Dispatcher:
+
+**Replay Queue**: When a message cannot be dispatched (Consumer has no permits, hash is blocked, etc.), its position is placed into the Replay queue (`MessageRedeliveryController`). Replay reads typically take priority over Normal reads—as long as the Replay queue is non-empty and has dispatchable messages, no new messages are read from the Topic. However, there is a look-ahead exception: when all messages in a replay round are filtered out (e.g., due to consumers lacking permits), the dispatcher skips the next replay and proceeds directly to a Normal Read to fetch new messages forward (bounded by `effectiveLookAheadLimit`).
+
+**`containsStickyKeyHash` Ordering Check**: If a hash has unprocessed messages in the Replay queue, new messages for that hash read via Normal Read are also intercepted and redirected into Replay—new messages cannot be dispatched before old ones are processed. This is the core mechanism for guaranteeing per-key ordering.
+
+**Look-Ahead Limit**: When the Replay queue size reaches `effectiveLookAheadLimit` (default: `min(perConsumer × consumerCount, perSubscription)`), Normal Read is completely disabled and the Cursor stops reading new messages forward.
+
+## A Concrete Scenario
+
+Consider a typical consumption scenario:
+
+**Configuration**:
+- Partitioned Topic with 200 partitions, 100 new Keys per second, each Key sends at 50 TPS for 100 seconds then stops, with a maximum of `100 × 100 = 10,000` active Keys concurrently
+- Steady-state total throughput: `10,000 × 50 = 500,000 msg/s`
+- Keys are evenly distributed by hash across 200 partitions, ~50 active Keys per partition, `50 × 50 = 2,500 msg/s`
+- 20 Consumers, each Consumer connects to all 200 partitions (under Key_Shared subscription, the client automatically creates an internal consumer per partition)
+- Each partition's Dispatcher sees 20 Consumers, each responsible for ~2–3 Keys in that partition
+- Each message takes 15ms to process (normal business processing speed)
+- Default configuration: `receiverQueueSize = 1000`, `keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000`, `keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000`
+- Per-partition `effectiveLookAheadLimit = min(2000 × 20, 20000) = 20,000`
+
+### Normal Operation
+
+Consumption capacity covers production rate (`66.7 msg/s > 50 TPS`). Each partition has 20 Consumers processing ~50 Keys in parallel, yielding per-partition throughput of `50 × 66.7 ≈ 3,335 msg/s > 2,500 msg/s`.
+
+- Replay queues across all 200 partitions remain essentially empty; Normal Read advances normally
+- **All 10,000 active Keys are consumed in real time; the system is healthy**
+
+### Single-Key Hot Spot / Partial Key Processing Stall
+
+Assume Consumer-1 hangs due to downstream service timeout (cannot ack any messages). Consumer-1 connects to all 200 partitions, responsible for ~2–3 Keys in each partition.
+
+**Within each partition**:
+
+**T = 0s onwards**: After Consumer-1's permits are exhausted, messages for its 2–3 Keys can no longer be dispatched and start flooding the partition's Replay queue. Each partition injects messages into Replay at ~125 msg/s (2.5 keys × 50 TPS)—messages that will never be acked.
+
+**T = 0–3 minutes**: Replay queues across all 200 partitions grow monotonically, **simultaneously** and **independently**, at ~125 msg/s. The `containsStickyKeyHash` check ensures that new messages for these hashes read via Normal Read are also intercepted into Replay. As Replay grows, `getMaxEntriesReadLimit()` returns progressively smaller values, Normal Read batches are compressed, and throughput for other Keys gradually degrades.
+
+**T ≈ 2.7 minutes — the fatal tipping point**: Replay queues across all 200 partitions fill to the 20,000-entry limit almost simultaneously (`20000 / 125 ≈ 160s ≈ 2.7min`). `isNormalReadAllowed()` returns false. Normal Read is permanently disabled.
+
+**T > 3 minutes — global blackout**: Dispatchers across all 200 partitions enter a pure Replay loop—but Replay is dominated by messages destined for Consumer-1, which is already stuck. **The remaining 19 Consumers have abundant free permits but cannot receive new messages on any partition. The vast majority of the 10,000 active Keys are starved.**
+
+```
+A single Consumer's failure spreads to all partitions in under 3 minutes,
+turning "500K msg/s real-time consumption" into "global blackout."
+```
+
+This is the core problem this PIP aims to solve.
+
+# Motivation
+
+As illustrated above, the `containsStickyKeyHash` ordering check forms a **self-reinforcing positive feedback loop**:
+
+```
+hash in replay → Normal Read messages for same hash are blocked → enter replay → replay grows
+→ Normal Read batch shrinks → replay reaches limit faster → Normal Read stops → global blackout
+```
+
+The original design intent of this check is entirely correct (M11 must not be dispatched before M10 is consumed). But under traffic skew or when some Consumers stall, this ordering mechanism becomes an avalanche accelerator. **A few Keys' problem escalates into a catastrophe for all Keys.**
+
+## Why Not Simply Increase the Replay Limit
+
+Increasing `keySharedLookAheadMsgInReplayThresholdPerConsumer` and `keySharedLookAheadMsgInReplayThresholdPerSubscription` may seem like the most straightforward mitigation, but there are three structural issues:
+
+1. **Amplified I/O waste**: A larger replay queue means more stuck-Key messages are read from BK each Replay Read cycle, yet all of them are returned unchanged because the target Consumer has no permits—pure I/O waste.

Review Comment:
Thanks for pointing this out. I had planned to improve this area after PIP-379 and PIP-430, but that hasn't happened yet. Unnecessary reads from the cache / BK could be avoided by skipping replay positions that belong to consumers with no available permits, and by taking permits into account when deciding how many entries to read for each hash. This optimization should be done regardless of any changes made to optimize for hot keys.
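A minimal sketch of the read optimization suggested in this comment, under stated assumptions. `ReplayEntry`, `selectPositionsToRead`, and the `hashToConsumer` function are hypothetical stand-ins. The function plays the role of `StickyKeyConsumerSelector`, and none of these names are Pulsar APIs. The sketch shows only the selection rule: skip replay positions whose target consumer has no permits, and let each consumer's permits cap how many of its entries are read. It needs Java 16+ for the record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

class PermitAwareReplayPlanner {

    /** Hypothetical replay entry: a topic position plus the message's sticky key hash. */
    record ReplayEntry(long position, int stickyKeyHash) {}

    /**
     * Returns the replay entries worth reading, in replay order. Entries whose target
     * consumer has no permits left are skipped, so they are never fetched from cache/BK.
     */
    static List<ReplayEntry> selectPositionsToRead(List<ReplayEntry> replayQueue,
                                                   IntFunction<String> hashToConsumer,
                                                   Map<String, Integer> availablePermits,
                                                   int maxEntriesToRead) {
        // Work on a copy so the caller's permit map is not mutated.
        Map<String, Integer> remaining = new HashMap<>(availablePermits);
        List<ReplayEntry> selected = new ArrayList<>();
        for (ReplayEntry entry : replayQueue) {
            if (selected.size() >= maxEntriesToRead) {
                break;
            }
            String consumer = hashToConsumer.apply(entry.stickyKeyHash());
            int permits = remaining.getOrDefault(consumer, 0);
            if (permits <= 0) {
                continue; // reading this entry would be wasted I/O: it cannot be dispatched
            }
            remaining.put(consumer, permits - 1); // permits bound the entries read per consumer
            selected.add(entry);
        }
        return selected;
    }

    public static void main(String[] args) {
        // Hypothetical selector: hashes below 32768 map to stuck consumer "c1", the rest to "c2".
        IntFunction<String> selector = hash -> hash < 32768 ? "c1" : "c2";
        List<ReplayEntry> replay = List.of(
                new ReplayEntry(1, 100), new ReplayEntry(2, 40000),
                new ReplayEntry(3, 100), new ReplayEntry(4, 50000),
                new ReplayEntry(5, 40000));
        Map<String, Integer> permits = Map.of("c1", 0, "c2", 2);
        // Selects positions 2 and 4 only: "c1" has no permits, and "c2" runs out after two entries.
        System.out.println(selectPositionsToRead(replay, selector, permits, 10));
    }
}
```

A consumer's remaining budget only decreases. Once one of its positions is skipped, every later position for that consumer is skipped too. The selected entries for any hash therefore form an in-order prefix, and per-key ordering is not disturbed.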
########## pip/pip-474.md: ##########
@@ -0,0 +1,349 @@
+## Why Not Simply Increase the Replay Limit
+
+Increasing `keySharedLookAheadMsgInReplayThresholdPerConsumer` and `keySharedLookAheadMsgInReplayThresholdPerSubscription` may seem like the most straightforward mitigation, but there are three structural issues:
+
+1. **Amplified I/O waste**: A larger replay queue means more stuck-Key messages are read from BK each Replay Read cycle, yet all of them are returned unchanged because the target Consumer has no permits—pure I/O waste.
+2. **Normal Read batch compression**: `getMaxEntriesReadLimit()` returns `max(effectiveLimit - replaySize, 1)`—larger replay means smaller Normal Read, merely delaying the blackout rather than solving it.
+3. **Mark-delete gap inflation**: Stuck-Key messages remain unacked for extended periods, preventing mark-delete from advancing. `individualDeletedMessages` grows continuously and may exceed `managedLedgerMaxUnackedRangesToPersist`, causing confirmed ranges to be discarded and messages to be redelivered; on broker restart, the system must rescan all gaps from mark-delete.
+
+**We need a mechanism to completely remove stuck-Key messages from the main dispatch path so that neither Normal Read nor mark-delete is affected by them.**

Review Comment:
This sentence goes directly to a single solution ("remove stuck-key messages from the main dispatch path"). We should be open to multiple options and choose the one that solves the problem.
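When weighing options against simply raising the thresholds, a small model of the formulas quoted in this hunk may help. The model covers `effectiveLookAheadLimit = min(perConsumer × consumerCount, perSubscription)`, the `max(effectiveLimit - replaySize, 1)` Normal Read batch size, and the rule that Normal Read stops once the replay queue reaches the limit. The class and method names are hypothetical and only mirror the quoted `getMaxEntriesReadLimit()` and `isNormalReadAllowed()`. The real dispatcher methods may take additional inputs into account.

```java
class LookAheadLimitModel {

    /** min(perConsumer × consumerCount, perSubscription), as quoted in the PIP text. */
    static int effectiveLookAheadLimit(int perConsumerThreshold, int consumerCount,
                                       int perSubscriptionThreshold) {
        long perConsumerTotal = (long) perConsumerThreshold * consumerCount; // avoid int overflow
        return (int) Math.min(perConsumerTotal, perSubscriptionThreshold);
    }

    /** Normal Read is only allowed while the replay queue is below the limit. */
    static boolean isNormalReadAllowed(int replaySize, int effectiveLimit) {
        return replaySize < effectiveLimit;
    }

    /** Normal Read batch size: max(effectiveLimit - replaySize, 1), as quoted in the PIP text. */
    static int maxEntriesReadLimit(int replaySize, int effectiveLimit) {
        return Math.max(effectiveLimit - replaySize, 1);
    }

    public static void main(String[] args) {
        // Values from the PIP's scenario: 2000 per consumer, 20 consumers, 20000 per subscription.
        int limit = effectiveLookAheadLimit(2000, 20, 20000);
        for (int replaySize : new int[] {0, 10_000, 19_990, 20_000}) {
            System.out.printf("replay=%d normalReadAllowed=%b batchLimit=%d%n",
                    replaySize, isNormalReadAllowed(replaySize, limit),
                    maxEntriesReadLimit(replaySize, limit));
        }
    }
}
```

The printed rows show the batch limit falling from 20000 to 10 as the replay queue fills. At 20000, Normal Read is disallowed outright, so the `max(..., 1)` floor no longer matters.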
########## pip/pip-474.md: ##########
@@ -0,0 +1,349 @@
+# Motivation
+
+As illustrated above, the `containsStickyKeyHash` ordering check forms a **self-reinforcing positive feedback loop**:
+
+```
+hash in replay → Normal Read messages for same hash are blocked → enter replay → replay grows
+→ Normal Read batch shrinks → replay reaches limit faster → Normal Read stops → global blackout

Review Comment:
"global blackout" is confusing. A better wording would be "no progress for any consumer".
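The time at which the loop above reaches "no progress for any consumer" follows from the quoted scenario's numbers. Per partition, about 2.5 stuck keys × 50 msg/s are redirected into replay, and nothing is acked. This is a minimal arithmetic sketch under that assumption. `ReplayFillTimeline` is a hypothetical helper, and the 40,000 limit is an illustrative "both thresholds doubled" case rather than a value from the PIP.

```java
class ReplayFillTimeline {

    /**
     * Seconds until a replay queue growing by stuckMsgPerSecond reaches lookAheadLimit,
     * assuming (as in the quoted scenario) that none of the stuck messages is ever acked.
     * Uses ceiling division, so a partially filled final second still counts.
     */
    static long secondsUntilNormalReadStops(long lookAheadLimit, long stuckMsgPerSecond) {
        if (stuckMsgPerSecond <= 0) {
            throw new IllegalArgumentException("replay never fills without stuck traffic");
        }
        return (lookAheadLimit + stuckMsgPerSecond - 1) / stuckMsgPerSecond;
    }

    public static void main(String[] args) {
        // Per partition: ~2.5 stuck keys × 50 msg/s each = 125 msg/s redirected into replay.
        long stuckRate = Math.round(2.5 * 50);
        long defaultLimit = 20_000; // min(2000 × 20, 20000)
        long raisedLimit = 40_000;  // hypothetical: both thresholds doubled, min(4000 × 20, 40000)
        System.out.println("default limit: " + secondsUntilNormalReadStops(defaultLimit, stuckRate) + " s");
        System.out.println("raised limit:  " + secondsUntilNormalReadStops(raisedLimit, stuckRate) + " s");
    }
}
```

With the defaults this gives 160 seconds, matching the quoted `20000 / 125 ≈ 160s`. Doubling the thresholds only moves the point at which no consumer makes progress out to 320 seconds.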
**Ordering Guarantee During Consumer Scaling**: In AUTO_SPLIT mode (and when out-of-order delivery is not enabled), the Draining Hashes mechanism ([PIP-379](https://github.com/apache/pulsar/issues/21199)) ensures ordering when Consumers join or leave and hash ranges are reassigned. Affected hashes enter a "draining" state—the new Consumer must wait until the old Consumer has finished processing all pending messages for that hash before it starts receiving, ensuring no reordering during scaling. + +## But It Has a Significant Problem + +**A hot Key (or a subset of Keys stuck in processing) will block consumption of *all other Keys*.** + +This problem stems from the interaction of three mechanisms within the Key_Shared Dispatcher: + +**Replay Queue**: When a message cannot be dispatched (Consumer has no permits, hash is blocked, etc.), its position is placed into the Replay queue (`MessageRedeliveryController`). Replay reads typically take priority over Normal reads—as long as the Replay queue is non-empty and has dispatchable messages, no new messages are read from the Topic. However, there is a look-ahead exception: when all messages in a replay round are filtered out (e.g., due to consumers lacking permits), the dispatcher skips the next replay and proceeds directly to a Normal Read to fetch new messages forward (bounded by `effectiveLookAheadLimit`). + +**`containsStickyKeyHash` Ordering Check**: If a hash has unprocessed messages in the Replay queue, new messages for that hash read via Normal Read are also intercepted and redirected into Replay—new messages cannot be dispatched before old ones are processed. This is the core mechanism for guaranteeing per-key ordering. + +**Look-Ahead Limit**: When the Replay queue size reaches `effectiveLookAheadLimit` (default: `min(perConsumer × consumerCount, perSubscription)`), Normal Read is completely disabled and the Cursor stops reading new messages forward. 
+ +## A Concrete Scenario + +Consider a typical consumption scenario: + +**Configuration**: +- Partitioned Topic with 200 partitions, 100 new Keys per second, each Key sends at 50 TPS for 100 seconds then stops, with a maximum of `100 × 100 = 10,000` active Keys concurrently +- Steady-state total throughput: `10,000 × 50 = 500,000 msg/s` +- Keys are evenly distributed by hash across 200 partitions, ~50 active Keys per partition, `50 × 50 = 2,500 msg/s` +- 20 Consumers, each Consumer connects to all 200 partitions (under Key_Shared subscription, the client automatically creates an internal consumer per partition) +- Each partition's Dispatcher sees 20 Consumers, each responsible for ~2–3 Keys in that partition +- Each message takes 15ms to process (normal business processing speed) +- Default configuration: `receiverQueueSize = 1000`, `keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000`, `keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000` +- Per-partition `effectiveLookAheadLimit = min(2000 × 20, 20000) = 20,000` + +### Normal Operation + +Consumption capacity covers production rate (`66.7 msg/s > 50 TPS`). Each partition has 20 Consumers processing ~50 Keys in parallel, yielding per-partition throughput of `50 × 66.7 ≈ 3,335 msg/s > 2,500 msg/s`. + +- Replay queues across all 200 partitions remain essentially empty; Normal Read advances normally +- **All 10,000 active Keys are consumed in real time; the system is healthy** + +### Single-Key Hot Spot / Partial Key Processing Stall + +Assume Consumer-1 hangs due to downstream service timeout (cannot ack any messages). Consumer-1 connects to all 200 partitions, responsible for ~2–3 Keys in each partition. + +**Within each partition**: + +**T = 0s onwards**: After Consumer-1's permits are exhausted, messages for its 2–3 Keys can no longer be dispatched and start flooding the partition's Replay queue. 
Each partition injects messages into Replay at ~125 msg/s (2.5 keys × 50 TPS)—messages that will never be acked. + +**T = 0–3 minutes**: Replay queues across all 200 partitions grow monotonically, **simultaneously** and **independently**, at ~125 msg/s. The `containsStickyKeyHash` check ensures that new messages for these hashes read via Normal Read are also intercepted into Replay. As Replay grows, `getMaxEntriesReadLimit()` returns progressively smaller values, Normal Read batches are compressed, and throughput for other Keys gradually degrades. + +**T ≈ 2.7 minutes — the fatal tipping point**: Replay queues across all 200 partitions fill to the 20,000-entry limit almost simultaneously (`20000 / 125 ≈ 160s ≈ 2.7min`). `isNormalReadAllowed()` returns false. Normal Read is permanently disabled. + +**T > 3 minutes — global blackout**: Dispatchers across all 200 partitions enter a pure Replay loop—but Replay is dominated by messages destined for Consumer-1, which is already stuck. **The remaining 19 Consumers have abundant free permits but cannot receive new messages on any partition. The vast majority of the 10,000 active Keys are starved.** + +``` +A single Consumer's failure spreads to all partitions in under 3 minutes, +turning "500K msg/s real-time consumption" into "global blackout." +``` + +This is the core problem this PIP aims to solve. + +# Motivation + +As illustrated above, the `containsStickyKeyHash` ordering check forms a **self-reinforcing positive feedback loop**: + +``` +hash in replay → Normal Read messages for same hash are blocked → enter replay → replay grows +→ Normal Read batch shrinks → replay reaches limit faster → Normal Read stops → global blackout +``` + +The original design intent of this check is entirely correct (M11 must not be dispatched before M10 is consumed). But under traffic skew or when some Consumers stall, this ordering mechanism becomes an avalanche accelerator. 
**A few Keys' problem escalates into a catastrophe for all Keys.** Review Comment: "catastrophe" is confusing. It's better to describe the actual impact. "A single hot key exhausting a single consumer could result in blocking progress for all consumers."
########## pip/pip-474.md: ########## @@ -0,0 +1,349 @@ +## Why Not Simply Increase the Replay Limit + +Increasing `keySharedLookAheadMsgInReplayThresholdPerConsumer` and `keySharedLookAheadMsgInReplayThresholdPerSubscription` may seem like the most straightforward mitigation, but there are three structural issues: + +1. **Amplified I/O waste**: A larger replay queue means more stuck-Key messages are read from BK each Replay Read cycle, yet all of them are returned unchanged because the target Consumer has no permits—pure I/O waste. +2. **Normal Read batch compression**: `getMaxEntriesReadLimit()` returns `max(effectiveLimit - replaySize, 1)`—larger replay means smaller Normal Read, merely delaying the blackout rather than solving it. Review Comment: "blackout" is confusing here. "delaying the situation where all consumers get blocked"
########## pip/pip-474.md: ########## @@ -0,0 +1,349 @@ +3. **Mark-delete gap inflation**: Stuck-Key messages remain unacked for extended periods, preventing mark-delete from advancing. `individualDeletedMessages` grows continuously and may exceed `managedLedgerMaxUnackedRangesToPersist`, causing confirmed ranges to be discarded and messages to be redelivered; on broker restart, the system must rescan all gaps from mark-delete. + +**We need a mechanism to completely remove stuck-Key messages from the main dispatch path so that neither Normal Read nor mark-delete is affected by them.** + +# Goals + +## In Scope + +- Completely remove stuck-Key messages from the main dispatch path so that Normal Read is no longer blocked and other Consumers are unaffected Review Comment: Same comment as above
-- This is an automated message from the Apache Git Service.
