This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 1e51f5a feat: support pattern subscription non persistent topic. (#334)
1e51f5a is described below
commit 1e51f5a874e8237cda5f9f34228ee2299d94d77e
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Jun 16 14:06:07 2023 +0800
feat: support pattern subscription non persistent topic. (#334)
---
index.d.ts | 6 ++++++
src/ConsumerConfig.cc | 16 ++++++++++++++++
tests/consumer.test.js | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 72 insertions(+)
diff --git a/index.d.ts b/index.d.ts
index 161dabc..bd47146 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -96,6 +96,7 @@ export interface ConsumerConfig {
autoAckOldestChunkedMessageOnQueueFull?: number;
schema?: SchemaInfo;
batchIndexAckEnabled?: boolean;
+ regexSubscriptionMode?: RegexSubscriptionMode;
}
export class Consumer {
@@ -251,6 +252,11 @@ export type ConsumerCryptoFailureAction =
'DISCARD' |
'CONSUME';
+export type RegexSubscriptionMode =
+ 'PersistentOnly' |
+ 'NonPersistentOnly' |
+ 'AllTopics';
+
export type SchemaType =
'None' |
'String' |
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 69f97d9..be646ae 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -31,6 +31,7 @@ static const std::string CFG_TOPICS_PATTERN = "topicsPattern";
static const std::string CFG_SUBSCRIPTION = "subscription";
static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
static const std::string CFG_INIT_POSITION = "subscriptionInitialPosition";
+static const std::string CFG_REGEX_SUBSCRIPTION_MODE = "regexSubscriptionMode";
static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs";
static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs";
static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
@@ -53,6 +54,11 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"KeyShared", pulsar_ConsumerKeyShared},
{"Failover", pulsar_ConsumerFailover}};
+static const std::map<std::string, pulsar_consumer_regex_subscription_mode> REGEX_SUBSCRIPTION_MODE = {
+ {"PersistentOnly", pulsar_consumer_regex_sub_mode_PersistentOnly},
+ {"NonPersistentOnly", pulsar_consumer_regex_sub_mode_NonPersistentOnly},
+ {"AllTopics", pulsar_consumer_regex_sub_mode_AllTopics}};
+
static const std::map<std::string, initial_position> INIT_POSITION = {
{"Latest", initial_position_latest}, {"Earliest",
initial_position_earliest}};
@@ -111,6 +117,16 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
}
}
+ if (consumerConfig.Has(CFG_REGEX_SUBSCRIPTION_MODE) &&
+ consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).IsString()) {
+ std::string regexSubscriptionMode =
+ consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).ToString().Utf8Value();
+ if (REGEX_SUBSCRIPTION_MODE.count(regexSubscriptionMode)) {
+ pulsar_consumer_configuration_set_regex_subscription_mode(
+ this->cConsumerConfig.get(), REGEX_SUBSCRIPTION_MODE.at(regexSubscriptionMode));
+ }
+ }
+
if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) {
std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value();
if (!consumerName.empty())
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index 828cb88..d976cb2 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -197,6 +197,56 @@ const Pulsar = require('../index.js');
'Failed to receive message: TimeOut',
);
});
+
+ test('Regex subscription', async () => {
+ const topicName1 = 'persistent://public/default/regex-sub-1';
+ const topicName2 = 'persistent://public/default/regex-sub-2';
+ const topicName3 = 'non-persistent://public/default/regex-sub-3';
+ const topicName4 = 'persistent://public/default/no-match-regex-sub-2';
+ const producer1 = await client.createProducer({
+ topic: topicName1,
+ });
+ const producer2 = await client.createProducer({
+ topic: topicName2,
+ });
+ const producer3 = await client.createProducer({
+ topic: topicName3,
+ });
+ const producer4 = await client.createProducer({
+ topic: topicName4,
+ });
+
+ const consumer = await client.subscribe({
+ topicsPattern: 'persistent://public/default/regex-sub.*',
+ subscription: 'sub1',
+ subscriptionType: 'Shared',
+ regexSubscriptionMode: 'AllTopics',
+ });
+
+ const num = 10;
+ for (let i = 0; i < num; i += 1) {
+ const msg = `my-message-${i}`;
+ await producer1.send({ data: Buffer.from(msg) });
+ await producer2.send({ data: Buffer.from(msg) });
+ await producer3.send({ data: Buffer.from(msg) });
+ await producer4.send({ data: Buffer.from(msg) });
+ }
+ const results = [];
+ for (let i = 0; i < 3 * num; i += 1) {
+ const msg = await consumer.receive();
+ results.push(msg.getData().toString());
+ }
+ expect(results.length).toEqual(3 * num);
+ // assert no more msgs.
+ await expect(consumer.receive(1000)).rejects.toThrow(
+ 'Failed to receive message: TimeOut',
+ );
+ await producer1.close();
+ await producer2.close();
+ await producer3.close();
+ await producer4.close();
+ await consumer.close();
+ });
});
});
})();