This is an automated email from the ASF dual-hosted git repository.

cserwen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a15c777  [ISSUE #1056] fix: When the consumer and producer have the same topic, the consumer fails to update the topic.
a15c777 is described below

commit a15c7771e84b00a33ba6f054d8c31d85654e2dac
Author: lvxiao <lvx...@yeah.net>
AuthorDate: Mon Jun 19 17:51:11 2023 +0800

    [ISSUE #1056] fix: When the consumer and producer have the same topic, the consumer fails to update the topic.
---
 internal/client.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/internal/client.go b/internal/client.go
index d479b0c..1321da9 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -686,19 +686,17 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 }
 
 func (c *rmqClient) UpdateTopicRouteInfo() {
+       allTopics := make(map[string]bool, 0)
        publishTopicSet := make(map[string]bool, 0)
        c.producerMap.Range(func(key, value interface{}) bool {
                producer := value.(InnerProducer)
                list := producer.PublishTopicList()
                for idx := range list {
                        publishTopicSet[list[idx]] = true
+                       allTopics[list[idx]] = true
                }
                return true
        })
-       for topic := range publishTopicSet {
-               data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
-               c.UpdatePublishInfo(topic, data, changed)
-       }
 
        subscribedTopicSet := make(map[string]bool, 0)
        c.consumerMap.Range(func(key, value interface{}) bool {
@@ -706,13 +704,22 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
                list := consumer.SubscriptionDataList()
                for idx := range list {
                        subscribedTopicSet[list[idx].Topic] = true
+                       allTopics[list[idx].Topic] = true
                }
                return true
        })
 
-       for topic := range subscribedTopicSet {
+       for topic := range allTopics {
                data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
-               c.updateSubscribeInfo(topic, data, changed)
+
+               if publishTopicSet[topic] {
+                       c.UpdatePublishInfo(topic, data, changed)
+               }
+
+               if subscribedTopicSet[topic] {
+                       c.updateSubscribeInfo(topic, data, changed)
+               }
+
        }
 }
 

Reply via email to