This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e51f5a  feat: support pattern subscription non persistent topic. 
(#334)
1e51f5a is described below

commit 1e51f5a874e8237cda5f9f34228ee2299d94d77e
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Jun 16 14:06:07 2023 +0800

    feat: support pattern subscription non persistent topic. (#334)
---
 index.d.ts             |  6 ++++++
 src/ConsumerConfig.cc  | 16 ++++++++++++++++
 tests/consumer.test.js | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 72 insertions(+)

diff --git a/index.d.ts b/index.d.ts
index 161dabc..bd47146 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -96,6 +96,7 @@ export interface ConsumerConfig {
   autoAckOldestChunkedMessageOnQueueFull?: number;
   schema?: SchemaInfo;
   batchIndexAckEnabled?: boolean;
+  regexSubscriptionMode?: RegexSubscriptionMode;
 }
 
 export class Consumer {
@@ -251,6 +252,11 @@ export type ConsumerCryptoFailureAction =
   'DISCARD' |
   'CONSUME';
 
+export type RegexSubscriptionMode =
+  'PersistentOnly' |
+  'NonPersistentOnly' |
+  'AllTopics';
+
 export type SchemaType =
   'None' |
   'String' |
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 69f97d9..be646ae 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -31,6 +31,7 @@ static const std::string CFG_TOPICS_PATTERN = "topicsPattern";
 static const std::string CFG_SUBSCRIPTION = "subscription";
 static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
 static const std::string CFG_INIT_POSITION = "subscriptionInitialPosition";
+static const std::string CFG_REGEX_SUBSCRIPTION_MODE = "regexSubscriptionMode";
 static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs";
 static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs";
 static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
@@ -53,6 +54,11 @@ static const std::map<std::string, pulsar_consumer_type> 
SUBSCRIPTION_TYPE = {
     {"KeyShared", pulsar_ConsumerKeyShared},
     {"Failover", pulsar_ConsumerFailover}};
 
+static const std::map<std::string, pulsar_consumer_regex_subscription_mode> 
REGEX_SUBSCRIPTION_MODE = {
+    {"PersistentOnly", pulsar_consumer_regex_sub_mode_PersistentOnly},
+    {"NonPersistentOnly", pulsar_consumer_regex_sub_mode_NonPersistentOnly},
+    {"AllTopics", pulsar_consumer_regex_sub_mode_AllTopics}};
+
 static const std::map<std::string, initial_position> INIT_POSITION = {
     {"Latest", initial_position_latest}, {"Earliest", 
initial_position_earliest}};
 
@@ -111,6 +117,16 @@ ConsumerConfig::ConsumerConfig(const Napi::Object 
&consumerConfig, pulsar_messag
     }
   }
 
+  if (consumerConfig.Has(CFG_REGEX_SUBSCRIPTION_MODE) &&
+      consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).IsString()) {
+    std::string regexSubscriptionMode =
+        consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).ToString().Utf8Value();
+    if (REGEX_SUBSCRIPTION_MODE.count(regexSubscriptionMode)) {
+      pulsar_consumer_configuration_set_regex_subscription_mode(
+          this->cConsumerConfig.get(), 
REGEX_SUBSCRIPTION_MODE.at(regexSubscriptionMode));
+    }
+  }
+
   if (consumerConfig.Has(CFG_CONSUMER_NAME) && 
consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) {
     std::string consumerName = 
consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value();
     if (!consumerName.empty())
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index 828cb88..d976cb2 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -197,6 +197,56 @@ const Pulsar = require('../index.js');
           'Failed to receive message: TimeOut',
         );
       });
+
+      test('Regex subscription', async () => {
+        const topicName1 = 'persistent://public/default/regex-sub-1';
+        const topicName2 = 'persistent://public/default/regex-sub-2';
+        const topicName3 = 'non-persistent://public/default/regex-sub-3';
+        const topicName4 = 'persistent://public/default/no-match-regex-sub-2';
+        const producer1 = await client.createProducer({
+          topic: topicName1,
+        });
+        const producer2 = await client.createProducer({
+          topic: topicName2,
+        });
+        const producer3 = await client.createProducer({
+          topic: topicName3,
+        });
+        const producer4 = await client.createProducer({
+          topic: topicName4,
+        });
+
+        const consumer = await client.subscribe({
+          topicsPattern: 'persistent://public/default/regex-sub.*',
+          subscription: 'sub1',
+          subscriptionType: 'Shared',
+          regexSubscriptionMode: 'AllTopics',
+        });
+
+        const num = 10;
+        for (let i = 0; i < num; i += 1) {
+          const msg = `my-message-${i}`;
+          await producer1.send({ data: Buffer.from(msg) });
+          await producer2.send({ data: Buffer.from(msg) });
+          await producer3.send({ data: Buffer.from(msg) });
+          await producer4.send({ data: Buffer.from(msg) });
+        }
+        const results = [];
+        for (let i = 0; i < 3 * num; i += 1) {
+          const msg = await consumer.receive();
+          results.push(msg.getData().toString());
+        }
+        expect(results.length).toEqual(3 * num);
+        // assert no more msgs.
+        await expect(consumer.receive(1000)).rejects.toThrow(
+          'Failed to receive message: TimeOut',
+        );
+        await producer1.close();
+        await producer2.close();
+        await producer3.close();
+        await producer4.close();
+        await consumer.close();
+      });
     });
   });
 })();

Reply via email to