PavelZeger opened a new pull request, #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507
Fixes #1504
### Motivation
The Java client exposes `pause()` / `resume()` on every `Consumer`; the Go
client has no equivalent. While paused, a consumer should stop issuing flow
permits to the broker so that no new messages are delivered. Messages that are
already buffered, in flight, or covered by previously granted permits remain
available via `Receive()` / `Chan()`, and ack/nack keep working.
Pause/resume is the standard backpressure pattern for when a downstream is
temporarily unavailable (database failover, a "halt ingestion" feature flag,
orderly shutdown that finishes in-flight work before closing). The only
workaround today is closing and re-subscribing, which is heavyweight
(re-resolves the topic, redelivers unacked messages, and can rebalance other
consumers in `KeyShared`/`Failover`).
The semantics were reviewed and agreed in #1504.
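
As a rough illustration of the backpressure pattern described above, the sketch below pauses a consumer while a downstream is unhealthy and resumes it afterwards. The `pausable` interface, `fakeConsumer`, and `applyBackpressure` are hypothetical names used only so the example runs without a broker, and the method signatures (no arguments, `Paused()` returning `bool`) are assumed rather than taken from the PR diff.

```go
package main

import "fmt"

// pausable is a hypothetical stand-in for the three methods this PR adds to
// the Consumer interface. The signatures are assumed for illustration only.
type pausable interface {
	Pause()
	Resume()
	Paused() bool
}

// fakeConsumer only records the paused flag so the sketch runs without a broker.
type fakeConsumer struct{ paused bool }

func (c *fakeConsumer) Pause()       { c.paused = true }
func (c *fakeConsumer) Resume()      { c.paused = false }
func (c *fakeConsumer) Paused() bool { return c.paused }

// applyBackpressure pauses the consumer while the downstream is unhealthy and
// resumes it once the downstream recovers. Because Pause/Resume are described
// as idempotent, it can be called on every health check without tracking state.
func applyBackpressure(c pausable, downstreamHealthy bool) {
	if downstreamHealthy {
		c.Resume()
		return
	}
	c.Pause()
}

func main() {
	c := &fakeConsumer{}
	for _, healthy := range []bool{true, false, false, true} {
		applyBackpressure(c, healthy)
		fmt.Printf("downstream healthy=%v -> paused=%v\n", healthy, c.Paused())
	}
}
```

With a real consumer, the same helper would accept the `pulsar.Consumer` value directly, provided the final method set matches the assumed one.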
### Modifications
- Added `Pause()`, `Resume()`, and `Paused()` to the `Consumer` interface
(`pulsar/consumer.go`), with GoDoc matching the Java semantics.
- `partitionConsumer` (`pulsar/consumer_partition.go`):
  - Added a `paused` flag and gated permit flow in `flowIfNeed`, the single
    place that returns permits to the broker.
  - When paused, the dispatcher connect/reconnect branch **stashes** the
    initial grant into `availablePermits` instead of sending it, so reconnecting
    while paused does not resume delivery.
  - `resume()` flushes only the permits actually owed (via `flowIfNeed`),
    mirroring Java's `increaseAvailablePermits(cnx, 0)`, rather than granting a
    fresh full batch (which would over-grant after a mid-stream pause).
- Wired the API through all four `Consumer` implementations: `consumer`,
`zeroQueueConsumer`, `multiTopicConsumer`, and `regexConsumer`. `Paused()` is
reported at the top level ("`Pause()` called more recently than `Resume()`").
- Propagated the paused state to dynamically created children: new
partitions (`internalTopicSubscribeToPartitions`) and newly discovered regex
topics (`regexConsumer.subscribe`) inherit the parent's paused state at
creation.
- For the zero-queue consumer, `Receive()` itself is unchanged: the permit it
already requests is withheld while paused, so `Receive(ctx)` blocks until
`Resume()` is called (or the context is cancelled).
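
The permit bookkeeping in the `partitionConsumer` bullets can be sketched with a simplified, single-goroutine model. This is not the client's code: `toyPartitionConsumer`, `messageProcessed`, `connect`, and the `sent` slice are hypothetical, and the flush threshold (half the receiver queue) and the choice to set rather than add the stashed grant are assumptions made for illustration.

```go
package main

import "fmt"

// toyPartitionConsumer is a toy model of the permit accounting described in
// the PR. Only paused, availablePermits, and flowIfNeed are names from the
// PR text; everything else is assumed.
type toyPartitionConsumer struct {
	queueSize        int
	availablePermits int
	paused           bool
	sent             []int // permits carried by each simulated flow command
}

// flowIfNeed sends the accumulated permits once they reach the threshold.
// While paused it returns early, so permits keep accumulating locally.
func (pc *toyPartitionConsumer) flowIfNeed() {
	if pc.paused {
		return
	}
	threshold := pc.queueSize / 2 // assumed threshold rule
	if threshold < 1 {
		threshold = 1
	}
	if pc.availablePermits >= threshold {
		pc.sent = append(pc.sent, pc.availablePermits)
		pc.availablePermits = 0
	}
}

// messageProcessed models the application taking one message off the queue.
func (pc *toyPartitionConsumer) messageProcessed() {
	pc.availablePermits++
	pc.flowIfNeed()
}

// connect models the connect/reconnect branch: while paused, the initial
// grant is stashed instead of being sent (assumed to replace the counter).
func (pc *toyPartitionConsumer) connect() {
	if pc.paused {
		pc.availablePermits = pc.queueSize
		return
	}
	pc.sent = append(pc.sent, pc.queueSize)
}

func (pc *toyPartitionConsumer) pause() { pc.paused = true }

// resume clears the flag and flushes only what is owed, via flowIfNeed.
func (pc *toyPartitionConsumer) resume() {
	pc.paused = false
	pc.flowIfNeed()
}

func main() {
	pc := &toyPartitionConsumer{queueSize: 10}
	pc.connect() // initial grant of 10
	for i := 0; i < 3; i++ {
		pc.messageProcessed() // 3 permits owed, below threshold
	}
	pc.pause()
	for i := 0; i < 4; i++ {
		pc.messageProcessed() // 7 owed, but withheld while paused
	}
	pc.resume()          // flushes the 7 owed permits, not a fresh 10
	fmt.Println(pc.sent) // prints [10 7]
}
```

In this model, resuming after a mid-stream pause grants 7 permits rather than a full batch of 10, which is the over-grant the PR says it avoids.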
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Added `TestConsumerPauseResume`, `TestPartitionedConsumerPauseResume`,
`TestZeroQueueConsumerPauseResume`, and
`TestRegexConsumerPauseResumeInheritsNewTopics` in `pulsar/consumer_test.go`.
They verify that a paused consumer stops delivery before draining the full
backlog, that `Resume()` delivers the remainder with no message loss, that
`Pause()`/`Resume()` are idempotent, that `Paused()` reflects state, that a
newly discovered regex topic inherits the paused state, and that a zero-queue
`Receive()` blocks while paused and returns after resume.
- Verified locally against Apache Pulsar 4.0.3 standalone (all four tests
pass, repeated runs stable).
### Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API: yes — adds `Pause()`, `Resume()`, and `Paused()` to the
`Consumer` interface
- The schema: no
- The default values of configurations: no
- The wire protocol: no (uses the existing `CommandFlow`; pausing simply
withholds it)
### Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? GoDocs (doc comments on the new
`Consumer` interface methods)