PavelZeger commented on issue #1504:
URL: https://github.com/apache/pulsar-client-go/issues/1504#issuecomment-4524921989

Thanks @nodece and @crossoverJie. Let me go through the points and pin down the semantics before I open the PR.

**1. `Pause()` semantics**

Agreed. I'll reword the API doc to match Java: pausing only stops sending CommandFlow. Locally buffered messages, in-flight messages, and previously granted permits are all unaffected and still come out of `Receive()` / `Chan()`.

**2. Reconnect must respect paused**

Right - the dispatcher sends initial permits on every new connection (`https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L1872-L1885`). I'll put the paused check in `flowIfNeed` and at the reconnect branch so every flow path - initial connect, reconnect, dispatcher refill, ack refill - goes through the same gate:

```go
// partitionConsumer
paused atomic.Bool

// flowIfNeed (consumer_partition.go:308)
func (p *availablePermits) flowIfNeed() {
	// While paused, never request more messages from the broker.
	if p.pc.paused.Load() {
		return
	}
	// ... existing threshold logic ...
}

// dispatcher reconnect branch (consumer_partition.go:1882)
// Skip the initial permits on a new connection while paused.
if !pc.paused.Load() {
	if err := pc.internalFlow(initialPermits); err != nil && !pc.options.enableZeroQueueConsumer {
		pc.log.WithError(err).Error("unable to send initial permits to broker")
	}
}
```

A `Pause()` before a reconnect means no flow command goes out on the new connection. `Resume()` then sends a fresh batch sized to `currentQueueSize`.

**3. Zero-queue consumer**

Good point - `Receive()` itself triggers the one permit (`https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_zero_queue.go#L132`). When paused, `Receive(ctx)` will block until `Resume()` is called or `ctx` is cancelled. I'll use a `resumedCh` that `Pause()` creates and `Resume()` closes:

```go
// zeroQueueConsumer
func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
	// ... existing state/lock checks ...

	// While paused, wait for Resume(), cancellation, or close
	// before granting the single permit.
	if z.pc.paused.Load() {
		select {
		case <-z.pc.resumedCh: // closed by Resume()
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-z.closeCh:
			return nil, newError(ConsumerClosed, "consumer closed")
		}
	}

	z.pc.availablePermits.inc()
	// ... existing receive loop ...
}
```

The partition consumer's `paused` field stays the single source of truth.

**4. Regex / multi-topic - newly discovered topics inherit paused**

Agreed. Both `regexConsumer` (`https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_regex.go`) and `multiTopicConsumer` (`https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_multitopic.go`) discover topics over time. I'll track `paused` on the parent and apply it to each new child under the same `consumersLock` that already guards the map:

```go
// regexConsumer.subscribe (consumer_regex.go:412)
func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
	consumers := make(map[string]Consumer, len(topics))
	for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
		if ce.err != nil {
			c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
			continue
		}
		consumers[ce.topic] = ce.consumer
	}

	c.consumersLock.Lock()
	defer c.consumersLock.Unlock()
	for t, child := range consumers {
		// New children inherit the parent's paused state before they are published in the map.
		if c.paused.Load() {
			child.Pause()
		}
		c.consumers[t] = child
	}
}
```

Same shape for `multiTopicConsumer`. Dynamic partitions on a single consumer go through the `consumers` `atomic.Value` in `consumer_impl.go` and pick up `paused` the same way.

**5. IsPaused()**

Top-level only, defined as "Pause() called more recently than Resume()". Internally it just returns `c.paused.Load()` on the parent. No attempt to derive it from child consumers - that would be racy with discovery.

**6. Subscription-type behavior**

I'll add this to the "Edge cases" section:

- `Shared`: dispatch moves off the paused consumer once its granted permits run out. No reassignment - it stays connected.
- `KeyShared`: pause does not reassign keys. The paused consumer's keys build up in the broker backlog until `Resume()`.
- `Failover`: pause does not trigger failover. The consumer stays active; backlog grows until resumed.
- `Exclusive`: backlog grows until resumed.

---

PIP updates I can make based on this exchange:

- Reword the `Pause()` docstring to include "previously granted permits"
- Add a zero-queue subsection under "Edge cases"
- Make the `regex/multi-topic` rule explicit for dynamic discovery, not just existing children
- Move `paused` to the top-level consumer struct (the current sketch only puts it on `partitionConsumer`)
- Expand the subscription-type notes beyond `KeyShared` as above

One more thing not raised in the review but worth flagging for the PR: the `BatchReceive` interaction. A paused consumer should return an empty batch on the configured timeout rather than blocking forever. I'll cover this in the implementation, but it doesn't need to change the API.

---

I'll open the PR once you confirm the changes proposed and discussed here.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
