This is an automated email from the ASF dual-hosted git repository. cserwen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
     new a15c777  [ISSUE #1056] fix: When the consumer and producer have the same topic, the consumer fails to update the topic.
a15c777 is described below

commit a15c7771e84b00a33ba6f054d8c31d85654e2dac
Author: lvxiao <lvx...@yeah.net>
AuthorDate: Mon Jun 19 17:51:11 2023 +0800

    [ISSUE #1056] fix: When the consumer and producer have the same topic, the consumer fails to update the topic.
---
 internal/client.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/internal/client.go b/internal/client.go
index d479b0c..1321da9 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -686,19 +686,17 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 }
 
 func (c *rmqClient) UpdateTopicRouteInfo() {
+	allTopics := make(map[string]bool, 0)
 	publishTopicSet := make(map[string]bool, 0)
 	c.producerMap.Range(func(key, value interface{}) bool {
 		producer := value.(InnerProducer)
 		list := producer.PublishTopicList()
 		for idx := range list {
 			publishTopicSet[list[idx]] = true
+			allTopics[list[idx]] = true
 		}
 		return true
 	})
-	for topic := range publishTopicSet {
-		data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
-		c.UpdatePublishInfo(topic, data, changed)
-	}
 
 	subscribedTopicSet := make(map[string]bool, 0)
 	c.consumerMap.Range(func(key, value interface{}) bool {
@@ -706,13 +704,22 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
 		list := consumer.SubscriptionDataList()
 		for idx := range list {
 			subscribedTopicSet[list[idx].Topic] = true
+			allTopics[list[idx].Topic] = true
 		}
 		return true
 	})
 
-	for topic := range subscribedTopicSet {
+	for topic := range allTopics {
 		data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
-		c.updateSubscribeInfo(topic, data, changed)
+
+		if publishTopicSet[topic] {
+			c.UpdatePublishInfo(topic, data, changed)
+		}
+
+		if subscribedTopicSet[topic] {
+			c.updateSubscribeInfo(topic, data, changed)
+		}
+
 	}
 }
 