This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5679f1f  [Fix] Fix the dispatcher() stuck caused by availablePermitsCh 
(#875)
5679f1f is described below

commit 5679f1f7b3840184b8947b5b8c8170fec181eecc
Author: Jiaqi Shen <[email protected]>
AuthorDate: Mon Oct 31 17:14:54 2022 +0800

    [Fix] Fix the dispatcher() stuck caused by availablePermitsCh (#875)
    
    ### Motivation
    
    The `availablePermitsCh` may cause the dispatcher to get stuck.
    
    
https://github.com/apache/pulsar-client-go/blob/0b0720ab73d7f6378b8b6ac37acbafe547c268c8/pulsar/consumer_partition.go#L1096-L1109
    
    For example, if the `messageCh <- nextMessage` case is selected 10 times 
in a row, the buffered `availablePermitsCh` will be filled. The next 
`messageCh <- nextMessage` iteration will then be stuck forever on its send to 
that channel, because the receive `pr := <-pc.availablePermitsCh` lives in the 
same dispatcher loop and can never be reached.
    
    ### Modifications
    
    Remove the `pc.availablePermitsCh` from the dispatcher and track permits 
with an atomically updated `availablePermits` counter instead.
---
 pulsar/consumer_partition.go | 84 +++++++++++++++++++++-----------------------
 1 file changed, 40 insertions(+), 44 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ebaa48b..1ade1e2 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -25,6 +25,7 @@ import (
        "math"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/gogo/protobuf/proto"
@@ -83,15 +84,6 @@ const (
        noMessageEntry = -1
 )
 
-type permitsReq int32
-
-const (
-       // reset the availablePermits of pc
-       permitsReset permitsReq = iota
-       // increase the availablePermits
-       permitsInc
-)
-
 type partitionConsumerOpts struct {
        topic                       string
        consumerName                string
@@ -141,8 +133,7 @@ type partitionConsumer struct {
        messageCh chan ConsumerMessage
 
        // the number of message slots available
-       availablePermits   int32
-       availablePermitsCh chan permitsReq
+       availablePermits *availablePermits
 
        // the size of the queue channel for buffering messages
        queueSize       int32
@@ -170,6 +161,37 @@ type partitionConsumer struct {
        unAckChunksTracker *unAckChunksTracker
 }
 
+type availablePermits struct {
+       permits int32
+       pc      *partitionConsumer
+}
+
+func (p *availablePermits) inc() {
+       // atomic add availablePermits
+       ap := atomic.AddInt32(&p.permits, 1)
+
+       // TODO implement a better flow controller
+       // send more permits if needed
+       flowThreshold := int32(math.Max(float64(p.pc.queueSize/2), 1))
+       if ap >= flowThreshold {
+               availablePermits := ap
+               requestedPermits := ap
+               // check if permits changed
+               if !atomic.CompareAndSwapInt32(&p.permits, ap, 0) {
+                       return
+               }
+
+               p.pc.log.Debugf("requesting more permits=%d available=%d", 
requestedPermits, availablePermits)
+               if err := p.pc.internalFlow(uint32(requestedPermits)); err != 
nil {
+                       p.pc.log.WithError(err).Error("unable to send permits")
+               }
+       }
+}
+
+func (p *availablePermits) reset() {
+       atomic.StoreInt32(&p.permits, 0)
+}
+
 type schemaInfoCache struct {
        lock   sync.RWMutex
        cache  map[string]Schema
@@ -241,8 +263,8 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                dlq:                  dlq,
                metrics:              metrics,
                schemaInfoCache:      newSchemaInfoCache(client, options.topic),
-               availablePermitsCh:   make(chan permitsReq, 10),
        }
+       pc.availablePermits = &availablePermits{pc: pc}
        pc.chunkedMsgCtxMap = 
newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
        pc.unAckChunksTracker = newUnAckChunksTracker(pc)
        pc.setConsumerState(consumerInit)
@@ -931,14 +953,14 @@ func (pc *partitionConsumer) 
processMessageChunk(compressedPayload internal.Buff
                        "Received unexpected chunk messageId %s, last-chunk-id 
%d, chunkId = %d, total-chunks %d",
                        msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
                pc.chunkedMsgCtxMap.remove(uuid)
-               pc.availablePermitsCh <- permitsInc
+               pc.availablePermits.inc()
                return nil
        }
 
        ctx.append(chunkID, msgID, compressedPayload)
 
        if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
-               pc.availablePermitsCh <- permitsInc
+               pc.availablePermits.inc()
                return nil
        }
 
@@ -1075,7 +1097,7 @@ func (pc *partitionConsumer) dispatcher() {
                        messages = nil
 
                        // reset available permits
-                       pc.availablePermitsCh <- permitsReset
+                       pc.availablePermits.reset()
                        initialPermits := uint32(pc.queueSize)
 
                        pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
@@ -1098,15 +1120,7 @@ func (pc *partitionConsumer) dispatcher() {
                        messages[0] = nil
                        messages = messages[1:]
 
-                       pc.availablePermitsCh <- permitsInc
-
-               case pr := <-pc.availablePermitsCh:
-                       switch pr {
-                       case permitsInc:
-                               pc.increasePermitsAndRequestMoreIfNeed()
-                       case permitsReset:
-                               pc.availablePermits = 0
-                       }
+                       pc.availablePermits.inc()
 
                case clearQueueCb := <-pc.clearQueueCh:
                        // drain the message queue on any new connection by 
sending a
@@ -1136,7 +1150,7 @@ func (pc *partitionConsumer) dispatcher() {
                        messages = nil
 
                        // reset available permits
-                       pc.availablePermitsCh <- permitsReset
+                       pc.availablePermits.reset()
                        initialPermits := uint32(pc.queueSize)
 
                        pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
@@ -1576,25 +1590,7 @@ func (pc *partitionConsumer) 
discardCorruptedMessage(msgID *pb.MessageIdData,
        if err != nil {
                pc.log.Error("Connection was closed when request ack cmd")
        }
-       pc.availablePermitsCh <- permitsInc
-}
-
-func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
-       // TODO implement a better flow controller
-       // send more permits if needed
-       flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
-       pc.availablePermits++
-       ap := pc.availablePermits
-       if ap >= flowThreshold {
-               availablePermits := ap
-               requestedPermits := ap
-               pc.availablePermitsCh <- permitsReset
-
-               pc.log.Debugf("requesting more permits=%d available=%d", 
requestedPermits, availablePermits)
-               if err := pc.internalFlow(uint32(requestedPermits)); err != nil 
{
-                       pc.log.WithError(err).Error("unable to send permits")
-               }
-       }
+       pc.availablePermits.inc()
 }
 
 // _setConn sets the internal connection field of this partition consumer 
atomically.

Reply via email to