This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new de2dc05  fix: when msgCh is full, the pq.mutex lock can't be released and the removeMessage method will be blocked. (#903)
de2dc05 is described below

commit de2dc059378a1f4b893a3c05e3905957cec8972a
Author: Nick <[email protected]>
AuthorDate: Thu Sep 1 19:50:33 2022 +0800

    fix: when msgCh is full, the pq.mutex lock can't be released and the removeMessage method will be blocked. (#903)
---
 consumer/process_queue.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 2d35c07..7bcfa2f 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -96,19 +96,19 @@ func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
        if len(messages) == 0 {
                return
        }
-       pq.mutex.Lock()
        if pq.IsDroppd() {
-               pq.mutex.Unlock()
                return
        }
        if !pq.order {
                select {
                case <-pq.closeChan:
-                       pq.mutex.Unlock()
                        return
                case pq.msgCh <- messages:
                }
        }
+
+       pq.mutex.Lock()
+
        validMessageCount := 0
        for idx := range messages {
                msg := messages[idx]

Reply via email to