This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 12a03ce  feat: support producer access mode. (#331)
12a03ce is described below

commit 12a03ce7fc1b26ba68f75aaf586d73aae9f9fbf9
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jun 15 21:41:55 2023 +0800

    feat: support producer access mode. (#331)
---
 index.d.ts             |  7 ++++++
 src/ProducerConfig.cc  | 14 ++++++++++++
 tests/producer.test.js | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 83 insertions(+)

diff --git a/index.d.ts b/index.d.ts
index b394804..161dabc 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -63,6 +63,7 @@ export interface ProducerConfig {
   cryptoFailureAction?: ProducerCryptoFailureAction;
   chunkingEnabled?: boolean;
   schema?: SchemaInfo;
+  accessMode?: ProducerAccessMode;
 }
 
 export class Producer {
@@ -267,3 +268,9 @@ export type SchemaType =
   'Bytes' |
   'AutoConsume' |
   'AutoPublish';
+
+export type ProducerAccessMode =
+    'Shared' |
+    'Exclusive' |
+    'WaitForExclusive' |
+    'ExclusiveWithFencing';
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 52d7707..120eebf 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -39,6 +39,7 @@ static const std::string CFG_PUBLIC_KEY_PATH = 
"publicKeyPath";
 static const std::string CFG_ENCRYPTION_KEY = "encryptionKey";
 static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
 static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
+static const std::string CFG_ACCESS_MODE = "accessMode";
 
 static const std::map<std::string, pulsar_partitions_routing_mode> 
MESSAGE_ROUTING_MODE = {
     {"UseSinglePartition", pulsar_UseSinglePartition},
@@ -63,6 +64,13 @@ static std::map<std::string, 
pulsar_producer_crypto_failure_action> PRODUCER_CRY
     {"SEND", pulsar_ProducerSend},
 };
 
+static std::map<std::string, pulsar_producer_access_mode> PRODUCER_ACCESS_MODE 
= {
+    {"Shared", pulsar_ProducerAccessModeShared},
+    {"Exclusive", pulsar_ProducerAccessModeExclusive},
+    {"WaitForExclusive", pulsar_ProducerAccessModeWaitForExclusive},
+    {"ExclusiveWithFencing", pulsar_ProducerAccessModeExclusiveWithFencing},
+};
+
 ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") 
{
   this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
       pulsar_producer_configuration_create(), 
pulsar_producer_configuration_free);
@@ -194,6 +202,12 @@ ProducerConfig::ProducerConfig(const Napi::Object& 
producerConfig) : topic("") {
     bool chunkingEnabled = 
producerConfig.Get(CFG_CHUNK_ENABLED).ToBoolean().Value();
     
pulsar_producer_configuration_set_chunking_enabled(this->cProducerConfig.get(), 
chunkingEnabled);
   }
+
+  std::string accessMode = 
producerConfig.Get(CFG_ACCESS_MODE).ToString().Utf8Value();
+  if (PRODUCER_ACCESS_MODE.count(accessMode)) {
+    pulsar_producer_configuration_set_access_mode(this->cProducerConfig.get(),
+                                                  
PRODUCER_ACCESS_MODE.at(accessMode));
+  }
 }
 
 ProducerConfig::~ProducerConfig() {}
diff --git a/tests/producer.test.js b/tests/producer.test.js
index fa7710b..fbce743 100644
--- a/tests/producer.test.js
+++ b/tests/producer.test.js
@@ -94,5 +94,67 @@ const Pulsar = require('../index.js');
         await producer.close();
       });
     });
+    describe('Access Mode', () => {
+      test('Exclusive', async () => {
+        const topicName = 'test-access-mode-exclusive';
+        const producer1 = await client.createProducer({
+          topic: topicName,
+          producerName: 'p-1',
+          accessMode: 'Exclusive',
+        });
+        expect(producer1.getProducerName()).toBe('p-1');
+
+        await expect(client.createProducer({
+          topic: topicName,
+          producerName: 'p-2',
+          accessMode: 'Exclusive',
+        })).rejects.toThrow('Failed to create producer: ResultProducerFenced');
+
+        await producer1.close();
+      });
+
+      test('WaitForExclusive', async () => {
+        const topicName = 'test-access-mode-wait-for-exclusive';
+        const producer1 = await client.createProducer({
+          topic: topicName,
+          producerName: 'p-1',
+          accessMode: 'Exclusive',
+        });
+        expect(producer1.getProducerName()).toBe('p-1');
+        // async close producer1
+        producer1.close();
+        // when p1 close, p2 success created.
+        const producer2 = await client.createProducer({
+          topic: topicName,
+          producerName: 'p-2',
+          accessMode: 'WaitForExclusive',
+        });
+        expect(producer2.getProducerName()).toBe('p-2');
+        await producer2.close();
+      });
+
+      test('ExclusiveWithFencing', async () => {
+        const topicName = 'test-access-mode';
+        const producer1 = await client.createProducer({
+          topic: topicName,
+          producerName: 'p-1',
+          accessMode: 'Exclusive',
+        });
+        expect(producer1.getProducerName()).toBe('p-1');
+        const producer2 = await client.createProducer({
+          topic: topicName,
+          producerName: 'p-2',
+          accessMode: 'ExclusiveWithFencing',
+        });
+        expect(producer2.getProducerName()).toBe('p-2');
+        // producer1 will be fenced.
+        await expect(
+          producer1.send({
+            data: Buffer.from('test-msg'),
+          }),
+        ).rejects.toThrow('Failed to send message: ResultProducerFenced');
+        await producer2.close();
+      });
+    });
   });
 })();

Reply via email to