massakam commented on a change in pull request #200:
URL: https://github.com/apache/pulsar-client-node/pull/200#discussion_r825670362



##########
File path: src/Consumer.cc
##########
@@ -243,160 +235,229 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
 
  private:
   Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  pulsar_message_t *cMessage;
+  std::shared_ptr<pulsar_consumer_t> cConsumer;
+  std::shared_ptr<pulsar_message_t> cMessage;
   int64_t timeout;
 };
 
 Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   if (info[0].IsUndefined()) {
-    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->wrapper->cConsumer);
+    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->cConsumer);
     wk->Queue();
   } else {
     Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
-    ConsumerReceiveWorker *wk =
-        new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer, 
timeout.Int64Value());
+    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->cConsumer, timeout.Int64Value());
     wk->Queue();
   }
   return deferred.Promise();
 }
 
-void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_acknowledge_async(this->wrapper->cConsumer, 
msg->GetCMessage(), NULL, NULL);
+Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto msg = Message::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_async(
+      this->cConsumer.get(), msg->GetCMessage().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to consumer acknowledge: ") + 
pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
-void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_acknowledge_async_id(this->wrapper->cConsumer, 
msgId->GetCMessageId(), NULL, NULL);
+Napi::Value Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_async_id(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to consumer acknowledge id: ") 
+ pulsar_result_str(result));

Review comment:
   ```suggestion
             deferred->Reject(std::string("Failed to acknowledge by id: ") + pulsar_result_str(result));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to