This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new ae91394  Add negative acknowledgement support for consumer (#59)
ae91394 is described below

commit ae91394e89501aeb4edba71cb67c0c22301eb4a6
Author: Fernando Rejon Barrera <39321865+frej...@users.noreply.github.com>
AuthorDate: Wed Nov 27 03:39:15 2019 +0100

    Add negative acknowledgement support for consumer (#59)
    
    * add nack redeliver timeout declaration
    
    * implement nack redeliver timeout
    
    * add nack to consumer test
    
    * add negative acknowledge e2e test
    
    * add negative acknowledge declarations
    
    * add negative acknowledge definitions
---
 src/Consumer.cc          | 20 ++++++++++++++++++++
 src/Consumer.h           |  2 ++
 src/ConsumerConfig.cc    | 11 ++++++++++-
 src/ConsumerConfig.h     |  2 ++
 tests/consumer.test.js   | 15 +++++++++++++++
 tests/end_to_end.test.js | 47 +++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index 12bba8e..4726402 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -34,6 +34,8 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
                       InstanceMethod("receive", &Consumer::Receive),
                       InstanceMethod("acknowledge", &Consumer::Acknowledge),
                       InstanceMethod("acknowledgeId", 
&Consumer::AcknowledgeId),
+                      InstanceMethod("negativeAcknowledge", 
&Consumer::NegativeAcknowledge),
+                      InstanceMethod("negativeAcknowledgeId", 
&Consumer::NegativeAcknowledgeId),
                       InstanceMethod("acknowledgeCumulative", 
&Consumer::AcknowledgeCumulative),
                       InstanceMethod("acknowledgeCumulativeId", 
&Consumer::AcknowledgeCumulativeId),
                       InstanceMethod("close", &Consumer::Close),
@@ -75,6 +77,12 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker {
       SetError(msg);
       return;
     }
+    int32_t nAckRedeliverTimeoutMs = 
this->consumerConfig->GetNAckRedeliverTimeoutMs();
+    if (nAckRedeliverTimeoutMs < 0) {
+      std::string msg("NAck timeout should be greater than or equal to zero");
+      SetError(msg);
+      return;
+    }
 
     pulsar_result result =
         pulsar_client_subscribe(this->cClient, topic.c_str(), 
subscription.c_str(),
@@ -168,6 +176,18 @@ void Consumer::AcknowledgeId(const Napi::CallbackInfo 
&info) {
   pulsar_consumer_acknowledge_async_id(this->cConsumer, 
msgId->GetCMessageId(), NULL, NULL);
 }
 
+void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) {
+  Napi::Object obj = info[0].As<Napi::Object>();
+  Message *msg = Message::Unwrap(obj);
+  pulsar_consumer_negative_acknowledge(this->cConsumer, msg->GetCMessage());
+}
+
+void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) {
+  Napi::Object obj = info[0].As<Napi::Object>();
+  MessageId *msgId = MessageId::Unwrap(obj);
+  pulsar_consumer_negative_acknowledge_id(this->cConsumer, 
msgId->GetCMessageId());
+}
+
 void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
   Napi::Object obj = info[0].As<Napi::Object>();
   Message *msg = Message::Unwrap(obj);
diff --git a/src/Consumer.h b/src/Consumer.h
index 7aa41d9..c6373d0 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -38,6 +38,8 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
   Napi::Value Receive(const Napi::CallbackInfo &info);
   void Acknowledge(const Napi::CallbackInfo &info);
   void AcknowledgeId(const Napi::CallbackInfo &info);
+  void NegativeAcknowledge(const Napi::CallbackInfo &info);
+  void NegativeAcknowledgeId(const Napi::CallbackInfo &info);
   void AcknowledgeCumulative(const Napi::CallbackInfo &info);
   void AcknowledgeCumulativeId(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index a98bb69..b07bef2 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -25,6 +25,7 @@ static const std::string CFG_TOPIC = "topic";
 static const std::string CFG_SUBSCRIPTION = "subscription";
 static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
 static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs";
+static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs";
 static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
 static const std::string CFG_RECV_QUEUE_ACROSS_PARTITIONS = 
"receiverQueueSizeAcrossPartitions";
 static const std::string CFG_CONSUMER_NAME = "consumerName";
@@ -36,7 +37,7 @@ static const std::map<std::string, pulsar_consumer_type> 
SUBSCRIPTION_TYPE = {
     {"Failover", pulsar_ConsumerFailover}};
 
 ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig)
-    : topic(""), subscription(""), ackTimeoutMs(0) {
+    : topic(""), subscription(""), ackTimeoutMs(0), 
nAckRedeliverTimeoutMs(60000) {
   this->cConsumerConfig = pulsar_consumer_configuration_create();
 
   if (consumerConfig.Has(CFG_TOPIC) && 
consumerConfig.Get(CFG_TOPIC).IsString()) {
@@ -67,6 +68,13 @@ ConsumerConfig::ConsumerConfig(const Napi::Object 
&consumerConfig)
     }
   }
 
+  if (consumerConfig.Has(CFG_NACK_REDELIVER_TIMEOUT) && 
consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).IsNumber()) {
+    this->nAckRedeliverTimeoutMs = 
consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).ToNumber().Int64Value();
+    if (this->nAckRedeliverTimeoutMs >= 0) {
+      
pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig, 
this->nAckRedeliverTimeoutMs);
+    }
+  }
+
   if (consumerConfig.Has(CFG_RECV_QUEUE) && 
consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
     int32_t receiverQueueSize = 
consumerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value();
     if (receiverQueueSize >= 0) {
@@ -103,3 +111,4 @@ pulsar_consumer_configuration_t 
*ConsumerConfig::GetCConsumerConfig() { return t
 std::string ConsumerConfig::GetTopic() { return this->topic; }
 std::string ConsumerConfig::GetSubscription() { return this->subscription; }
 int64_t ConsumerConfig::GetAckTimeoutMs() { return this->ackTimeoutMs; }
+int64_t ConsumerConfig::GetNAckRedeliverTimeoutMs() { return 
this->nAckRedeliverTimeoutMs; }
diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h
index 7070bf9..f5dcce8 100644
--- a/src/ConsumerConfig.h
+++ b/src/ConsumerConfig.h
@@ -33,12 +33,14 @@ class ConsumerConfig {
   std::string GetTopic();
   std::string GetSubscription();
   int64_t GetAckTimeoutMs();
+  int64_t GetNAckRedeliverTimeoutMs();
 
  private:
   pulsar_consumer_configuration_t *cConsumerConfig;
   std::string topic;
   std::string subscription;
   int64_t ackTimeoutMs;
+  int64_t nAckRedeliverTimeoutMs;
 };
 
 #endif
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index c008cac..65bd792 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -30,6 +30,7 @@ const Pulsar = require('../index.js');
         await expect(client.subscribe({
           subscription: 'sub1',
           ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: 60000,
         })).rejects.toThrow('Topic is required and must be specified as a 
string when creating consumer');
       });
 
@@ -38,6 +39,7 @@ const Pulsar = require('../index.js');
           topic: 0,
           subscription: 'sub1',
           ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: 60000,
         })).rejects.toThrow('Topic is required and must be specified as a 
string when creating consumer');
       });
 
@@ -45,6 +47,7 @@ const Pulsar = require('../index.js');
         await expect(client.subscribe({
           topic: 'persistent://public/default/t1',
           ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: 60000,
         })).rejects.toThrow('Subscription is required and must be specified as 
a string when creating consumer');
       });
 
@@ -53,6 +56,7 @@ const Pulsar = require('../index.js');
           topic: 'persistent://public/default/t1',
           subscription: 0,
           ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: 60000,
         })).rejects.toThrow('Subscription is required and must be specified as 
a string when creating consumer');
       });
 
@@ -61,6 +65,7 @@ const Pulsar = require('../index.js');
           topic: 'persistent://no-tenant/namespace/topic',
           subscription: 'sub1',
           ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: 60000,
         })).rejects.toThrow('Failed to create consumer: ConnectError');
       });
 
@@ -69,8 +74,18 @@ const Pulsar = require('../index.js');
           topic: 'persistent://public/no-namespace/topic',
           subscription: 'sub1',
           ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: 60000,
         })).rejects.toThrow('Failed to create consumer: ConnectError');
       });
+
+      test('Not Positive NAckRedeliverTimeout', async () => {
+        await expect(client.subscribe({
+          topic: 'persistent://public/default/t1',
+          subscription: 'sub1',
+          ackTimeoutMs: 10000,
+          nAckRedeliverTimeoutMs: -12,
+        })).rejects.toThrow('NAck timeout should be greater than or equal to 
zero');
+      });
     });
   });
 })();
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 477e57f..0a8ce6f 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -67,6 +67,53 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('negativeAcknowledge', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'persistent://public/default/produce-consume';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+      });
+      expect(producer).not.toBeNull();
+
+      const consumer = await client.subscribe({
+        topic,
+        subscription: 'sub1',
+        ackTimeoutMs: 10000,
+        nAckRedeliverTimeoutMs: 1000,
+      });
+
+      expect(consumer).not.toBeNull();
+
+      const message = 'my-message';
+      producer.send({
+        data: Buffer.from(message),
+      });
+      await producer.flush();
+
+      const results = [];
+      const msg = await consumer.receive();
+      results.push(msg.getData().toString());
+      consumer.negativeAcknowledge(msg);
+
+      const msg2 = await consumer.receive();
+      results.push(msg2.getData().toString());
+      consumer.acknowledge(msg2);
+
+      await expect(consumer.receive(1000)).rejects.toThrow('Failed to received 
message TimeOut');
+
+      expect(results).toEqual([message, message]);
+
+      await producer.close();
+      await consumer.close();
+      await client.close();
+    });
+
     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',

Reply via email to