This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a343f1b  fix consumer stopped consuming when panic in consumeListener (#910)
a343f1b is described below

commit a343f1b2ad03e8c383d2044228eba982a194ab5b
Author: cserwen <[email protected]>
AuthorDate: Thu Sep 1 19:53:26 2022 +0800

    fix consumer stopped consuming when panic in consumeListener (#910)
    
    Co-authored-by: dengzhiwen1 <[email protected]>
---
 consumer/push_consumer.go | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3ee5470..6080657 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1038,15 +1038,23 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
                }
 
                go primitive.WithRecover(func() {
+                       defer func() {
+                               if err := recover(); err != nil {
+                                       rlog.Error("consumeMessageCurrently 
panic", map[string]interface{}{
+                                               rlog.LogKeyUnderlayError: err,
+                                               rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
+                                       })
+                               }
+                               if !limiterOn {
+                                       <-pc.crCh
+                               }
+                       }()
                RETRY:
                        if pq.IsDroppd() {
                                rlog.Info("the message queue not be able to 
consume, because it was dropped", map[string]interface{}{
                                        rlog.LogKeyMessageQueue:  mq.String(),
                                        rlog.LogKeyConsumerGroup: 
pc.consumerGroup,
                                })
-                               if !limiterOn {
-                                       <-pc.crCh
-                               }
                                return
                        }
 
@@ -1126,9 +1134,6 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
                                        "message":               subMsgs,
                                })
                        }
-                       if !limiterOn {
-                               <-pc.crCh
-                       }
                })
        }
 }

Reply via email to