PavelZeger opened a new issue, #1504:
URL: https://github.com/apache/pulsar-client-go/issues/1504

   ## Gap
   
   Java exposes:
   ```java
   void pause();
   void resume();
   ```
   on every `Consumer`. While paused, the consumer stops issuing flow permits 
to the broker, so no new messages are delivered. Already-delivered (in-flight) 
messages are unaffected.
   The Go client has no equivalent. The closest workaround is closing and 
re-subscribing, which is heavyweight (re-resolves the topic, redelivers any 
unacked, may rebalance other consumers in `KeyShared`/`Failover`).
   
   ## Why it's important
   
   Pause/resume is the standard pattern for backpressure when downstream is 
temporarily unavailable: a database is failing over, a feature flag is in the 
"halt ingestion" state, etc. It is also useful for orderly shutdown - "stop 
accepting new work, finish what's in flight, then close."
   
   ## Proposed Go API
   
   ```go
   type Consumer interface {
     // ... existing methods ...
   
     // Pause stops the consumer from requesting more messages from the broker.
     // In-flight messages already delivered to this consumer remain available
     // via Receive() / Chan(); ack/nack still work. Idempotent.
     Pause()
   
     // Resume reverses Pause. Idempotent.
     Resume()
   }
   ```
   
   A small accessor is also useful for tests and for users who want to expose 
the state via metrics:
   
   ```go
   // IsPaused reports whether Pause() has been called more recently than 
Resume().
   IsPaused() bool
   ```
   
   (Java doesn't expose an `isPaused` getter, but it's cheap and harmless and 
helps reduce flaky tests.)
   
   ## Implementation
   
   The Go consumer maintains an `availablePermits` counter on each 
`partitionConsumer` that gets refilled when the application drains the queue. 
Pausing means: stop sending `CommandFlow` until resumed.
   
   ```go
   // partitionConsumer fields:
   paused atomic.Bool
   
   // In the place that decides whether to send a flow update:
   if pc.paused.Load() {
     return
   }
   
   // Public API (consumer_impl.go):
   func (c *consumer) Pause() {
     for _, pc := range c.consumers {
         pc.paused.Store(true)
     }
   }
   
   func (c *consumer) Resume() {
     for _, pc := range c.consumers {
         if pc.paused.CompareAndSwap(true, false) {
             pc.requestMorePermits() // a small helper that re-checks thresholds
         }
     }
   }
   ```
   
   Pause must be applied per partition consumer. The multi-topic and regex 
consumers must propagate to all underlying partition consumers.
   
   ## Edge cases
   
   - Pausing right before a reconnect: when the partition consumer reconnects, 
it must re-check `paused` and not blindly send a fresh flow command.
   - `KeyShared` semantics: pausing a single consumer in a key-shared 
subscription will cause its keys to be redistributed to other consumers 
**only** if it disconnects. Pause does not change membership, so it just builds 
a backlog on that key range. This should be documented as well.
   - Interaction with `BatchReceive` (suggestion 01): a paused consumer should 
return an empty batch on timeout rather than blocking forever.
   
   ## References
   
   - Java: `Consumer.pause()` / `Consumer.resume()` in 
`pulsar-client-api/.../api/Consumer.java`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to