This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new bc2a112b Client configuration add namespace (#750)
bc2a112b is described below

commit bc2a112b092018061561966f8bbacefc9f111a5b
Author: zhaohai <33314633+zhaohai...@users.noreply.github.com>
AuthorDate: Fri May 17 15:19:10 2024 +0800

    Client configuration add namespace (#750)
    
    * clientConfiguration add namespace
    
    * update beta version
    
    * add namespace check
    
    * add namespace
    
    * add namespace
    
    * update version
    
    * Remove empty values from settings
    
    * update format and single quotes
    
    * update format
    
    * update version
    
    * remove codecov
    
    * Change namespace to required
    
    * Change namespace to required
    
    * Change namespace to required
    
    * test adds default parameters
    
    ---------
    
    Co-authored-by: zh378814 <wb-zh378...@alibaba-inc.com>
---
 .github/workflows/nodejs_coverage.yml             | 46 -----------------------
 nodejs/examples/Producer.ts                       |  1 +
 nodejs/examples/SimpleConsumer.ts                 |  1 +
 nodejs/src/client/BaseClient.ts                   |  6 +++
 nodejs/src/client/Settings.ts                     |  5 ++-
 nodejs/src/consumer/SimpleConsumer.ts             |  2 +-
 nodejs/src/consumer/SimpleSubscriptionSettings.ts |  5 +--
 nodejs/src/message/PublishingMessage.ts           |  6 ++-
 nodejs/src/producer/Producer.ts                   | 10 +++--
 nodejs/src/producer/PublishingSettings.ts         |  6 +--
 nodejs/test/consumer/SimpleConsumer.test.ts       |  9 ++++-
 nodejs/test/helper.ts                             |  1 +
 nodejs/test/producer/Producer.test.ts             | 12 +++++-
 13 files changed, 47 insertions(+), 63 deletions(-)

diff --git a/.github/workflows/nodejs_coverage.yml 
b/.github/workflows/nodejs_coverage.yml
deleted file mode 100644
index 2a29a5b2..00000000
--- a/.github/workflows/nodejs_coverage.yml
+++ /dev/null
@@ -1,46 +0,0 @@
-name: Node.js Coverage
-on:
-  pull_request:
-    types: [opened, reopened, synchronize]
-    paths:
-      - 'nodejs/**'
-  push:
-    branches:
-      - master
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-    strategy:
-      matrix:
-        node-version: [16.19.0, 16.x, 18.x, 20.x]
-    steps:
-      - name: Checkout Git Source
-        uses: actions/checkout@v3
-        with:
-          submodules: recursive
-
-      - name: Use Node.js ${{ matrix.node-version }}
-        uses: actions/setup-node@v3
-        with:
-          node-version: ${{ matrix.node-version }}
-
-      - name: Install dependencies
-        working-directory: ./nodejs
-        run: npm i && npm run init
-
-      - name: Start RocketMQ Server
-        working-directory: ./nodejs
-        run: npm run start-rocketmq
-
-      - name: Run test
-        working-directory: ./nodejs
-        run: npm run ci
-
-      - name: Code Coverage
-        uses: codecov/codecov-action@v3
-        with:
-          files: ./nodejs/coverage/coverage-final.json
-          flags: nodejs
-          fail_ci_if_error: true
-          verbose: true
diff --git a/nodejs/examples/Producer.ts b/nodejs/examples/Producer.ts
index c90c761f..6b8be4c1 100644
--- a/nodejs/examples/Producer.ts
+++ b/nodejs/examples/Producer.ts
@@ -19,6 +19,7 @@ import { Producer } from '..';
 
 const producer = new Producer({
   endpoints: '127.0.0.1:8081',
+  namespace: ''
 });
 await producer.startup();
 
diff --git a/nodejs/examples/SimpleConsumer.ts 
b/nodejs/examples/SimpleConsumer.ts
index c58d9d17..fb36c38d 100644
--- a/nodejs/examples/SimpleConsumer.ts
+++ b/nodejs/examples/SimpleConsumer.ts
@@ -20,6 +20,7 @@ import { SimpleConsumer } from '..';
 const simpleConsumer = new SimpleConsumer({
   consumerGroup: 'nodejs-demo-group',
   endpoints: '127.0.0.1:8081',
+  namespace: '',
   subscriptions: new Map().set('TopicTest', 'nodejs-demo'),
 });
 await simpleConsumer.startup();
diff --git a/nodejs/src/client/BaseClient.ts b/nodejs/src/client/BaseClient.ts
index 9fa12dae..77f790b9 100644
--- a/nodejs/src/client/BaseClient.ts
+++ b/nodejs/src/client/BaseClient.ts
@@ -57,6 +57,7 @@ export interface BaseClientOptions {
    * - example.com:8443
    */
   endpoints: string;
+  namespace: string;
   sessionCredentials?: SessionCredentials;
   requestTimeout?: number;
   logger?: ILogger;
@@ -76,6 +77,7 @@ export abstract class BaseClient {
   readonly clientType = ClientType.CLIENT_TYPE_UNSPECIFIED;
   readonly sslEnabled: boolean;
   readonly #sessionCredentials?: SessionCredentials;
+  readonly namespace: string;
   protected readonly endpoints: Endpoints;
   protected readonly isolated = new Map<string, Endpoints>();
   protected readonly requestTimeout: number;
@@ -92,6 +94,7 @@ export abstract class BaseClient {
     this.logger = options.logger ?? getDefaultLogger();
     this.sslEnabled = options.sslEnabled === true;
     this.endpoints = new Endpoints(options.endpoints);
+    this.namespace = options.namespace;
     this.#sessionCredentials = options.sessionCredentials;
     // https://rocketmq.apache.org/docs/introduction/03limits/
     // Default request timeout is 3000ms
@@ -288,6 +291,9 @@ export abstract class BaseClient {
     metadata.set('x-mq-language', 'HTTP');
     // version of client
     metadata.set('x-mq-client-version', UserAgent.INSTANCE.version);
+    if (this.namespace) {
+      metadata.set('x-mq-namespace', this.namespace);
+    }
     if (this.#sessionCredentials) {
       if (this.#sessionCredentials.securityToken) {
         metadata.set('x-mq-session-token', 
this.#sessionCredentials.securityToken);
diff --git a/nodejs/src/client/Settings.ts b/nodejs/src/client/Settings.ts
index 34f7eebd..d67b51ae 100644
--- a/nodejs/src/client/Settings.ts
+++ b/nodejs/src/client/Settings.ts
@@ -20,15 +20,16 @@ import { Endpoints } from '../route/Endpoints';
 import { RetryPolicy } from '../retry';
 
 export abstract class Settings {
+  protected readonly namespace: string;
   protected readonly clientId: string;
   protected readonly clientType: ClientType;
   protected readonly accessPoint: Endpoints;
   protected retryPolicy?: RetryPolicy;
   protected readonly requestTimeout: number;
 
-  constructor(clientId: string, clientType: ClientType, accessPoint: 
Endpoints, requestTimeout: number,
-    retryPolicy?: RetryPolicy) {
+  constructor(namespace: string, clientId: string, clientType: ClientType, 
accessPoint: Endpoints, requestTimeout: number, retryPolicy?: RetryPolicy) {
     this.clientId = clientId;
+    this.namespace = namespace;
     this.clientType = clientType;
     this.accessPoint = accessPoint;
     this.retryPolicy = retryPolicy;
diff --git a/nodejs/src/consumer/SimpleConsumer.ts 
b/nodejs/src/consumer/SimpleConsumer.ts
index 916f362c..b5b045a6 100644
--- a/nodejs/src/consumer/SimpleConsumer.ts
+++ b/nodejs/src/consumer/SimpleConsumer.ts
@@ -60,7 +60,7 @@ export class SimpleConsumer extends Consumer {
       }
     }
     this.#awaitDuration = options.awaitDuration ?? 30000;
-    this.#simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(this.clientId, this.endpoints,
+    this.#simpleSubscriptionSettings = new 
SimpleSubscriptionSettings(options.namespace, this.clientId, this.endpoints,
       this.consumerGroup, this.requestTimeout, this.#awaitDuration, 
this.#subscriptionExpressions);
   }
 
diff --git a/nodejs/src/consumer/SimpleSubscriptionSettings.ts 
b/nodejs/src/consumer/SimpleSubscriptionSettings.ts
index 116a613b..f12c4e28 100644
--- a/nodejs/src/consumer/SimpleSubscriptionSettings.ts
+++ b/nodejs/src/consumer/SimpleSubscriptionSettings.ts
@@ -30,9 +30,8 @@ export class SimpleSubscriptionSettings extends Settings {
   readonly group: string;
   readonly subscriptionExpressions: Map<string, FilterExpression>;
 
-  constructor(clientId: string, accessPoint: Endpoints, group: string,
-    requestTimeout: number, longPollingTimeout: number, 
subscriptionExpressions: Map<string, FilterExpression>) {
-    super(clientId, ClientType.SIMPLE_CONSUMER, accessPoint, requestTimeout);
+  constructor(namespace: string, clientId: string, accessPoint: Endpoints, 
group: string, requestTimeout: number, longPollingTimeout: number, 
subscriptionExpressions: Map<string, FilterExpression>) {
+    super(namespace, clientId, ClientType.SIMPLE_CONSUMER, accessPoint, 
requestTimeout);
     this.longPollingTimeout = longPollingTimeout;
     this.group = group;
     this.subscriptionExpressions = subscriptionExpressions;
diff --git a/nodejs/src/message/PublishingMessage.ts 
b/nodejs/src/message/PublishingMessage.ts
index b6015bf3..959d2f28 100644
--- a/nodejs/src/message/PublishingMessage.ts
+++ b/nodejs/src/message/PublishingMessage.ts
@@ -68,7 +68,7 @@ export class PublishingMessage extends Message {
    * This method should be invoked before each message sending, because the 
born time is reset before each
    * invocation, which means that it should not be invoked ahead of time.
    */
-  toProtobuf(mq: MessageQueue) {
+  toProtobuf(namespace: string, mq: MessageQueue) {
     const systemProperties = new SystemProperties()
       .setKeysList(this.keys)
       .setMessageId(this.messageId)
@@ -87,8 +87,10 @@ export class PublishingMessage extends Message {
       systemProperties.setMessageGroup(this.messageGroup);
     }
 
+    const resource = createResource(this.topic);
+    resource.setResourceNamespace(namespace);
     const message = new MessagePB()
-      .setTopic(createResource(this.topic))
+      .setTopic(resource)
       .setBody(this.body)
       .setSystemProperties(systemProperties);
     if (this.properties) {
diff --git a/nodejs/src/producer/Producer.ts b/nodejs/src/producer/Producer.ts
index 623ecf46..8484167f 100644
--- a/nodejs/src/producer/Producer.ts
+++ b/nodejs/src/producer/Producer.ts
@@ -66,7 +66,7 @@ export class Producer extends BaseClient {
     // https://rocketmq.apache.org/docs/introduction/03limits/
     // Default max number of message sending retries is 3
     const retryPolicy = 
ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(options.maxAttempts ?? 3);
-    this.#publishingSettings = new PublishingSettings(this.clientId, 
this.endpoints, retryPolicy,
+    this.#publishingSettings = new PublishingSettings(options.namespace, 
this.clientId, this.endpoints, retryPolicy,
       this.requestTimeout, this.topics);
     this.#checker = options.checker;
   }
@@ -85,7 +85,7 @@ export class Producer extends BaseClient {
     const request = new EndTransactionRequest()
       .setMessageId(messageId)
       .setTransactionId(transactionId)
-      .setTopic(createResource(message.topic))
+      
.setTopic(createResource(message.topic).setResourceNamespace(this.namespace))
       .setResolution(resolution);
     const response = await this.rpcClientManager.endTransaction(endpoints, 
request, this.requestTimeout);
     StatusChecker.check(response.getStatus()?.toObject());
@@ -187,7 +187,11 @@ export class Producer extends BaseClient {
   #wrapSendMessageRequest(pubMessages: PublishingMessage[], mq: MessageQueue) {
     const request = new SendMessageRequest();
     for (const pubMessage of pubMessages) {
-      request.addMessages(pubMessage.toProtobuf(mq));
+      if (this.namespace) {
+        request.addMessages(pubMessage.toProtobuf(this.namespace, mq));
+      } else {
+        request.addMessages(pubMessage.toProtobuf('', mq));
+      }
     }
     return request;
   }
diff --git a/nodejs/src/producer/PublishingSettings.ts 
b/nodejs/src/producer/PublishingSettings.ts
index fb72d66d..8b413808 100644
--- a/nodejs/src/producer/PublishingSettings.ts
+++ b/nodejs/src/producer/PublishingSettings.ts
@@ -35,9 +35,8 @@ export class PublishingSettings extends Settings {
   #maxBodySizeBytes = 4 * 1024 * 1024;
   #validateMessageType = true;
 
-  constructor(clientId: string, accessPoint: Endpoints, retryPolicy: 
ExponentialBackoffRetryPolicy,
-    requestTimeout: number, topics: Set<string>) {
-    super(clientId, ClientType.PRODUCER, accessPoint, requestTimeout, 
retryPolicy);
+  constructor(namespace: string, clientId: string, accessPoint: Endpoints, 
retryPolicy: ExponentialBackoffRetryPolicy, requestTimeout: number, topics: 
Set<string>) {
+    super(namespace, clientId, ClientType.PRODUCER, accessPoint, 
requestTimeout, retryPolicy);
     this.#topics = topics;
   }
 
@@ -54,6 +53,7 @@ export class PublishingSettings extends Settings {
       .setValidateMessageType(this.#validateMessageType);
     for (const topic of this.#topics) {
       publishing.addTopics().setName(topic);
+      publishing.addTopics().setResourceNamespace(this.namespace);
     }
     return new SettingsPB()
       .setClientType(this.clientType)
diff --git a/nodejs/test/consumer/SimpleConsumer.test.ts 
b/nodejs/test/consumer/SimpleConsumer.test.ts
index 8e375a90..3962a2ea 100644
--- a/nodejs/test/consumer/SimpleConsumer.test.ts
+++ b/nodejs/test/consumer/SimpleConsumer.test.ts
@@ -21,7 +21,7 @@ import {
   SimpleConsumer, FilterExpression,
   Producer,
 } from '../../src';
-import { topics, endpoints, sessionCredentials } from '../helper';
+import { topics, endpoints, sessionCredentials, namespace } from '../helper';
 
 describe('test/consumer/SimpleConsumer.test.ts', () => {
   let producer: Producer | null = null;
@@ -42,6 +42,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => {
       if (!sessionCredentials) return;
       simpleConsumer = new SimpleConsumer({
         endpoints,
+        namespace,
         sessionCredentials,
         consumerGroup: 'nodejs-unittest-group',
         subscriptions: new Map().set(topics.delay, FilterExpression.SUB_ALL),
@@ -53,6 +54,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => {
       if (!sessionCredentials) return;
       simpleConsumer = new SimpleConsumer({
         endpoints,
+        namespace,
         sessionCredentials: {
           ...sessionCredentials,
           accessKey: 'wrong',
@@ -69,6 +71,7 @@ describe('test/consumer/SimpleConsumer.test.ts', () => {
       if (!sessionCredentials) return;
       simpleConsumer = new SimpleConsumer({
         endpoints,
+        namespace,
         sessionCredentials: {
           ...sessionCredentials,
           accessSecret: 'wrong',
@@ -88,11 +91,13 @@ describe('test/consumer/SimpleConsumer.test.ts', () => {
       const tag = `nodejs-unittest-tag-${randomUUID()}`;
       producer = new Producer({
         endpoints,
-        sessionCredentials,
+        namespace,
+        sessionCredentials
       });
       await producer.startup();
       simpleConsumer = new SimpleConsumer({
         endpoints,
+        namespace,
         sessionCredentials,
         consumerGroup: `nodejs-unittest-group-${randomUUID()}`,
         subscriptions: new Map().set(topic, new FilterExpression(tag)),
diff --git a/nodejs/test/helper.ts b/nodejs/test/helper.ts
index 4a3deec2..4cd23fa4 100644
--- a/nodejs/test/helper.ts
+++ b/nodejs/test/helper.ts
@@ -18,6 +18,7 @@
 import { SessionCredentials } from '../src/client';
 
 export const endpoints = process.env.ROCKETMQ_NODEJS_CLIENT_ENDPOINTS ?? 
'localhost:8081';
+export const namespace = process.env.ROCKETMQ_NODEJS_CLIENT_NAMESPACE ?? '';
 export const topics = {
   normal: 'TopicTestForNormal',
   fifo: 'TopicTestForFifo',
diff --git a/nodejs/test/producer/Producer.test.ts 
b/nodejs/test/producer/Producer.test.ts
index 005cd3b9..66deec77 100644
--- a/nodejs/test/producer/Producer.test.ts
+++ b/nodejs/test/producer/Producer.test.ts
@@ -19,7 +19,7 @@ import { strict as assert } from 'node:assert';
 import { randomUUID } from 'node:crypto';
 import { NotFoundException, Producer, SimpleConsumer } from '../../src';
 import { TransactionResolution } from 
'../../proto/apache/rocketmq/v2/definition_pb';
-import { topics, endpoints, sessionCredentials, consumerGroup } from 
'../helper';
+import { topics, endpoints, sessionCredentials, consumerGroup, namespace } 
from '../helper';
 
 describe('test/producer/Producer.test.ts', () => {
   let producer: Producer | null = null;
@@ -39,6 +39,7 @@ describe('test/producer/Producer.test.ts', () => {
     it('should startup success', async () => {
       producer = new Producer({
         endpoints,
+        namespace,
         sessionCredentials,
         maxAttempts: 2,
       });
@@ -66,6 +67,7 @@ describe('test/producer/Producer.test.ts', () => {
         producer = new Producer({
           topic: 'TopicTest-not-exists',
           endpoints,
+          namespace,
           sessionCredentials,
           maxAttempts: 2,
         });
@@ -87,6 +89,7 @@ describe('test/producer/Producer.test.ts', () => {
       const tag = `nodejs-unittest-tag-${randomUUID()}`;
       producer = new Producer({
         endpoints,
+        namespace,
         sessionCredentials,
         maxAttempts: 2,
       });
@@ -108,6 +111,7 @@ describe('test/producer/Producer.test.ts', () => {
       simpleConsumer = new SimpleConsumer({
         consumerGroup,
         endpoints,
+        namespace,
         sessionCredentials,
         subscriptions: new Map().set(topic, tag),
         awaitDuration: 3000,
@@ -124,6 +128,7 @@ describe('test/producer/Producer.test.ts', () => {
       const tag = `nodejs-unittest-tag-${randomUUID()}`;
       producer = new Producer({
         endpoints,
+        namespace,
         sessionCredentials,
         maxAttempts: 2,
       });
@@ -147,6 +152,7 @@ describe('test/producer/Producer.test.ts', () => {
       simpleConsumer = new SimpleConsumer({
         consumerGroup,
         endpoints,
+        namespace,
         sessionCredentials,
         subscriptions: new Map().set(topic, tag),
         awaitDuration: 3000,
@@ -166,6 +172,7 @@ describe('test/producer/Producer.test.ts', () => {
       const tag = `nodejs-unittest-tag-${randomUUID()}`;
       producer = new Producer({
         endpoints,
+        namespace,
         sessionCredentials,
         maxAttempts: 2,
       });
@@ -173,6 +180,7 @@ describe('test/producer/Producer.test.ts', () => {
       simpleConsumer = new SimpleConsumer({
         consumerGroup,
         endpoints,
+        namespace,
         sessionCredentials,
         subscriptions: new Map().set(topic, tag),
         awaitDuration: 3000,
@@ -238,6 +246,7 @@ describe('test/producer/Producer.test.ts', () => {
       const tag = `nodejs-unittest-tag-${randomUUID()}`;
       producer = new Producer({
         endpoints,
+        namespace,
         sessionCredentials,
         maxAttempts: 2,
         checker: {
@@ -266,6 +275,7 @@ describe('test/producer/Producer.test.ts', () => {
       simpleConsumer = new SimpleConsumer({
         consumerGroup,
         endpoints,
+        namespace,
         sessionCredentials,
         subscriptions: new Map().set(topic, tag),
         awaitDuration: 3000,

Reply via email to