PavelZeger opened a new pull request, #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507

   Fixes #1504
   
   ### Motivation
   
   The Java client exposes `pause()` / `resume()` on every `Consumer`; the Go 
client has no equivalent. While paused, a consumer should stop issuing flow 
permits to the broker so no new
   messages are delivered, while already-buffered, in-flight, and 
previously-granted messages remain available via `Receive()` / `Chan()` and 
ack/nack keep working.
   
   Pause/resume is the standard backpressure pattern for when a downstream is 
temporarily unavailable (database failover, a "halt ingestion" feature flag, 
orderly shutdown that finishes in-flight work before closing). The only 
workaround today is closing and re-subscribing, which is heavyweight 
(re-resolves the topic, redelivers unacked messages, and can rebalance other 
consumers in `KeyShared`/`Failover`).
   
   The semantics were reviewed and agreed in #1504.
   
   ### Modifications
   
   - Added `Pause()`, `Resume()`, and `Paused()` to the `Consumer` interface 
`pulsar/consumer.go`), with GoDoc matching the Java semantics.
   - `partitionConsumer` (`pulsar/consumer_partition.go`):
   - Added a `paused` flag and gated permit flow in `flowIfNeed` — the single 
place that returns permits to the broker.
   - When paused, the dispatcher connect/reconnect branch **stashes** the 
initial grant into `availablePermits` instead of sending it, so reconnecting 
while paused does not resume delivery.
   - `resume()` flushes only the permits actually owed (via `flowIfNeed`), 
mirroring Java's `increaseAvailablePermits(cnx, 0)`, rather than granting a 
fresh full batch (which would over-grant after a mid-stream pause).
   - Wired the API through all four `Consumer` implementations: `consumer`, 
`zeroQueueConsumer`, `multiTopicConsumer`, and `regexConsumer`. `Paused()` is 
reported at the top level ("`Pause()` called more recently than `Resume()`").
   - Propagated the paused state to dynamically created children: new 
partitions (`internalTopicSubscribeToPartitions`) and newly discovered regex 
topics (`regexConsumer.subscribe`) inherit the parent's paused state at 
creation.
   - For the zero-queue consumer, `Receive()` is unchanged: the existing permit 
is gated while paused, so `Receive(ctx)` blocks until `Resume()` (or the 
context is cancelled).
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
   
   - Added `TestConsumerPauseResume`, `TestPartitionedConsumerPauseResume`, 
`TestZeroQueueConsumerPauseResume`, and 
`TestRegexConsumerPauseResumeInheritsNewTopics` in `pulsar/consumer_test.go`. 
They verify that a paused consumer stops delivery before draining the full 
backlog, that `Resume()` delivers the remainder with no message loss, that 
`Pause()`/`Resume()` are idempotent, that `Paused()` reflects state, that a 
newly discovered regex topic inherits the paused state, and that a zero-queue 
`Receive()` blocks while paused and returns after resume.
   - Verified locally against Apache Pulsar 4.0.3 standalone (all four tests 
pass, repeated runs stable);
   
   ### Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API: yes — adds `Pause()`, `Resume()`, and `Paused()` to the 
`Consumer` interface
     - The schema: no
     - The default values of configurations: no
     - The wire protocol: no (uses the existing `CommandFlow`; pausing simply 
withholds it)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? GoDocs (doc comments on the new 
`Consumer` interface methods)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to