PavelZeger opened a new issue, #1504:
URL: https://github.com/apache/pulsar-client-go/issues/1504
## Gap
Java exposes:
```java
void pause();
void resume();
```
on every `Consumer`. While paused, the consumer stops issuing flow permits
to the broker, so no new messages are delivered. Already-delivered (in-flight)
messages are unaffected.
The Go client has no equivalent. The closest workaround is closing and
re-subscribing, which is heavyweight (re-resolves the topic, redelivers any
unacked, may rebalance other consumers in `KeyShared`/`Failover`).
## Why it's important
Pause/resume is the standard pattern for backpressure when downstream is
temporarily unavailable: a database is failing over, a feature flag is in the
"halt ingestion" state, etc. It is also useful for orderly shutdown - "stop
accepting new work, finish what's in flight, then close."
## Proposed Go API
```go
type Consumer interface {
// ... existing methods ...
// Pause stops the consumer from requesting more messages from the broker.
// In-flight messages already delivered to this consumer remain available
// via Receive() / Chan(); ack/nack still work. Idempotent.
Pause()
// Resume reverses Pause. Idempotent.
Resume()
}
```
A small accessor is also useful for tests and for users who want to expose
the state via metrics:
```go
// IsPaused reports whether Pause() has been called more recently than
Resume().
IsPaused() bool
```
(Java doesn't expose an `isPaused` getter, but it's cheap and harmless and
helps reduce flaky tests.)
## Implementation
The Go consumer maintains an `availablePermits` counter on each
`partitionConsumer` that gets refilled when the application drains the queue.
Pausing means: stop sending `CommandFlow` until resumed.
```go
// partitionConsumer fields:
paused atomic.Bool
// In the place that decides whether to send a flow update:
if pc.paused.Load() {
return
}
// Public API (consumer_impl.go):
func (c *consumer) Pause() {
for _, pc := range c.consumers {
pc.paused.Store(true)
}
}
func (c *consumer) Resume() {
for _, pc := range c.consumers {
if pc.paused.CompareAndSwap(true, false) {
pc.requestMorePermits() // a small helper that re-checks thresholds
}
}
}
```
Pause must be applied per partition consumer. The multi-topic and regex
consumers must propagate to all underlying partition consumers.
## Edge cases
- Pausing right before a reconnect: when the partition consumer reconnects,
it must re-check `paused` and not blindly send a fresh flow command.
- `KeyShared` semantics: pausing a single consumer in a key-shared
subscription will cause its keys to be redistributed to other consumers
**only** if it disconnects. Pause does not change membership, so it just builds
a backlog on that key range. This should be documented as well.
- Interaction with `BatchReceive` (suggestion 01): a paused consumer should
return an empty batch on timeout rather than blocking forever.
## References
- Java: `Consumer.pause()` / `Consumer.resume()` in
`pulsar-client-api/.../api/Consumer.java`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]