This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 9fdefe2bbd2ecdd7c9f853580e829cd66f22b78e Author: Jinjun Pan <75996911+pansz...@users.noreply.github.com> AuthorDate: Fri Feb 23 21:47:37 2024 +0800 [Fix] Fix available permits in MessageReceived (#1181) Fixes #1180 ### Motivation In `MessageReceived`, the number of skipped messages should be added to the available permits; otherwise the permits consumed by skipped messages are never counted and the flow request may not be sent. --------- Co-authored-by: panjinjun <1619-panjin...@users.noreply.git.sysop.bigo.sg> (cherry picked from commit 5d258272cb83444fe156dcbb57cbf8f2d475a50b) --- pulsar/consumer_partition.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 95b5bc09..3572a522 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1093,7 +1093,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.metrics.MessagesReceived.Add(float64(numMsgs)) pc.metrics.PrefetchedMessages.Add(float64(numMsgs)) - var bytesReceived int + var ( + bytesReceived int + skippedMessages int32 + ) for i := 0; i < numMsgs; i++ { smm, payload, err := reader.ReadMessage() if err != nil || payload == nil { @@ -1102,6 +1105,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if ackSet != nil && !ackSet.Test(uint(i)) { pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i) + skippedMessages++ continue } @@ -1120,6 +1124,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header if pc.messageShouldBeDiscarded(trackingMsgID) { pc.AckID(trackingMsgID) + skippedMessages++ continue } @@ -1144,6 +1149,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if pc.ackGroupingTracker.isDuplicate(msgID) { + skippedMessages++ continue } @@ -1218,6 +1224,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.markScaleIfNeed() } + if
skippedMessages > 0 { + pc.availablePermits.add(skippedMessages) + } + // send messages to the dispatcher pc.queueCh <- messages return nil