PavelZeger commented on issue #1504:
URL: https://github.com/apache/pulsar-client-go/issues/1504#issuecomment-4524921989

   Thanks @nodece and @crossoverJie. Let me go through your points and pin down the semantics before I open the PR.
   
   **1. `Pause()` semantics**
   
   Agreed. I'll reword the API doc to match Java: pausing only stops sending 
CommandFlow. Locally buffered messages, in-flight messages, and previously 
granted permits are all unaffected and still come out of `Receive()` / `Chan()`.
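To make that boundary concrete, here is a toy model. It needs Go 1.19+ for `atomic.Bool` and `atomic.Int32`. `toyConsumer` and `requestPermits` are hypothetical names, not pulsar-client-go types. The only point is that `Pause()` gates the permit request and leaves the receive side alone.

```go
package main

import "sync/atomic"

// toyConsumer is a hypothetical stand-in, NOT the pulsar-client-go type.
// It only models which path Pause() gates.
type toyConsumer struct {
	paused atomic.Bool
	queue  chan string  // messages the broker already delivered, buffered locally
	flows  atomic.Int32 // FLOW commands that would have been sent to the broker
}

func (c *toyConsumer) Pause()  { c.paused.Store(true) }
func (c *toyConsumer) Resume() { c.paused.Store(false) }

// requestPermits models a permit refill. It is the only path Pause() suppresses.
func (c *toyConsumer) requestPermits() bool {
	if c.paused.Load() {
		return false
	}
	c.flows.Add(1)
	return true
}

// Receive drains the local buffer whether or not the consumer is paused.
func (c *toyConsumer) Receive() (string, bool) {
	select {
	case m := <-c.queue:
		return m, true
	default:
		return "", false
	}
}
```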
   
   **2. Reconnect must respect paused**
   
   Right, the dispatcher sends initial permits on every new connection (`https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_partition.go#L1872-L1885`). I'll put the paused check in `flowIfNeed` and in the reconnect branch so that every flow path (initial connect, reconnect, dispatcher refill, ack refill) goes through the same gate:
   
   ```go
   // partitionConsumer
   paused atomic.Bool
   
   // flowIfNeed (consumer_partition.go:308)
   func (p *availablePermits) flowIfNeed() {
       if p.pc.paused.Load() {
           return
       }
       // ... existing threshold logic ...
   }
   
   // dispatcher reconnect branch (consumer_partition.go:1882)
   if !pc.paused.Load() {
       if err := pc.internalFlow(initialPermits); err != nil && !pc.options.enableZeroQueueConsumer {
           pc.log.WithError(err).Error("unable to send initial permits to broker")
       }
   }
   ```
   A `Pause()` before a reconnect means no flow command goes out on the new 
connection. `Resume()` then sends a fresh batch sized to `currentQueueSize`.
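Here is a minimal sketch of that single gate. The names `permitGate`, `OnConnect` and `OnConsumed` are hypothetical, and the half-queue refill threshold is assumed, so the real `availablePermits` logic may differ. FLOW commands are recorded in a slice instead of being written to a connection. One assumption goes beyond the text above. `Resume()` sends a full batch only if a reconnect happened while paused. Otherwise it sends just the permits accumulated during the pause, so a pause/resume without a reconnect does not over-grant.

```go
package main

import "sync"

// permitGate is a hypothetical model of the one gate every FLOW path uses.
type permitGate struct {
	mu        sync.Mutex
	paused    bool
	queueSize int32   // stands in for currentQueueSize
	permits   int32   // permits earned locally but not yet sent to the broker
	owesBatch bool    // a (re)connect happened while paused
	flows     []int32 // FLOW commands "sent"; recorded instead of written to a connection
}

func (g *permitGate) sendLocked(n int32) { g.flows = append(g.flows, n) }

// OnConnect models the dispatcher's (re)connect branch: a new connection
// starts with zero broker-side permits, so a full batch is due.
func (g *permitGate) OnConnect() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.permits = 0
	if g.paused {
		g.owesBatch = true
		return
	}
	g.sendLocked(g.queueSize)
}

// OnConsumed models inc() + flowIfNeed() with an assumed half-queue threshold.
func (g *permitGate) OnConsumed() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.permits++
	if g.paused {
		return // paused: keep counting, send nothing
	}
	if g.permits >= g.queueSize/2 {
		g.sendLocked(g.permits)
		g.permits = 0
	}
}

func (g *permitGate) Pause() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.paused = true
}

// Resume sends a full batch if a reconnect happened while paused,
// otherwise only the permits accumulated during the pause.
func (g *permitGate) Resume() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.paused = false
	switch {
	case g.owesBatch:
		g.sendLocked(g.queueSize)
		g.owesBatch = false
	case g.permits > 0:
		g.sendLocked(g.permits)
	}
	g.permits = 0
}
```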
   
   **3. Zero-queue consumer**
   
   Good point: `Receive()` itself sends the single permit (`https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_zero_queue.go#L132`). When paused, `Receive(ctx)` will block until `Resume()` is called, `ctx` is cancelled, or the consumer is closed. I'll use a `resumedCh` that `Pause()` creates and `Resume()` closes:
   
   ```go
   // zeroQueueConsumer
   func (z *zeroQueueConsumer) Receive(ctx context.Context) (Message, error) {
       // ... existing state/lock checks ...
       if z.pc.paused.Load() {
           select {
           case <-z.pc.resumedCh: // closed by Resume()
           case <-ctx.Done():
               return nil, ctx.Err()
           case <-z.closeCh:
               return nil, newError(ConsumerClosed, "consumer closed")
           }
       }
       z.pc.availablePermits.inc()
       // ... existing receive loop ...
   }
   ```
   The partition consumer's paused field stays the single source of truth.
   
   **4. Regex / multi-topic: newly discovered topics inherit the paused state**
   
   Agreed. Both `regexConsumer` (`https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_regex.go`) and `multiTopicConsumer` (`https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_multitopic.go`) discover topics over time. I'll track `paused` on the parent and apply it to each new child under the same `consumersLock` that already guards the map:
   
   ```go
   // regexConsumer.subscribe (consumer_regex.go:412)
   func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
       consumers := make(map[string]Consumer, len(topics))
       for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
           if ce.err != nil {
               c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
               continue
           }
           consumers[ce.topic] = ce.consumer
       }
   
       c.consumersLock.Lock()
       defer c.consumersLock.Unlock()
       for t, child := range consumers {
           if c.paused.Load() {
               child.Pause()
           }
           c.consumers[t] = child
       }
   }
   ```
   Same shape for `multiTopicConsumer`. Dynamic partitions on a single consumer go through the `consumers` atomic.Value in `consumer_impl.go` and pick up `paused` the same way.
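Here is a stripped-down model of the parent/child rule. The `parentConsumer` and `childConsumer` types are hypothetical stand-ins for `regexConsumer`/`multiTopicConsumer` and their children. The key assumption is that the parent's `Pause()`/`Resume()` take the same lock as `subscribe`. A child is then either already in the map when the flag flips, or it sees the new value when it is added. `IsPaused()` reads only the parent's flag, as point 5 describes.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// childConsumer stands in for a per-topic child consumer (hypothetical).
type childConsumer struct{ paused atomic.Bool }

func (c *childConsumer) Pause()  { c.paused.Store(true) }
func (c *childConsumer) Resume() { c.paused.Store(false) }

// parentConsumer stands in for regexConsumer / multiTopicConsumer (hypothetical).
type parentConsumer struct {
	consumersLock sync.Mutex
	paused        atomic.Bool // single source of truth for IsPaused()
	children      map[string]*childConsumer
}

// Pause flips the parent flag and fans out to children under the map lock.
func (p *parentConsumer) Pause() {
	p.consumersLock.Lock()
	defer p.consumersLock.Unlock()
	p.paused.Store(true)
	for _, c := range p.children {
		c.Pause()
	}
}

// Resume clears the parent flag and fans out to children under the map lock.
func (p *parentConsumer) Resume() {
	p.consumersLock.Lock()
	defer p.consumersLock.Unlock()
	p.paused.Store(false)
	for _, c := range p.children {
		c.Resume()
	}
}

// IsPaused reports the parent flag only; children are never consulted.
func (p *parentConsumer) IsPaused() bool { return p.paused.Load() }

// addChild models the tail of subscribe(): a newly discovered child inherits
// the parent's paused state before it becomes visible in the map.
func (p *parentConsumer) addChild(topic string, c *childConsumer) {
	p.consumersLock.Lock()
	defer p.consumersLock.Unlock()
	if p.paused.Load() {
		c.Pause()
	}
	p.children[topic] = c
}
```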
   
   **5. `IsPaused()`**
   
   Top-level only, defined as "`Pause()` called more recently than `Resume()`". Internally it just returns `c.paused.Load()` on the parent. It makes no attempt to derive the state from child consumers, since that would race with topic discovery.
   
   **6. Subscription-type behavior**
   
   I'll add this to the "Edge cases" section:
   - `Shared`: dispatch moves off the paused consumer once its granted permits run out. There is no reassignment; the consumer stays connected.
   - `KeyShared`: pause does not reassign keys. Messages for the paused consumer's keys build up in the broker backlog until `Resume()`.
   - `Failover`: pause does not trigger failover. The consumer stays active, and the backlog grows until it is resumed.
   - `Exclusive`: the backlog grows until the consumer is resumed.
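The `Shared` bullet can be illustrated with a toy round-robin dispatcher. This is not the broker's dispatch algorithm. `simConsumer`, `dispatchShared`, and the immediate refill for unpaused consumers are simplifying assumptions. A paused consumer drains only the permits it was already granted. Messages that no consumer can take stay in the backlog.

```go
package main

// simConsumer is a toy consumer as seen by a broker-side dispatcher (hypothetical).
type simConsumer struct {
	name    string
	permits int  // permits the broker still holds for this consumer
	paused  bool // a paused consumer sends no further FLOW
}

// dispatchShared hands each message to the next consumer, round robin, that
// still has permits. An unpaused consumer whose permits hit zero is assumed to
// re-flow `refill` permits at once; a paused one never does. It returns how
// many messages each consumer received and stops when nobody can take more.
func dispatchShared(msgs int, cs []*simConsumer, refill int) map[string]int {
	got := make(map[string]int)
	next := 0
	for m := 0; m < msgs; m++ {
		delivered := false
		for i := 0; i < len(cs) && !delivered; i++ {
			idx := (next + i) % len(cs)
			c := cs[idx]
			if c.permits == 0 && !c.paused {
				c.permits = refill
			}
			if c.permits > 0 {
				c.permits--
				got[c.name]++
				next = (idx + 1) % len(cs)
				delivered = true
			}
		}
		if !delivered {
			break // the remaining messages stay in the backlog
		}
	}
	return got
}
```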
   
   ---
   
   PIP updates I can make based on this exchange:
   - Reword the `Pause()` docstring to include "previously granted permits"
   - Add a zero-queue subsection under "Edge cases"
   - Make the regex/multi-topic rule explicit for dynamically discovered topics, not just existing children
   - Move `paused` to the top-level consumer struct (the current sketch only puts it on `partitionConsumer`)
   - Expand the subscription-type notes beyond `KeyShared`, as above
   
   One more thing, not raised in the review but worth flagging for the PR: the interaction with `BatchReceive`. A paused consumer should return an empty batch when the configured timeout expires rather than block forever. I'll cover this in the implementation; it doesn't require an API change.
   
   ---
   I'll open the PR once you confirm the changes proposed and discussed above.
   

