This is an automated email from the ASF dual-hosted git repository. nkurihar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push: new ae91394 Add negative acknowledgement support for consumer (#59) ae91394 is described below commit ae91394e89501aeb4edba71cb67c0c22301eb4a6 Author: Fernando Rejon Barrera <39321865+frej...@users.noreply.github.com> AuthorDate: Wed Nov 27 03:39:15 2019 +0100 Add negative acknowledgement support for consumer (#59) * add nack redeliver timeout declaration * implmement nack redeliver timeout * add nack to consumer test * add negative acknowledge e2e test * add negative acknowledge declarations * add negative acknowledge definitions --- src/Consumer.cc | 20 ++++++++++++++++++++ src/Consumer.h | 2 ++ src/ConsumerConfig.cc | 11 ++++++++++- src/ConsumerConfig.h | 2 ++ tests/consumer.test.js | 15 +++++++++++++++ tests/end_to_end.test.js | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 96 insertions(+), 1 deletion(-) diff --git a/src/Consumer.cc b/src/Consumer.cc index 12bba8e..4726402 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -34,6 +34,8 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("receive", &Consumer::Receive), InstanceMethod("acknowledge", &Consumer::Acknowledge), InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId), + InstanceMethod("negativeAcknowledge", &Consumer::NegativeAcknowledge), + InstanceMethod("negativeAcknowledgeId", &Consumer::NegativeAcknowledgeId), InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative), InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId), InstanceMethod("close", &Consumer::Close), @@ -75,6 +77,12 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker { SetError(msg); return; } + int32_t nAckRedeliverTimeoutMs = this->consumerConfig->GetNAckRedeliverTimeoutMs(); + if (nAckRedeliverTimeoutMs < 0) { + std::string msg("NAck timeout should be greater than or equal to zero"); + SetError(msg); + return; + } pulsar_result result = pulsar_client_subscribe(this->cClient, topic.c_str(), subscription.c_str(), @@ -168,6 +176,18 @@ void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) { pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL); } +void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) { + Napi::Object obj = info[0].As<Napi::Object>(); + Message *msg = Message::Unwrap(obj); + pulsar_consumer_negative_acknowledge(this->cConsumer, msg->GetCMessage()); +} + +void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) { + Napi::Object obj = info[0].As<Napi::Object>(); + MessageId *msgId = MessageId::Unwrap(obj); + pulsar_consumer_negative_acknowledge_id(this->cConsumer, msgId->GetCMessageId()); +} + void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) { Napi::Object obj = info[0].As<Napi::Object>(); Message *msg = Message::Unwrap(obj); diff --git a/src/Consumer.h b/src/Consumer.h index 7aa41d9..c6373d0 100644 --- a/src/Consumer.h +++ b/src/Consumer.h @@ -38,6 +38,8 @@ class Consumer : public Napi::ObjectWrap<Consumer> { Napi::Value Receive(const Napi::CallbackInfo &info); void Acknowledge(const Napi::CallbackInfo &info); void AcknowledgeId(const Napi::CallbackInfo &info); + void NegativeAcknowledge(const Napi::CallbackInfo &info); + void NegativeAcknowledgeId(const Napi::CallbackInfo &info); void AcknowledgeCumulative(const Napi::CallbackInfo &info); void AcknowledgeCumulativeId(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc index a98bb69..b07bef2 100644 --- a/src/ConsumerConfig.cc +++ b/src/ConsumerConfig.cc @@ -25,6 +25,7 @@ static const std::string CFG_TOPIC = "topic"; static const std::string CFG_SUBSCRIPTION = "subscription"; static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType"; static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs"; +static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs"; static const std::string CFG_RECV_QUEUE = "receiverQueueSize"; static const std::string CFG_RECV_QUEUE_ACROSS_PARTITIONS = "receiverQueueSizeAcrossPartitions"; static const std::string CFG_CONSUMER_NAME = "consumerName"; @@ -36,7 +37,7 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = { {"Failover", pulsar_ConsumerFailover}}; ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig) - : topic(""), subscription(""), ackTimeoutMs(0) { + : topic(""), subscription(""), ackTimeoutMs(0), nAckRedeliverTimeoutMs(60000) { this->cConsumerConfig = pulsar_consumer_configuration_create(); if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) { @@ -67,6 +68,13 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig) } } + if (consumerConfig.Has(CFG_NACK_REDELIVER_TIMEOUT) && consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).IsNumber()) { + this->nAckRedeliverTimeoutMs = consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).ToNumber().Int64Value(); + if (this->nAckRedeliverTimeoutMs >= 0) { + pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig, this->nAckRedeliverTimeoutMs); + } + } + if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) { int32_t receiverQueueSize = consumerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value(); if (receiverQueueSize >= 0) { @@ -103,3 +111,4 @@ pulsar_consumer_configuration_t *ConsumerConfig::GetCConsumerConfig() { return t std::string ConsumerConfig::GetTopic() { return this->topic; } std::string ConsumerConfig::GetSubscription() { return this->subscription; } int64_t ConsumerConfig::GetAckTimeoutMs() { return this->ackTimeoutMs; } +int64_t ConsumerConfig::GetNAckRedeliverTimeoutMs() { return this->nAckRedeliverTimeoutMs; } diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h index 7070bf9..f5dcce8 100644 --- a/src/ConsumerConfig.h +++ b/src/ConsumerConfig.h @@ -33,12 +33,14 @@ class ConsumerConfig { std::string GetTopic(); std::string GetSubscription(); int64_t GetAckTimeoutMs(); + int64_t GetNAckRedeliverTimeoutMs(); private: pulsar_consumer_configuration_t *cConsumerConfig; std::string topic; std::string subscription; int64_t ackTimeoutMs; + int64_t nAckRedeliverTimeoutMs; }; #endif diff --git a/tests/consumer.test.js b/tests/consumer.test.js index c008cac..65bd792 100644 --- a/tests/consumer.test.js +++ b/tests/consumer.test.js @@ -30,6 +30,7 @@ const Pulsar = require('../index.js'); await expect(client.subscribe({ subscription: 'sub1', ackTimeoutMs: 10000, + nAckRedeliverTimeoutMs: 60000, })).rejects.toThrow('Topic is required and must be specified as a string when creating consumer'); }); @@ -38,6 +39,7 @@ const Pulsar = require('../index.js'); topic: 0, subscription: 'sub1', ackTimeoutMs: 10000, + nAckRedeliverTimeoutMs: 60000, })).rejects.toThrow('Topic is required and must be specified as a string when creating consumer'); }); @@ -45,6 +47,7 @@ const Pulsar = require('../index.js'); await expect(client.subscribe({ topic: 'persistent://public/default/t1', ackTimeoutMs: 10000, + nAckRedeliverTimeoutMs: 60000, })).rejects.toThrow('Subscription is required and must be specified as a string when creating consumer'); }); @@ -53,6 +56,7 @@ const Pulsar = require('../index.js'); topic: 'persistent://public/default/t1', subscription: 0, ackTimeoutMs: 10000, + nAckRedeliverTimeoutMs: 60000, })).rejects.toThrow('Subscription is required and must be specified as a string when creating consumer'); }); @@ -61,6 +65,7 @@ const Pulsar = require('../index.js'); topic: 'persistent://no-tenant/namespace/topic', subscription: 'sub1', ackTimeoutMs: 10000, + nAckRedeliverTimeoutMs: 60000, })).rejects.toThrow('Failed to create consumer: ConnectError'); }); @@ -69,8 +74,18 @@ const Pulsar = require('../index.js'); topic: 'persistent://public/no-namespace/topic', subscription: 'sub1', ackTimeoutMs: 10000, + nAckRedeliverTimeoutMs: 60000, })).rejects.toThrow('Failed to create consumer: ConnectError'); }); + + test('Not Positive NAckRedeliverTimeout', async () => { + await expect(client.subscribe({ + topic: 'persistent://public/default/t1', + subscription: 'sub1', + ackTimeoutMs: 10000, + nAckRedeliverTimeoutMs: -12, + })).rejects.toThrow('NAck timeout should be greater than or equal to zero'); + }); }); }); })(); diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 477e57f..0a8ce6f 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -67,6 +67,53 @@ const Pulsar = require('../index.js'); await client.close(); }); + test('negativeAcknowledge', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + + const topic = 'persistent://public/default/produce-consume'; + const producer = await client.createProducer({ + topic, + sendTimeoutMs: 30000, + batchingEnabled: true, + }); + expect(producer).not.toBeNull(); + + const consumer = await client.subscribe({ + topic, + subscription: 'sub1', + ackTimeoutMs: 10000, + nAckRedeliverTimeoutMs: 1000, + }); + + expect(consumer).not.toBeNull(); + + const message = 'my-message'; + producer.send({ + data: Buffer.from(message), + }); + await producer.flush(); + + const results = []; + const msg = await consumer.receive(); + results.push(msg.getData().toString()); + consumer.negativeAcknowledge(msg); + + const msg2 = await consumer.receive(); + results.push(msg2.getData().toString()); + consumer.acknowledge(msg2); + + await expect(consumer.receive(1000)).rejects.toThrow('Failed to received message TimeOut'); + + expect(results).toEqual([message, message]); + + await producer.close(); + await consumer.close(); + await client.close(); + }); + test('acknowledgeCumulative', async () => { const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650',