This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a343f1b fix consumer stopped consuming when panic in consumeListener
(#910)
a343f1b is described below
commit a343f1b2ad03e8c383d2044228eba982a194ab5b
Author: cserwen <[email protected]>
AuthorDate: Thu Sep 1 19:53:26 2022 +0800
fix consumer stopped consuming when panic in consumeListener (#910)
Co-authored-by: dengzhiwen1 <[email protected]>
---
consumer/push_consumer.go | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3ee5470..6080657 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1038,15 +1038,23 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
}
go primitive.WithRecover(func() {
+ defer func() {
+ if err := recover(); err != nil {
+ rlog.Error("consumeMessageCurrently
panic", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ rlog.LogKeyConsumerGroup:
pc.consumerGroup,
+ })
+ }
+ if !limiterOn {
+ <-pc.crCh
+ }
+ }()
RETRY:
if pq.IsDroppd() {
rlog.Info("the message queue not be able to
consume, because it was dropped", map[string]interface{}{
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyConsumerGroup:
pc.consumerGroup,
})
- if !limiterOn {
- <-pc.crCh
- }
return
}
@@ -1126,9 +1134,6 @@ func (pc *pushConsumer) consumeMessageCurrently(pq
*processQueue, mq *primitive.
"message": subMsgs,
})
}
- if !limiterOn {
- <-pc.crCh
- }
})
}
}