This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5679f1f [Fix] Fix the dispatcher() stuck caused by availablePermitsCh
(#875)
5679f1f is described below
commit 5679f1f7b3840184b8947b5b8c8170fec181eecc
Author: Jiaqi Shen <[email protected]>
AuthorDate: Mon Oct 31 17:14:54 2022 +0800
[Fix] Fix the dispatcher() stuck caused by availablePermitsCh (#875)
### Motivation
The `availablePermitsCh` may cause the dispatcher to get stuck.
https://github.com/apache/pulsar-client-go/blob/0b0720ab73d7f6378b8b6ac37acbafe547c268c8/pulsar/consumer_partition.go#L1096-L1109
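For example, if the `messageCh <- nextMessage` case is selected 10 times in a
row, the `availablePermitsCh` (buffer size 10) will be filled. The next
`messageCh <- nextMessage` will then be stuck forever on
`pc.availablePermitsCh <- permitsInc`: the dispatcher goroutine is blocked on
that send, so its own `pr := <-pc.availablePermitsCh` case can never be
reached.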
### Modifications
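Remove the `pc.availablePermitsCh` from the dispatcher. Available permits are
now tracked by an `availablePermits` struct that is updated with `sync/atomic`
operations, so incrementing or resetting the permits never blocks on a channel.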
---
pulsar/consumer_partition.go | 84 +++++++++++++++++++++-----------------------
1 file changed, 40 insertions(+), 44 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ebaa48b..1ade1e2 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -25,6 +25,7 @@ import (
"math"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/gogo/protobuf/proto"
@@ -83,15 +84,6 @@ const (
noMessageEntry = -1
)
-type permitsReq int32
-
-const (
- // reset the availablePermits of pc
- permitsReset permitsReq = iota
- // increase the availablePermits
- permitsInc
-)
-
type partitionConsumerOpts struct {
topic string
consumerName string
@@ -141,8 +133,7 @@ type partitionConsumer struct {
messageCh chan ConsumerMessage
// the number of message slots available
- availablePermits int32
- availablePermitsCh chan permitsReq
+ availablePermits *availablePermits
// the size of the queue channel for buffering messages
queueSize int32
@@ -170,6 +161,37 @@ type partitionConsumer struct {
unAckChunksTracker *unAckChunksTracker
}
+type availablePermits struct {
+ permits int32
+ pc *partitionConsumer
+}
+
+func (p *availablePermits) inc() {
+ // atomic add availablePermits
+ ap := atomic.AddInt32(&p.permits, 1)
+
+ // TODO implement a better flow controller
+ // send more permits if needed
+ flowThreshold := int32(math.Max(float64(p.pc.queueSize/2), 1))
+ if ap >= flowThreshold {
+ availablePermits := ap
+ requestedPermits := ap
+ // check if permits changed
+ if !atomic.CompareAndSwapInt32(&p.permits, ap, 0) {
+ return
+ }
+
+ p.pc.log.Debugf("requesting more permits=%d available=%d",
requestedPermits, availablePermits)
+ if err := p.pc.internalFlow(uint32(requestedPermits)); err !=
nil {
+ p.pc.log.WithError(err).Error("unable to send permits")
+ }
+ }
+}
+
+func (p *availablePermits) reset() {
+ atomic.StoreInt32(&p.permits, 0)
+}
+
type schemaInfoCache struct {
lock sync.RWMutex
cache map[string]Schema
@@ -241,8 +263,8 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
- availablePermitsCh: make(chan permitsReq, 10),
}
+ pc.availablePermits = &availablePermits{pc: pc}
pc.chunkedMsgCtxMap =
newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
pc.setConsumerState(consumerInit)
@@ -931,14 +953,14 @@ func (pc *partitionConsumer)
processMessageChunk(compressedPayload internal.Buff
"Received unexpected chunk messageId %s, last-chunk-id
%d, chunkId = %d, total-chunks %d",
msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
pc.chunkedMsgCtxMap.remove(uuid)
- pc.availablePermitsCh <- permitsInc
+ pc.availablePermits.inc()
return nil
}
ctx.append(chunkID, msgID, compressedPayload)
if msgMeta.GetChunkId() != msgMeta.GetNumChunksFromMsg()-1 {
- pc.availablePermitsCh <- permitsInc
+ pc.availablePermits.inc()
return nil
}
@@ -1075,7 +1097,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil
// reset available permits
- pc.availablePermitsCh <- permitsReset
+ pc.availablePermits.reset()
initialPermits := uint32(pc.queueSize)
pc.log.Debugf("dispatcher requesting initial
permits=%d", initialPermits)
@@ -1098,15 +1120,7 @@ func (pc *partitionConsumer) dispatcher() {
messages[0] = nil
messages = messages[1:]
- pc.availablePermitsCh <- permitsInc
-
- case pr := <-pc.availablePermitsCh:
- switch pr {
- case permitsInc:
- pc.increasePermitsAndRequestMoreIfNeed()
- case permitsReset:
- pc.availablePermits = 0
- }
+ pc.availablePermits.inc()
case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by
sending a
@@ -1136,7 +1150,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil
// reset available permits
- pc.availablePermitsCh <- permitsReset
+ pc.availablePermits.reset()
initialPermits := uint32(pc.queueSize)
pc.log.Debugf("dispatcher requesting initial
permits=%d", initialPermits)
@@ -1576,25 +1590,7 @@ func (pc *partitionConsumer)
discardCorruptedMessage(msgID *pb.MessageIdData,
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
}
- pc.availablePermitsCh <- permitsInc
-}
-
-func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
- // TODO implement a better flow controller
- // send more permits if needed
- flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
- pc.availablePermits++
- ap := pc.availablePermits
- if ap >= flowThreshold {
- availablePermits := ap
- requestedPermits := ap
- pc.availablePermitsCh <- permitsReset
-
- pc.log.Debugf("requesting more permits=%d available=%d",
requestedPermits, availablePermits)
- if err := pc.internalFlow(uint32(requestedPermits)); err != nil
{
- pc.log.WithError(err).Error("unable to send permits")
- }
- }
+ pc.availablePermits.inc()
}
// _setConn sets the internal connection field of this partition consumer
atomically.