lhotari opened a new issue, #24926:
URL: https://github.com/apache/pulsar/issues/24926

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [x] I understand that [unsupported 
versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions)
 don't get bug fixes. I will attempt to reproduce the issue on a supported 
version of Pulsar client and Pulsar broker.
   
   
   ### User environment
   
   All released versions
   
   ### Issue Description
   
   The persistent dispatcher for multiple consumers in Apache Pulsar does not 
consistently honor Netty channel backpressure. When a consumer's channel 
becomes non-writable, the dispatcher sometimes continues sending one entry at a 
time and sometimes skips dispatch entirely—with no reliable wake-up mechanism 
when the channel becomes writable again. This behavior can result in unbounded 
broker-side buffer accumulation, risking OOM under sustained slow consumers, 
large batches, or network congestion. In cases where dispatching is skipped 
entirely, the consumer may become stuck indefinitely until some other event 
resumes dispatching. This can lead to unnecessary latency when backpressure 
occurs, particularly when there is no continuous flow of new messages to the 
topic.
   
   ### Error messages
   
   ```text
   
   ```
   
   ### Reproducing the issue
   
   The following are presumed ways to reproduce the issues; they have not been verified yet.
   
   It should be possible to reproduce the OOME issue on a Shared/Key_Shared subscription by having a slow client with a large receiver queue size (a client-side sketch follows below).
   - When the client stops reading messages, dispatching will continue each time the dispatcher is woken up.
   - Reproducing would require having a backlog and sending more messages to the topic to wake up dispatching.
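
   A minimal client-side sketch of what such a slow consumer could look like, assuming a broker at `pulsar://localhost:6650`; the topic and subscription names are illustrative, and actually driving the broker-side channel into a non-writable state may additionally require constrained client bandwidth (e.g. network shaping), which is not shown here:

   ```java
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.SubscriptionType;

   public class SlowConsumerRepro {
       public static void main(String[] args) throws Exception {
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650") // assumption: local broker
                   .build();

           // Large receiver queue => many flow permits, so the broker keeps dispatching.
           Consumer<byte[]> consumer = client.newConsumer()
                   .topic("persistent://public/default/backpressure-test") // illustrative name
                   .subscriptionName("slow-sub")
                   .subscriptionType(SubscriptionType.Shared) // or Key_Shared
                   .receiverQueueSize(100_000)
                   .subscribe();

           // Deliberately never call consumer.receive(): the client stops draining messages
           // while the broker keeps dispatching whenever new messages wake the dispatcher up.
           Thread.sleep(Long.MAX_VALUE);
       }
   }
   ```

   The stuck-consumer scenario below should be reproducible with the same sketch using `SubscriptionType.Exclusive` or `Failover`, provided the conditions described there hold.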
   
   The stuck-consumer issue should reproduce on an Exclusive/Failover subscription after the client's connection gets backpressured. There is no logic to restart dispatching after it stops due to the channel's writability status, so reproducing this requires that the client is unable to send the next flow command and that no new messages are added to the topic, since either of those would wake up dispatching.
   There are existing issue reports which might be related.
   
   ### Additional information
   
   [BrokkAI](https://www.brokk.ai/)-assisted analysis:
   
   ## Verified behavior in code (and assumptions where noted)
   
   - Dispatch when non-writable:
     - Observed behavior: dispatch continues with a reduced batch size of 1 
even when the channel is not writable; in other paths it skips dispatch 
entirely.
     - Risk: even “1 at a time” can still overflow buffers for large/batched 
entries or long-lived non-writable channels.
   
   - No event-driven resumption:
     - There’s no linkage from Netty `channelWritabilityChanged` to the 
dispatcher scheduling loop. When dispatch is skipped due to non-writability, 
there is no guaranteed wake-up when the channel becomes writable again.
     - Result: starvation or stalled subscriptions until an unrelated event 
triggers work.
   
   - No consistent transport-level gate before write:
     - The send path that ultimately calls Netty `writeAndFlush` doesn’t 
consistently check `Channel#isWritable()` or block/suspend dispatch work 
per-consumer.
     - Result: data can be pushed into the outbound buffer even when 
unwritable, relying on Netty’s buffering and watermarks alone.
   
   - Pause/resume is not tied to transport backpressure:
     - Dispatcher pause/resume behavior tends to track storage/cursor state 
(e.g., acks, reads) rather than transport backpressure.
     - Result: the broker keeps reading from BookKeeper and queuing for a 
consumer that can’t drain the network.
   
   - Watermarks exist but don’t solve the core issue:
     - Netty write-buffer high/low watermarks are configurable at the channel level, but without event-driven throttling in the dispatcher they mainly bound Netty’s own buffer growth. They don’t stop the dispatcher from continuing to enqueue work on behalf of an unwritable consumer. (A short sketch of the channel-level primitives involved follows this list.)
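
   For reference, a minimal sketch of the channel-level primitives referred to above (not Pulsar code; the helper and threshold values are illustrative). `Channel#isWritable()` reflects whether the outbound buffer is below the configured watermarks, which can be set per child channel via `WriteBufferWaterMark`; the combined `WRITE_BUFFER_WATER_MARK` option shown here supersedes the separate high/low options in recent Netty versions:

   ```java
   import io.netty.bootstrap.ServerBootstrap;
   import io.netty.channel.Channel;
   import io.netty.channel.ChannelOption;
   import io.netty.channel.WriteBufferWaterMark;

   public class WritabilityPrimitives {

       // Illustrative: configure the low/high thresholds that drive Channel#isWritable().
       static void configureWatermarks(ServerBootstrap bootstrap) {
           bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                   new WriteBufferWaterMark(32 * 1024, 64 * 1024)); // low, high (bytes)
       }

       // Illustrative transport-level gate: only write while the channel is writable.
       static boolean tryWrite(Channel channel, Object payload) {
           if (!channel.isWritable()) {
               // Outbound buffer is above the high watermark; the caller should mark the
               // consumer as transport-blocked instead of pushing more data.
               return false;
           }
           channel.writeAndFlush(payload);
           return true;
       }
   }
   ```

   Without the dispatcher reacting to these signals, the watermarks only bound Netty’s own buffer, as noted above.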
   
   ## Root cause
   
   - Lack of a consistent backpressure bridge from Netty to the dispatcher:
     - The dispatcher does not maintain a per-consumer “transport-blocked” 
state driven by Netty channel writability events.
     - No uniform suspension of dispatch work (cursor reads, read-ahead) when a 
consumer’s channel is non-writable.
     - Mixed fallback behavior (skip vs. send one entry) leads to unpredictable 
buffering and stalls.
   
   ---
   
   ## Impact
   
   - Broker memory pressure and potential OOM:
     - Slow consumers combined with large entries/batches can saturate outbound buffers, accumulate pending sends, and put sustained pressure on the GC.
   
   - Throughput instability:
     - For fast consumers, buffering helps; for slow ones, it backfires, making 
end-to-end latency and broker memory unpredictable.
   
   - Operational ambiguity:
     - Without metrics and clear signals, operators may struggle to detect 
which consumers are backpressuring the broker and why dispatch stalls.
   
   ---
   
   ## High-level remediation plan
   
   1. Event-driven transport backpressure
      - Register for Netty channel writability changes and propagate to the 
consumer/dispatcher layer.
      - Maintain a per-consumer “transport-blocked” flag:
        - When unwritable: stop scheduling dispatch to that consumer; prevent 
read-ahead for it (or cap it tightly).
        - When writable: schedule a targeted resume task to drain pending 
messages for that consumer.
   
   2. Gate dispatch by transport writability
      - Before enqueuing network writes for a consumer, check writability.
      - If non-writable, do not push; instead, mark the consumer blocked and 
rely on the event-driven resume above.
   
   3. Tune and expose watermarks
      - Keep Netty channel `WRITE_BUFFER_HIGH_WATER_MARK` and 
`WRITE_BUFFER_LOW_WATER_MARK` configurable per broker.
      - Defaults should be conservative enough to prevent OOM, but allow 
operators to raise them for high-throughput fast consumers.
      - Consider guidance or auto-scaling based on heap size or percent-of-heap 
(educated suggestion; confirm feasibility).
   
   4. Hybrid wake-up (fallback polling)
      - If an event is missed (rare but possible), periodic polling can recheck writability for consumers marked blocked and re-arm their resume (a rough sketch covering points 1, 2, and 4 follows after this list).
   
   5. Backward compatibility and rollout knobs
      - Feature flag for event-driven backpressure (default on or off depending 
on risk appetite).
      - Per-namespace or per-subscription toggles could be considered, but start with per-broker configuration.
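
   As a rough illustration of how points 1, 2, and 4 could fit together, the sketch below bridges Netty writability events to a per-consumer blocked flag with a targeted resume and a fallback poller. All class and method names (`ConsumerTransportState`, `WritabilityBridgeHandler`, `BlockedConsumerPoller`, `resumeDispatch`) are hypothetical, not existing Pulsar APIs:

   ```java
   import io.netty.channel.Channel;
   import io.netty.channel.ChannelHandlerContext;
   import io.netty.channel.ChannelInboundHandlerAdapter;

   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;

   /** Hypothetical per-consumer state bridging transport writability to the dispatcher. */
   class ConsumerTransportState {
       private final AtomicBoolean transportBlocked = new AtomicBoolean(false);

       /** Point 2: the dispatcher checks this and skips the consumer before enqueuing writes. */
       boolean isTransportBlocked() {
           return transportBlocked.get();
       }

       /** Called when the channel becomes unwritable. */
       void markBlocked() {
           transportBlocked.set(true);
       }

       /** Point 1: on the writable transition, trigger a targeted resume exactly once. */
       void markWritableAndResume(Runnable resumeDispatch) {
           if (transportBlocked.compareAndSet(true, false)) {
               resumeDispatch.run(); // e.g. schedule readMoreEntries for this consumer only
           }
       }
   }

   /** Hypothetical Netty handler propagating channelWritabilityChanged to the state above. */
   class WritabilityBridgeHandler extends ChannelInboundHandlerAdapter {
       private final ConsumerTransportState state;
       private final Runnable resumeDispatch;

       WritabilityBridgeHandler(ConsumerTransportState state, Runnable resumeDispatch) {
           this.state = state;
           this.resumeDispatch = resumeDispatch;
       }

       @Override
       public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
           if (ctx.channel().isWritable()) {
               state.markWritableAndResume(resumeDispatch);
           } else {
               state.markBlocked();
           }
           super.channelWritabilityChanged(ctx);
       }
   }

   /** Hypothetical fallback poller (point 4) in case a writability event is missed. */
   class BlockedConsumerPoller {
       static void schedule(ScheduledExecutorService executor, Channel channel,
                            ConsumerTransportState state, Runnable resumeDispatch) {
           executor.scheduleWithFixedDelay(() -> {
               if (state.isTransportBlocked() && channel.isWritable()) {
                   state.markWritableAndResume(resumeDispatch);
               }
           }, 1, 1, TimeUnit.SECONDS);
       }
   }
   ```

   The gate from point 2 would then be a check of `isTransportBlocked()` (and `Channel#isWritable()`) in the dispatch path before enqueuing network writes, instead of falling back to a batch size of 1.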
   
   ---
   
   ## Where to change in code (high level; no implementation here)
   
   - Netty integration layer:
     - Subscribe to `channelWritabilityChanged` and surface it to the consumer 
object associated with that channel.
   
   - Consumer/dispatcher boundary:
     - Add/track a per-consumer transport-blocked state.
     - Ensure dispatcher `readMoreEntries`/`dispatch` paths early-exit for 
blocked consumers.
   
   - Read-ahead and buffering:
     - Bound or pause per-consumer read-ahead while blocked; avoid accumulating 
entry lists destined for unwritable consumers.
   
   - Resume logic:
     - On writable transition, enqueue a resume task that:
       - Drains pending entries with backoff if writability flips again.
       - Avoids unbounded loops by relying on scheduling yields and re-checks (a rough sketch of such a task follows this list).
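
   For illustration only, a rough shape of such a resume task, assuming it runs on a single-threaded per-consumer or per-topic scheduler; all names are hypothetical:

   ```java
   import io.netty.channel.Channel;

   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.function.BooleanSupplier;

   /**
    * Hypothetical resume task: drains pending entries for a consumer whose channel
    * became writable again, backing off instead of busy-looping if writability flips.
    */
   class ResumeDispatchTask implements Runnable {
       private final Channel channel;
       private final BooleanSupplier hasPendingEntries; // hypothetical: more entries queued?
       private final Runnable dispatchOneBatch;         // hypothetical: one bounded dispatch step
       private final ScheduledExecutorService executor;
       private long backoffMillis = 1;

       ResumeDispatchTask(Channel channel, BooleanSupplier hasPendingEntries,
                          Runnable dispatchOneBatch, ScheduledExecutorService executor) {
           this.channel = channel;
           this.hasPendingEntries = hasPendingEntries;
           this.dispatchOneBatch = dispatchOneBatch;
           this.executor = executor;
       }

       @Override
       public void run() {
           if (!hasPendingEntries.getAsBoolean()) {
               return; // nothing left to drain
           }
           if (!channel.isWritable()) {
               // Writability flipped again: back off and retry instead of spinning.
               backoffMillis = Math.min(backoffMillis * 2, 1_000);
               executor.schedule(this, backoffMillis, TimeUnit.MILLISECONDS);
               return;
           }
           backoffMillis = 1;
           dispatchOneBatch.run();
           // Yield between batches so a single consumer cannot monopolize the thread.
           executor.execute(this);
       }
   }
   ```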
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

