lhotari commented on PR #25706: URL: https://github.com/apache/pulsar/pull/25706#issuecomment-4396605560
Thanks for the detailed response! I really appreciate you taking the time to think through the alternatives. Let me share some thoughts in return.

I'd suggest validating Alternative 2 thoroughly before deciding to implement PIP-474. Alternative 2 is the simplest approach, and following this path avoids the new failure modes that the proposed PIP-474 implementation introduces.

I'd also like to comment on your response. Rather than pushing back on the alternatives, I think it would be more useful to put genuine effort into comparing them.

> **Alternative 1**
>
> 1. The mark-delete gap problem is transferred, not eliminated.
>
> The auxiliary cursor would need to individually ack all non-hot-key messages (keeping only hot-key messages unacked). This means `individualDeletedMessages` on the auxiliary cursor would grow rapidly — if 3 out of 50 keys are hot, then 94% of messages need to be individually acked on the auxiliary cursor. This is exactly the same problem, just occurring on a different cursor.

There seems to be a misconception here. The additional cursor would be handled on the broker side, and it would acknowledge all messages whose hashes aren't diverted. The broker would need to keep state about which hashes are diverted. This state could be stored in a similar way to how individual acks are stored in the managed ledger or the metadata store. Once the additional cursor has caught up, it could be deleted. Continuous high-volume traffic would need a separate solution, since the cursor would never catch up in that case.

> 2. Storage amplification is far greater than with the Overflow ML.
>
> The auxiliary cursor prevents ledger GC at the **entire ledger granularity** — as long as the auxiliary cursor's mark-delete hasn't advanced past a ledger, the entire ledger is retained (containing messages for all keys).
> The Overflow ML, by contrast, only stores hot-key messages. If hot keys account for 5% of traffic:
>
> * Auxiliary cursor approach: retains 100% of the data
> * Overflow ML approach: stores only ~5% additional data

Is this really a problem in practice? How much data could the slow consumers actually accumulate in your use case?

> 3. Dual-ack implementation is more invasive.
>
> Every normal message must be acked on **two cursors** (original + auxiliary), requiring changes throughout the dispatch-ack chain.

I'm not sure I see the problem here. The required changes are relatively minor.

> **Conclusion**: This approach cannot claim "no data duplication" — it trades broader ledger retention for not copying, and is more invasive to implement than Overflow. That said, the starting point (avoiding data duplication) is reasonable — it's just that the overall cost turns out to be higher.

I'd respectfully disagree with this conclusion. There's no data duplication in this case; the data duplication actually happens in the currently proposed solution. For ledgers with multiple subscriptions, the proposed solution would duplicate data for all hot/slow keys. In this alternative, the only tradeoff is retention for as long as there are slow keys, and that's most likely not a real problem in practice.

> **Alternative 2**
>
> 1. Normal Read compression still exists.
>
> `getMaxEntriesReadLimit() = max(limit - replaySize, 1)`. Even with a limit of 1M, once replay grows to 500K, Normal Read batches are compressed to 500K. Over time it will still fill up — just taking tens of minutes instead of 2.7 minutes. This treats the symptom, not the cause.

I see this differently. As long as the individual ack solution can scale to handle a peak, it's sufficient. I believe that the current limit for the compressed size of stored individual acks is about 5 MB.
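
To make the scaling argument more concrete, here is a small self-contained sketch. It reproduces the quoted read-limit formula `max(limit - replaySize, 1)`. It also compares two ways of holding individual acks: as a set of acked ranges, and as a bitmap of 64-bit words, which is roughly the idea behind persisting acks as a long array. This is an illustration under simplifying assumptions, not Pulsar's cursor or serialization code. It assumes a single ledger, with entry ids counted from the mark-delete position. The class and method names are hypothetical, and it uses only `java.util.BitSet`.

```java
import java.util.BitSet;

// Hypothetical illustration only; this is not Pulsar's ManagedCursor implementation.
public class IndividualAckMath {

    // Read limit as quoted in the discussion: max(limit - replaySize, 1).
    public static int maxEntriesReadLimit(int limit, int replaySize) {
        return Math.max(limit - replaySize, 1);
    }

    // Counts contiguous runs of acked entries, i.e. how many ranges a range set would hold.
    public static int ackedRangeCount(BitSet acked) {
        int ranges = 0;
        int start = acked.nextSetBit(0);
        while (start >= 0) {
            ranges++;
            int end = acked.nextClearBit(start); // first unacked entry after this run
            start = acked.nextSetBit(end);       // -1 when no further acked entries exist
        }
        return ranges;
    }

    // Counts the 64-bit words needed to store the same acks as a plain bitmap.
    public static int bitmapLongCount(BitSet acked) {
        return acked.toLongArray().length;
    }

    public static void main(String[] args) {
        System.out.println(maxEntriesReadLimit(1_000_000, 500_000));   // 500000
        System.out.println(maxEntriesReadLimit(1_000_000, 1_200_000)); // 1

        // Alternating acks maximize the number of ranges: every other entry acked over 1M entries.
        BitSet acked = new BitSet();
        for (int entryId = 0; entryId < 1_000_000; entryId += 2) {
            acked.set(entryId);
        }
        System.out.println(ackedRangeCount(acked)); // 500000
        System.out.println(bitmapLongCount(acked)); // 15625
    }
}
```

In this simplified model, the size of a plain bitmap is bounded by the span of entries between the mark-delete position and the last ack. The size of a range set grows with the number of ack holes.
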
A roaring bitmap (`managedLedgerPersistIndividualAckAsLongArray=true`, `managedLedgerUnackedRangesOpenCacheSetEnabled=true`) doesn't compress much further, since it's already a compressed data structure. For the roaring bitmap serialization of individual acks, the worst-case scenario is one where every single message is acked. However, this is not a likely scenario.

If this isn't sufficient, the consumer side should be improved. One possible cause is that the consumer handles messages serially, so a slow key can cause head-of-line blocking for the keys behind it. In that case, the individual consumer should be vertically scaled with more CPU and memory to handle its workload. It should also use asynchronous message processing, or a MessageListener with Java 21 virtual threads, to handle more keys in parallel for I/O-intensive workloads.

Here's an example of how to achieve high parallelism (concurrency) using Pulsar MessageListener and Java 21 virtual threads: https://github.com/lhotari/dss25-mastering-key-ordered-demo/blob/740dd8d1b350b8201266fb62d0456f44806299ba/pulsar-listener/src/main/java/com/github/lhotari/dss25/listener/PulsarListenerApp.java#L145-L188 (inspired by @pdolif's work and implementation). This is especially suitable for workloads where the consumer handles processing through external services, e.g. calling REST APIs. In that case, the processing itself isn't compute-intensive ([explained in my DSS25 presentation](https://youtu.be/AkSGvYP4r88?si=yro2ekDoA5eDKzWY&t=1370)).

There's a complete demonstration in https://github.com/lhotari/dss25-mastering-key-ordered-demo (it wasn't presented at DSS25 due to time constraints). The demonstration doesn't currently include a simulation of slow keys, but one could be added.

> 2. The hidden cost of "the simplest solution" is underestimated.
>
> Scaling `individualDeletedMessages` to the millions involves:
>
> * Serialization/deserialization overhead for the RangeSet — every cursor persistence operation must handle millions of ranges (whether written to the cursor ledger in BK or the metadata store)
> * Broker restart recovery time scanning millions of ack holes from mark-delete

It's true that this could happen. For use cases where this is a challenge, the load balancer should be configured to avoid moving the topic to another node. Broker restarts aren't common and typically happen only during cluster upgrades. Broker nodes should be sufficiently overprovisioned so that they can handle the load and catch up on backlogs in such scenarios.

> 3. Backlog management "not changing" is actually a disadvantage.
>
> The backlog would include a large volume of stuck-key messages that are known to be unconsumable, misleading operational judgment. The Overflow approach isolates them so the backlog reflects only genuinely consumable messages — which is actually more accurate semantics.
>
> **Conclusion**: This approach is useful for mild scenarios (e.g., brief consumer restarts), but is ineffective for sustained hot keys.

There are already stats for how many keys are backlogged as slow keys: currently, that's the size of the replay queue. Since pending acks are kept in memory, we could also add a separate REST API that exposes more details about the keys in the replay queue, including the message keys and their cardinality for each consumer.

> **Alternative 3**
>
> This approach has the most fundamental contradiction — a stuck consumer cannot rescue itself.
>
> Client transparency is violated: all Pulsar client libraries (Java, C++, Python, Go, Node.js…) would need to implement hot-key detection + routing logic. This is a massive cross-language engineering effort and is invasive to users.
>
> **Conclusion**: This is the weakest alternative.
> It pushes a broker-side scheduling problem to the client, yet the client is precisely the victim of the problem (stuck consumer) and cannot rescue itself.

I agree that this isn't easy to implement on the application side without support in the Pulsar client and broker. It's similar to the problem I described in my DSS25 presentation, Challenge #2: Error handling in key-ordered message processing (https://youtu.be/AkSGvYP4r88?si=E8YwCCHp0t2UQO7G&t=478). Sending failed messages to a dead letter queue results in out-of-order message processing, since messages are skipped. A similar problem already exists with negative acks and DLQ handling for Key_Shared. This isn't an easy problem to solve.

The solution for negative acks is slightly simpler. One option is to introduce client-side and broker-side changes that pause dispatching for a specific key's hash, so that using negative acks wouldn't break ordering guarantees. The pause-sending-of-hashes solution could be extended: when a single consumer is occupied processing hot keys, the hashes of all its other keys could be reassigned to a less loaded consumer. The benefit is that the other keys in the consumer's hash range wouldn't cause ack holes when the hot key eventually causes head-of-line blocking once permits are exhausted.

Thanks again for engaging deeply with this. I really do hope we can validate Alternative 2 properly before committing to PIP-474, since simpler solutions tend to have fewer surprises down the road.
