This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push: new 5790fc48 golang: Add namespace in Resource and metadata (#753) 5790fc48 is described below commit 5790fc486a0e45f96b78e64550a54371c3deac61 Author: guyinyou <36399867+guyin...@users.noreply.github.com> AuthorDate: Thu May 16 15:32:35 2024 +0800 golang: Add namespace in Resource and metadata (#753) * metadata和resource支持namespace字段 * add ut --------- Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com> --- golang/client.go | 5 ++- golang/client_test.go | 6 ++-- golang/metadata/metadata.go | 10 +++--- golang/producer.go | 13 ++++--- golang/publishing_message.go | 8 +++-- .../metadata.go => publishing_message_test.go} | 42 ++++++++++------------ golang/simple_consumer.go | 15 +++++--- golang/simple_consumer_options.go | 3 +- golang/transaction.go | 4 +-- 9 files changed, 59 insertions(+), 47 deletions(-) diff --git a/golang/client.go b/golang/client.go index 71c48198..9f60e061 100644 --- a/golang/client.go +++ b/golang/client.go @@ -377,7 +377,8 @@ func (cli *defaultClient) queryRoute(ctx context.Context, topic string, duration func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteRequest { return &v2.QueryRouteRequest{ Topic: &v2.Resource{ - Name: topic, + Name: topic, + ResourceNamespace: cli.config.NameSpace, }, Endpoints: cli.accessPoint, } @@ -599,6 +600,8 @@ func (cli *defaultClient) Sign(ctx context.Context) context.Context { innerMD.VersionValue, innerMD.ClintID, cli.clientID, + innerMD.NameSpace, + cli.config.NameSpace, innerMD.DateTime, now, innerMD.Authorization, diff --git a/golang/client_test.go b/golang/client_test.go index 4549bdfe..716ca8cf 100644 --- a/golang/client_test.go +++ b/golang/client_test.go @@ -298,7 +298,8 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) { func Test_routeEqual(t *testing.T) { oldMq := &v2.MessageQueue{ Topic: &v2.Resource{ - Name: "topic-test", + Name: "topic-test", + ResourceNamespace: "ns-test", }, Id: 0, Permission: v2.Permission_READ_WRITE, @@ -313,7 +314,8 @@ func Test_routeEqual(t *testing.T) { } newMq := &v2.MessageQueue{ Topic: &v2.Resource{ - Name: "topic-test", + Name: "topic-test", + ResourceNamespace: "ns-test", }, Id: 0, Permission: v2.Permission_READ_WRITE, diff --git a/golang/metadata/metadata.go b/golang/metadata/metadata.go index 5a8b919d..ccc6627c 100644 --- a/golang/metadata/metadata.go +++ b/golang/metadata/metadata.go @@ -18,11 +18,11 @@ package metadata const ( - LanguageKey = "x-mq-language" - ProtocolKey = "x-mq-protocol" - RequestID = "x-mq-request-id" - VersionKey = "x-mq-client-version" - // NameSpace = "x-mq-namespace" + LanguageKey = "x-mq-language" + ProtocolKey = "x-mq-protocol" + RequestID = "x-mq-request-id" + VersionKey = "x-mq-client-version" + NameSpace = "x-mq-namespace" DateTime = "x-mq-date-time" ClintID = "x-mq-client-id" Authorization = "authorization" diff --git a/golang/producer.go b/golang/producer.go index f2fbf590..3e878a93 100644 --- a/golang/producer.go +++ b/golang/producer.go @@ -155,7 +155,8 @@ var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error) } for _, topic := range po.topics { topicResource := &v2.Resource{ - Name: topic, + Name: topic, + ResourceNamespace: config.NameSpace, } p.pSetting.topics.Store(topic, topicResource) } @@ -287,7 +288,7 @@ func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txE var err error pubMessage = uMsg.pubMsg if uMsg.pubMsg == nil { - pubMessage, err = NewPublishingMessage(msg, p.pSetting, txEnabled) + pubMessage, err = NewPublishingMessage(msg, p.cli.config.NameSpace, p.pSetting, txEnabled) if err != nil { return nil, err } @@ -315,7 +316,8 @@ func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txE } if _, ok := p.pSetting.topics.Load(topicName); !ok { p.pSetting.topics.Store(topicName, &v2.Resource{ - Name: topicName, + Name: topicName, + ResourceNamespace: p.cli.config.NameSpace, }) } pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName) @@ -362,7 +364,7 @@ func (p *defaultProducer) SendWithTransaction(ctx context.Context, msg *Message, return nil, fmt.Errorf("producer is not running") } t := transaction.(*transactionImpl) - pubMessage, err := t.tryAddMessage(msg) + pubMessage, err := t.tryAddMessage(msg, p.cli.config.NameSpace) if err != nil { return nil, err } @@ -394,7 +396,8 @@ func (p *defaultProducer) endTransaction(ctx context.Context, endpoints *v2.Endp ctx = p.cli.Sign(ctx) request := &v2.EndTransactionRequest{ Topic: &v2.Resource{ - Name: messageCommon.topic, + Name: messageCommon.topic, + ResourceNamespace: p.cli.config.NameSpace, }, MessageId: messageId, TransactionId: transactionId, diff --git a/golang/publishing_message.go b/golang/publishing_message.go index 456202ab..ea55dd19 100644 --- a/golang/publishing_message.go +++ b/golang/publishing_message.go @@ -26,6 +26,7 @@ import ( ) type PublishingMessage struct { + namespace string msg *Message encoding v2.Encoding messageId string @@ -33,7 +34,7 @@ type PublishingMessage struct { traceContext *string } -var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) { +var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) { if msg == nil { return nil, fmt.Errorf("message is nil") } @@ -51,6 +52,8 @@ var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnab // No need to compress message body. pMsg.encoding = v2.Encoding_IDENTITY + pMsg.namespace = namespace + // Generate message id. pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String() // Normal message. @@ -84,7 +87,8 @@ func (pMsg *PublishingMessage) toProtobuf() (*v2.Message, error) { msg := &v2.Message{ Topic: &v2.Resource{ // ResourceNamespace: b.conn.Config().NameSpace, - Name: pMsg.msg.Topic, + Name: pMsg.msg.Topic, + ResourceNamespace: pMsg.namespace, }, SystemProperties: &v2.SystemProperties{ Keys: pMsg.msg.GetKeys(), diff --git a/golang/metadata/metadata.go b/golang/publishing_message_test.go similarity index 59% copy from golang/metadata/metadata.go copy to golang/publishing_message_test.go index 5a8b919d..5a030e48 100644 --- a/golang/metadata/metadata.go +++ b/golang/publishing_message_test.go @@ -15,29 +15,23 @@ * limitations under the License. */ -package metadata +package golang -const ( - LanguageKey = "x-mq-language" - ProtocolKey = "x-mq-protocol" - RequestID = "x-mq-request-id" - VersionKey = "x-mq-client-version" - // NameSpace = "x-mq-namespace" - DateTime = "x-mq-date-time" - ClintID = "x-mq-client-id" - Authorization = "authorization" -) +import "testing" -const ( - LanguageValue = "GO" - ProtocolValue = "v2" - VersionValue = "5.0.1-rc1" -) - -const ( - EncryptHeader = "MQv2-HMAC-SHA1" - Rocketmq = "Rocketmq" - Credential = "Credential" - Signature = "Signature" - SignedHeaders = "SignedHeaders" -) +func TestNewPublishingMessage(t *testing.T) { + namespace := "ns-test" + pSetting := &producerSettings{} + msg := &Message{} + pMsg, err := NewPublishingMessage(msg, namespace, pSetting, false) + if err != nil { + t.Error(err) + } + v2Msg, err := pMsg.toProtobuf() + if err != nil { + t.Error(err) + } + if v2Msg.GetTopic().GetResourceNamespace() != namespace { + t.Error("namespace not equal") + } +} diff --git a/golang/simple_consumer.go b/golang/simple_consumer.go index 8abfb413..f867696e 100644 --- a/golang/simple_consumer.go +++ b/golang/simple_consumer.go @@ -75,10 +75,12 @@ func (sc *defaultSimpleConsumer) changeInvisibleDuration0(messageView *MessageVi ctx := sc.cli.Sign(context.Background()) request := &v2.ChangeInvisibleDurationRequest{ Topic: &v2.Resource{ - Name: messageView.GetTopic(), + Name: messageView.GetTopic(), + ResourceNamespace: sc.cli.config.NameSpace, }, Group: &v2.Resource{ - Name: sc.groupName, + Name: sc.groupName, + ResourceNamespace: sc.cli.config.NameSpace, }, ReceiptHandle: messageView.GetReceiptHandle(), InvisibleDuration: durationpb.New(invisibleDuration), @@ -166,7 +168,8 @@ func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int, messag return &v2.ReceiveMessageRequest{ Group: &v2.Resource{ - Name: sc.groupName, + Name: sc.groupName, + ResourceNamespace: sc.cli.config.NameSpace, }, MessageQueue: messageQueue, FilterExpression: &v2.FilterExpression{ @@ -183,7 +186,8 @@ func (sc *defaultSimpleConsumer) wrapAckMessageRequest(messageView *MessageView) return &v2.AckMessageRequest{ Group: sc.scSettings.groupName, Topic: &v2.Resource{ - Name: messageView.GetTopic(), + Name: messageView.GetTopic(), + ResourceNamespace: sc.cli.config.NameSpace, }, Entries: []*v2.AckMessageEntry{ { @@ -369,7 +373,8 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp requestTimeout: sc.cli.opts.timeout, groupName: &v2.Resource{ - Name: sc.groupName, + Name: sc.groupName, + ResourceNamespace: config.NameSpace, }, longPollingTimeout: scOpts.awaitDuration, subscriptionExpressions: scOpts.subscriptionExpressions, diff --git a/golang/simple_consumer_options.go b/golang/simple_consumer_options.go index 857a4191..253b3c3d 100644 --- a/golang/simple_consumer_options.go +++ b/golang/simple_consumer_options.go @@ -156,7 +156,8 @@ func (sc *simpleConsumerSettings) toProtobuf() *v2.Settings { subscriptions := make([]*v2.SubscriptionEntry, 0) for k, v := range sc.subscriptionExpressions { topic := &v2.Resource{ - Name: k, + Name: k, + ResourceNamespace: sc.groupName.GetResourceNamespace(), } filterExpression := &v2.FilterExpression{ Expression: v.expression, diff --git a/golang/transaction.go b/golang/transaction.go index f6741101..89839616 100644 --- a/golang/transaction.go +++ b/golang/transaction.go @@ -88,7 +88,7 @@ func (t *transactionImpl) RollBack() error { return nil } -func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, error) { +func (t *transactionImpl) tryAddMessage(message *Message, namespace string) (*PublishingMessage, error) { t.messagesLock.RLock() if len(t.messages) > MAX_MESSAGE_NUM { return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM) @@ -100,7 +100,7 @@ func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, e if len(t.messages) > MAX_MESSAGE_NUM { return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM) } - pubMessage, err := NewPublishingMessage(message, t.producerImpl.(*defaultProducer).pSetting, true) + pubMessage, err := NewPublishingMessage(message, namespace, t.producerImpl.(*defaultProducer).pSetting, true) if err != nil { return nil, err }