This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.12.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 9fdefe2bbd2ecdd7c9f853580e829cd66f22b78e
Author: Jinjun Pan <75996911+pansz...@users.noreply.github.com>
AuthorDate: Fri Feb 23 21:47:37 2024 +0800

    [Fix] Fix available permits in MessageReceived (#1181)
    
    Fixes #1180
    
    ### Motivation
    In the `MessageReceived`, the number of skipped messages should be 
increased to available permits to avoid skipped permits leading flow request 
not be sent.
    ---------
    
    Co-authored-by: panjinjun <1619-panjin...@users.noreply.git.sysop.bigo.sg>
    (cherry picked from commit 5d258272cb83444fe156dcbb57cbf8f2d475a50b)
---
 pulsar/consumer_partition.go | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 95b5bc09..3572a522 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1093,7 +1093,10 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
        pc.metrics.MessagesReceived.Add(float64(numMsgs))
        pc.metrics.PrefetchedMessages.Add(float64(numMsgs))
 
-       var bytesReceived int
+       var (
+               bytesReceived   int
+               skippedMessages int32
+       )
        for i := 0; i < numMsgs; i++ {
                smm, payload, err := reader.ReadMessage()
                if err != nil || payload == nil {
@@ -1102,6 +1105,7 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                }
                if ackSet != nil && !ackSet.Test(uint(i)) {
                        pc.log.Debugf("Ignoring message from %vth message, 
which has been acknowledged", i)
+                       skippedMessages++
                        continue
                }
 
@@ -1120,6 +1124,7 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
 
                if pc.messageShouldBeDiscarded(trackingMsgID) {
                        pc.AckID(trackingMsgID)
+                       skippedMessages++
                        continue
                }
 
@@ -1144,6 +1149,7 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                }
 
                if pc.ackGroupingTracker.isDuplicate(msgID) {
+                       skippedMessages++
                        continue
                }
 
@@ -1218,6 +1224,10 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                pc.markScaleIfNeed()
        }
 
+       if skippedMessages > 0 {
+               pc.availablePermits.add(skippedMessages)
+       }
+
        // send messages to the dispatcher
        pc.queueCh <- messages
        return nil

Reply via email to