This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 5790fc48 golang: Add namespace in Resource and metadata (#753)
5790fc48 is described below

commit 5790fc486a0e45f96b78e64550a54371c3deac61
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Thu May 16 15:32:35 2024 +0800

    golang: Add namespace in Resource and metadata (#753)
    
    * metadata和resource支持namespace字段
    
    * add ut
    
    ---------
    
    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 golang/client.go                                   |  5 ++-
 golang/client_test.go                              |  6 ++--
 golang/metadata/metadata.go                        | 10 +++---
 golang/producer.go                                 | 13 ++++---
 golang/publishing_message.go                       |  8 +++--
 .../metadata.go => publishing_message_test.go}     | 42 ++++++++++------------
 golang/simple_consumer.go                          | 15 +++++---
 golang/simple_consumer_options.go                  |  3 +-
 golang/transaction.go                              |  4 +--
 9 files changed, 59 insertions(+), 47 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index 71c48198..9f60e061 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -377,7 +377,8 @@ func (cli *defaultClient) queryRoute(ctx context.Context, 
topic string, duration
 func (cli *defaultClient) getQueryRouteRequest(topic string) 
*v2.QueryRouteRequest {
        return &v2.QueryRouteRequest{
                Topic: &v2.Resource{
-                       Name: topic,
+                       Name:              topic,
+                       ResourceNamespace: cli.config.NameSpace,
                },
                Endpoints: cli.accessPoint,
        }
@@ -599,6 +600,8 @@ func (cli *defaultClient) Sign(ctx context.Context) 
context.Context {
                innerMD.VersionValue,
                innerMD.ClintID,
                cli.clientID,
+               innerMD.NameSpace,
+               cli.config.NameSpace,
                innerMD.DateTime,
                now,
                innerMD.Authorization,
diff --git a/golang/client_test.go b/golang/client_test.go
index 4549bdfe..716ca8cf 100644
--- a/golang/client_test.go
+++ b/golang/client_test.go
@@ -298,7 +298,8 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) 
{
 func Test_routeEqual(t *testing.T) {
        oldMq := &v2.MessageQueue{
                Topic: &v2.Resource{
-                       Name: "topic-test",
+                       Name:              "topic-test",
+                       ResourceNamespace: "ns-test",
                },
                Id:         0,
                Permission: v2.Permission_READ_WRITE,
@@ -313,7 +314,8 @@ func Test_routeEqual(t *testing.T) {
        }
        newMq := &v2.MessageQueue{
                Topic: &v2.Resource{
-                       Name: "topic-test",
+                       Name:              "topic-test",
+                       ResourceNamespace: "ns-test",
                },
                Id:         0,
                Permission: v2.Permission_READ_WRITE,
diff --git a/golang/metadata/metadata.go b/golang/metadata/metadata.go
index 5a8b919d..ccc6627c 100644
--- a/golang/metadata/metadata.go
+++ b/golang/metadata/metadata.go
@@ -18,11 +18,11 @@
 package metadata
 
 const (
-       LanguageKey = "x-mq-language"
-       ProtocolKey = "x-mq-protocol"
-       RequestID   = "x-mq-request-id"
-       VersionKey  = "x-mq-client-version"
-       // NameSpace     = "x-mq-namespace"
+       LanguageKey   = "x-mq-language"
+       ProtocolKey   = "x-mq-protocol"
+       RequestID     = "x-mq-request-id"
+       VersionKey    = "x-mq-client-version"
+       NameSpace     = "x-mq-namespace"
        DateTime      = "x-mq-date-time"
        ClintID       = "x-mq-client-id"
        Authorization = "authorization"
diff --git a/golang/producer.go b/golang/producer.go
index f2fbf590..3e878a93 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -155,7 +155,8 @@ var NewProducer = func(config *Config, opts 
...ProducerOption) (Producer, error)
        }
        for _, topic := range po.topics {
                topicResource := &v2.Resource{
-                       Name: topic,
+                       Name:              topic,
+                       ResourceNamespace: config.NameSpace,
                }
                p.pSetting.topics.Store(topic, topicResource)
        }
@@ -287,7 +288,7 @@ func (p *defaultProducer) send0(ctx context.Context, msgs 
[]*UnifiedMessage, txE
                var err error
                pubMessage = uMsg.pubMsg
                if uMsg.pubMsg == nil {
-                       pubMessage, err = NewPublishingMessage(msg, p.pSetting, 
txEnabled)
+                       pubMessage, err = NewPublishingMessage(msg, 
p.cli.config.NameSpace, p.pSetting, txEnabled)
                        if err != nil {
                                return nil, err
                        }
@@ -315,7 +316,8 @@ func (p *defaultProducer) send0(ctx context.Context, msgs 
[]*UnifiedMessage, txE
        }
        if _, ok := p.pSetting.topics.Load(topicName); !ok {
                p.pSetting.topics.Store(topicName, &v2.Resource{
-                       Name: topicName,
+                       Name:              topicName,
+                       ResourceNamespace: p.cli.config.NameSpace,
                })
        }
        pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
@@ -362,7 +364,7 @@ func (p *defaultProducer) SendWithTransaction(ctx 
context.Context, msg *Message,
                return nil, fmt.Errorf("producer is not running")
        }
        t := transaction.(*transactionImpl)
-       pubMessage, err := t.tryAddMessage(msg)
+       pubMessage, err := t.tryAddMessage(msg, p.cli.config.NameSpace)
        if err != nil {
                return nil, err
        }
@@ -394,7 +396,8 @@ func (p *defaultProducer) endTransaction(ctx 
context.Context, endpoints *v2.Endp
        ctx = p.cli.Sign(ctx)
        request := &v2.EndTransactionRequest{
                Topic: &v2.Resource{
-                       Name: messageCommon.topic,
+                       Name:              messageCommon.topic,
+                       ResourceNamespace: p.cli.config.NameSpace,
                },
                MessageId:     messageId,
                TransactionId: transactionId,
diff --git a/golang/publishing_message.go b/golang/publishing_message.go
index 456202ab..ea55dd19 100644
--- a/golang/publishing_message.go
+++ b/golang/publishing_message.go
@@ -26,6 +26,7 @@ import (
 )
 
 type PublishingMessage struct {
+       namespace    string
        msg          *Message
        encoding     v2.Encoding
        messageId    string
@@ -33,7 +34,7 @@ type PublishingMessage struct {
        traceContext *string
 }
 
-var NewPublishingMessage = func(msg *Message, settings *producerSettings, 
txEnabled bool) (*PublishingMessage, error) {
+var NewPublishingMessage = func(msg *Message, namespace string, settings 
*producerSettings, txEnabled bool) (*PublishingMessage, error) {
        if msg == nil {
                return nil, fmt.Errorf("message is nil")
        }
@@ -51,6 +52,8 @@ var NewPublishingMessage = func(msg *Message, settings 
*producerSettings, txEnab
        // No need to compress message body.
        pMsg.encoding = v2.Encoding_IDENTITY
 
+       pMsg.namespace = namespace
+
        // Generate message id.
        pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String()
        // Normal message.
@@ -84,7 +87,8 @@ func (pMsg *PublishingMessage) toProtobuf() (*v2.Message, 
error) {
        msg := &v2.Message{
                Topic: &v2.Resource{
                        // ResourceNamespace: b.conn.Config().NameSpace,
-                       Name: pMsg.msg.Topic,
+                       Name:              pMsg.msg.Topic,
+                       ResourceNamespace: pMsg.namespace,
                },
                SystemProperties: &v2.SystemProperties{
                        Keys:          pMsg.msg.GetKeys(),
diff --git a/golang/metadata/metadata.go b/golang/publishing_message_test.go
similarity index 59%
copy from golang/metadata/metadata.go
copy to golang/publishing_message_test.go
index 5a8b919d..5a030e48 100644
--- a/golang/metadata/metadata.go
+++ b/golang/publishing_message_test.go
@@ -15,29 +15,23 @@
  * limitations under the License.
  */
 
-package metadata
+package golang
 
-const (
-       LanguageKey = "x-mq-language"
-       ProtocolKey = "x-mq-protocol"
-       RequestID   = "x-mq-request-id"
-       VersionKey  = "x-mq-client-version"
-       // NameSpace     = "x-mq-namespace"
-       DateTime      = "x-mq-date-time"
-       ClintID       = "x-mq-client-id"
-       Authorization = "authorization"
-)
+import "testing"
 
-const (
-       LanguageValue = "GO"
-       ProtocolValue = "v2"
-       VersionValue  = "5.0.1-rc1"
-)
-
-const (
-       EncryptHeader = "MQv2-HMAC-SHA1"
-       Rocketmq      = "Rocketmq"
-       Credential    = "Credential"
-       Signature     = "Signature"
-       SignedHeaders = "SignedHeaders"
-)
+func TestNewPublishingMessage(t *testing.T) {
+       namespace := "ns-test"
+       pSetting := &producerSettings{}
+       msg := &Message{}
+       pMsg, err := NewPublishingMessage(msg, namespace, pSetting, false)
+       if err != nil {
+               t.Error(err)
+       }
+       v2Msg, err := pMsg.toProtobuf()
+       if err != nil {
+               t.Error(err)
+       }
+       if v2Msg.GetTopic().GetResourceNamespace() != namespace {
+               t.Error("namespace not equal")
+       }
+}
diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go
index 8abfb413..f867696e 100644
--- a/golang/simple_consumer.go
+++ b/golang/simple_consumer.go
@@ -75,10 +75,12 @@ func (sc *defaultSimpleConsumer) 
changeInvisibleDuration0(messageView *MessageVi
        ctx := sc.cli.Sign(context.Background())
        request := &v2.ChangeInvisibleDurationRequest{
                Topic: &v2.Resource{
-                       Name: messageView.GetTopic(),
+                       Name:              messageView.GetTopic(),
+                       ResourceNamespace: sc.cli.config.NameSpace,
                },
                Group: &v2.Resource{
-                       Name: sc.groupName,
+                       Name:              sc.groupName,
+                       ResourceNamespace: sc.cli.config.NameSpace,
                },
                ReceiptHandle:     messageView.GetReceiptHandle(),
                InvisibleDuration: durationpb.New(invisibleDuration),
@@ -166,7 +168,8 @@ func (sc *defaultSimpleConsumer) 
wrapReceiveMessageRequest(batchSize int, messag
 
        return &v2.ReceiveMessageRequest{
                Group: &v2.Resource{
-                       Name: sc.groupName,
+                       Name:              sc.groupName,
+                       ResourceNamespace: sc.cli.config.NameSpace,
                },
                MessageQueue: messageQueue,
                FilterExpression: &v2.FilterExpression{
@@ -183,7 +186,8 @@ func (sc *defaultSimpleConsumer) 
wrapAckMessageRequest(messageView *MessageView)
        return &v2.AckMessageRequest{
                Group: sc.scSettings.groupName,
                Topic: &v2.Resource{
-                       Name: messageView.GetTopic(),
+                       Name:              messageView.GetTopic(),
+                       ResourceNamespace: sc.cli.config.NameSpace,
                },
                Entries: []*v2.AckMessageEntry{
                        {
@@ -369,7 +373,8 @@ var NewSimpleConsumer = func(config *Config, opts 
...SimpleConsumerOption) (Simp
                requestTimeout: sc.cli.opts.timeout,
 
                groupName: &v2.Resource{
-                       Name: sc.groupName,
+                       Name:              sc.groupName,
+                       ResourceNamespace: config.NameSpace,
                },
                longPollingTimeout:      scOpts.awaitDuration,
                subscriptionExpressions: scOpts.subscriptionExpressions,
diff --git a/golang/simple_consumer_options.go 
b/golang/simple_consumer_options.go
index 857a4191..253b3c3d 100644
--- a/golang/simple_consumer_options.go
+++ b/golang/simple_consumer_options.go
@@ -156,7 +156,8 @@ func (sc *simpleConsumerSettings) toProtobuf() *v2.Settings 
{
        subscriptions := make([]*v2.SubscriptionEntry, 0)
        for k, v := range sc.subscriptionExpressions {
                topic := &v2.Resource{
-                       Name: k,
+                       Name:              k,
+                       ResourceNamespace: sc.groupName.GetResourceNamespace(),
                }
                filterExpression := &v2.FilterExpression{
                        Expression: v.expression,
diff --git a/golang/transaction.go b/golang/transaction.go
index f6741101..89839616 100644
--- a/golang/transaction.go
+++ b/golang/transaction.go
@@ -88,7 +88,7 @@ func (t *transactionImpl) RollBack() error {
        return nil
 }
 
-func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, 
error) {
+func (t *transactionImpl) tryAddMessage(message *Message, namespace string) 
(*PublishingMessage, error) {
        t.messagesLock.RLock()
        if len(t.messages) > MAX_MESSAGE_NUM {
                return nil, fmt.Errorf("message in transaction has exceeded the 
threshold: %d", MAX_MESSAGE_NUM)
@@ -100,7 +100,7 @@ func (t *transactionImpl) tryAddMessage(message *Message) 
(*PublishingMessage, e
        if len(t.messages) > MAX_MESSAGE_NUM {
                return nil, fmt.Errorf("message in transaction has exceeded the 
threshold: %d", MAX_MESSAGE_NUM)
        }
-       pubMessage, err := NewPublishingMessage(message, 
t.producerImpl.(*defaultProducer).pSetting, true)
+       pubMessage, err := NewPublishingMessage(message, namespace, 
t.producerImpl.(*defaultProducer).pSetting, true)
        if err != nil {
                return nil, err
        }

Reply via email to