lhotari commented on PR #25706:
URL: https://github.com/apache/pulsar/pull/25706#issuecomment-4396605560

   Thanks for the detailed response! I really appreciate you taking the time to 
think through the alternatives. Let me share some thoughts in return.
   
   I'd suggest validating Alternative 2 thoroughly before deciding to implement 
PIP-474. The reason is that Alternative 2 is the simplest approach, and by 
following this path, we avoid the new failure modes that the proposed PIP-474 
implementation introduces.
   
   I'd also like to share some comments on your response. Rather than pushing 
back on the alternatives, I think it would be more useful to genuinely put 
effort into comparing them.
   
   > **Alternative 1**
   > 
   > 1. The mark-delete gap problem is transferred, not eliminated.
   > 
   > The auxiliary cursor would need to individually ack all non-hot-key 
messages (keeping only hot-key messages unacked). This means 
`individualDeletedMessages` on the auxiliary cursor would grow rapidly — if 3 
out of 50 keys are hot, then 94% of messages need to be individually acked on 
the auxiliary cursor. This is exactly the same problem, just occurring on a 
different cursor.
   
   There seems to be a misconception here. The additional cursor would be 
handled on the broker side, and it would acknowledge all messages that don't 
contain hashes that are diverted. On the broker side, there would obviously 
need to be state about which hashes are diverted. This state could be stored in 
a similar way to how individual acks are stored in the managed ledger or the 
metadata store. When the additional cursor has caught up, it could be deleted. 
There could be a solution to address high volume traffic since the cursor would 
never catch up under continuous high volume traffic.
   
   > 
   > 2. Storage amplification is far greater than with the Overflow ML.
   > 
   > The auxiliary cursor prevents ledger GC at the **entire ledger 
granularity** — as long as the auxiliary cursor's mark-delete hasn't advanced 
past a ledger, the entire ledger is retained (containing messages for all 
keys). The Overflow ML, by contrast, only stores hot-key messages. If hot keys 
account for 5% of traffic:
   > 
   > * Auxiliary cursor approach: retains 100% of the data
   > * Overflow ML approach: stores only ~5% additional data
   
   Is this really a problem in practice? How much data could the slow consumers 
actually accumulate in your use case?
   
   > 3. Dual-ack implementation is more invasive.
   > 
   > Every normal message must be acked on **two cursors** (original + 
auxiliary), requiring changes throughout the dispatch-ack chain.
   
   I'm not sure I see the problem here. The required changes are relatively 
minor.
   
   > **Conclusion**: This approach cannot claim "no data duplication" — it 
trades broader ledger retention for not copying, and is more invasive to 
implement than Overflow. That said, the starting point (avoiding data 
duplication) is reasonable — it's just that the overall cost turns out to be 
higher.
   
   I'd respectfully disagree with this conclusion. There's no data duplication 
in this case. The data duplication actually happens in the currently proposed 
solution. For ledgers with multiple subscriptions, the proposed solution would 
duplicate data for all hot/slow keys. In this alternative, the only tradeoff is 
retention for as long as there are slow keys, and that's most likely not a real 
problem in practice.
   
   > 
   > **Alternative 2**
   > 
   > 1. Normal Read compression still exists.
   > 
   > `getMaxEntriesReadLimit() = max(limit - replaySize, 1)`. Even with a limit 
of 1M, once replay grows to 500K, Normal Read batches are compressed to 500K. 
Over time it will still fill up — just taking tens of minutes instead of 2.7 
minutes. This treats the symptom, not the cause.
   
   I'd see this differently. As long as the individual ack solution can scale 
to handle a peak, it's sufficient.
   
   I believe that the current limit for the compressed size of stored 
individual acks is about 5MB. A roaring bitmap 
(`managedLedgerPersistIndividualAckAsLongArray=true`, 
`managedLedgerUnackedRangesOpenCacheSetEnabled=true`) doesn't compress much 
further since it's already a compressed data structure. For the roaring bitmap 
serialization solution for individual acks, the worst-case scenario is one 
where every single message is acked. However, this is not a likely scenario.
   
   If this isn't sufficient, the consumer side should be improved. One possible 
reason could be that the consumer is handling messages serially, and a slow key 
could result in head-of-line blocking of further keys. In that case, the 
individual consumer should be vertically scaled with more CPU and memory to 
handle its workload, alongside using asynchronous message processing or 
MessageListener with Java 21 virtual threads to handle more keys in parallel 
for I/O-intensive workloads.
   
   Here's an example of how to achieve high parallelism (concurrency) using 
Pulsar MessageListener and Java 21 virtual threads: 
https://github.com/lhotari/dss25-mastering-key-ordered-demo/blob/740dd8d1b350b8201266fb62d0456f44806299ba/pulsar-listener/src/main/java/com/github/lhotari/dss25/listener/PulsarListenerApp.java#L145-L188.
 (inspired by @pdolif's work and implementation). This is especially suitable 
for workloads where the consumer handles processing through external services, 
e.g. calling REST APIs. In that case, the processing itself isn't 
compute-intensive. ([explained in my DSS 25 
presentation)](https://youtu.be/AkSGvYP4r88?si=yro2ekDoA5eDKzWY&t=1370)
   There's a complete demonstration in 
https://github.com/lhotari/dss25-mastering-key-ordered-demo (it wasn't 
presented in DSS25 due to time constraints). The demonstration doesn't 
currently include a simulation of slow keys and that could be added. 
   
   > 
   > 2. The hidden cost of "the simplest solution" is underestimated.
   > 
   > Scaling `individualDeletedMessages` to the millions involves:
   > 
   > * Serialization/deserialization overhead for the RangeSet — every cursor 
persistence operation must handle millions of ranges (whether written to the 
cursor ledger in BK or the metadata store)
   > * Broker restart recovery time scanning millions of ack holes from 
mark-delete
   
   It's true that this could happen. For use cases where this is a challenge, 
the load balancer should be configured in ways that avoid moving the topic to 
another node. Broker restarts aren't a common scenario and typically only 
happen during cluster upgrades. Broker nodes should be sufficiently 
overprovisioned so that they can handle the load, and backlogs can be caught up 
in such scenarios.
   
   > 3. Backlog management "not changing" is actually a disadvantage.
   > 
   > The backlog would include a large volume of stuck-key messages that are 
known to be unconsumable, misleading operational judgment. The Overflow 
approach isolates them so the backlog reflects only genuinely consumable 
messages — which is actually more accurate semantics.
   > 
   > **Conclusion**: This approach is useful for mild scenarios (e.g., brief 
consumer restarts), but is ineffective for sustained hot keys.
   
   There are already stats for how many keys are backlogged as slow keys — it's 
currently the size of the replay queue. Since pending acks are kept in memory, 
we could also add a separate REST API to gain more details about the actual 
keys that are part of the replay queue, including the message keys and the 
cardinality for each consumer.
   
   > 
   > **Alternative 3**
   > 
   > This approach has the most fundamental contradiction — a stuck consumer 
cannot rescue itself.
   > 
   > Client transparency is violated: all Pulsar client libraries (Java, C++, 
Python, Go, Node.js…) would need to implement hot-key detection + routing 
logic. This is a massive cross-language engineering effort and is invasive to 
users.
   > 
   > **Conclusion**: This is the weakest alternative. It pushes a broker-side 
scheduling problem to the client, yet the client is precisely the victim of the 
problem (stuck consumer) and cannot rescue itself.
   
   I agree that this isn't easy to implement on the application side without 
support in the Pulsar client and broker. There are similarities to the problem 
I described in my DSS25 presentation, Challenge #2: Error handling in 
key-ordered message processing 
(https://youtu.be/AkSGvYP4r88?si=E8YwCCHp0t2UQO7G&t=478). Sending failed 
messages to a dead letter queue results in out-of-order message processing 
since messages are skipped. A similar problem is already present with 
negative-acks and DLQ handling for Key_Shared. This isn't an easy problem to 
solve.
   
   The solution for negative-acks is slightly simpler. One possible solution is 
to introduce client-side and broker-side changes to pause sending of hashes for 
a specific key, so that negative-ack usage wouldn't break ordering guarantees.
   
   The pause-sending-of-hashes solution could be extended to re-assign hashes 
of all other keys to another consumer with less load when a single consumer is 
occupied processing hot keys. The benefit of this would be that other keys in 
the consumer's hash range wouldn't cause ack holes when the hot key eventually 
causes head-of-line blocking once permits are exhausted.
   
   Thanks again for engaging deeply with this. I really do hope we can validate 
Alternative 2 properly before committing to PIP-474, since simpler solutions 
tend to have fewer surprises down the road..
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to