Matt-Esch commented on a change in pull request #200:
URL: https://github.com/apache/pulsar-client-node/pull/200#discussion_r833189580



##########
File path: src/Consumer.cc
##########
@@ -243,160 +235,225 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
 
  private:
   Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  pulsar_message_t *cMessage;
+  std::shared_ptr<pulsar_consumer_t> cConsumer;
+  std::shared_ptr<pulsar_message_t> cMessage;
   int64_t timeout;
 };
 
 Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   if (info[0].IsUndefined()) {
-    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->wrapper->cConsumer);
+    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->cConsumer);
     wk->Queue();
   } else {
     Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
-    ConsumerReceiveWorker *wk =
-        new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer, 
timeout.Int64Value());
+    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->cConsumer, timeout.Int64Value());
     wk->Queue();
   }
   return deferred.Promise();
 }
 
-void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_acknowledge_async(this->wrapper->cConsumer, 
msg->GetCMessage(), NULL, NULL);
+Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto msg = Message::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_async(
+      this->cConsumer.get(), msg->GetCMessage().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge: ") + 
pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
-void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_acknowledge_async_id(this->wrapper->cConsumer, 
msgId->GetCMessageId(), NULL, NULL);
+Napi::Value Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_async_id(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge id: ") + 
pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) {
   Napi::Object obj = info[0].As<Napi::Object>();
   Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_negative_acknowledge(this->wrapper->cConsumer, 
msg->GetCMessage());
+  std::shared_ptr<pulsar_message_t> cMessage = msg->GetCMessage();
+  pulsar_consumer_negative_acknowledge(this->cConsumer.get(), cMessage.get());
 }
 
 void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) {
   Napi::Object obj = info[0].As<Napi::Object>();
   MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_negative_acknowledge_id(this->wrapper->cConsumer, 
msgId->GetCMessageId());
+  std::shared_ptr<pulsar_message_id_t> cMessageId = msgId->GetCMessageId();
+  pulsar_consumer_negative_acknowledge_id(this->cConsumer.get(), 
cMessageId.get());
 }
 
-void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_acknowledge_cumulative_async(this->wrapper->cConsumer, 
msg->GetCMessage(), NULL, NULL);
+Napi::Value Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msg = Message::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_cumulative_async(
+      this->cConsumer.get(), msg->GetCMessage().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge cumulatively: ") 
+ pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
-void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_acknowledge_cumulative_async_id(this->wrapper->cConsumer, 
msgId->GetCMessageId(), NULL,
-                                                  NULL);
+Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_cumulative_async_id(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge cumulatively by 
id: ") +
+                           pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
   Napi::Env env = info.Env();
-  return Napi::Boolean::New(env, 
pulsar_consumer_is_connected(this->wrapper->cConsumer));
+  return Napi::Boolean::New(env, 
pulsar_consumer_is_connected(this->cConsumer.get()));
 }
 
-class ConsumerCloseWorker : public Napi::AsyncWorker {
- public:
-  ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, 
pulsar_consumer_t *cConsumer,
-                      Consumer *consumer)
-      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const 
Napi::CallbackInfo &info) {})),
-        deferred(deferred),
-        cConsumer(cConsumer),
-        consumer(consumer) {}
-
-  ~ConsumerCloseWorker() {}
-  void Execute() {
-    pulsar_consumer_pause_message_listener(this->cConsumer);
-    pulsar_result result = pulsar_consumer_close(this->cConsumer);
-    if (result != pulsar_result_Ok) {
-      SetError(pulsar_result_str(result));
-    }
-  }
-  void OnOK() {
-    this->consumer->Cleanup();
-    this->deferred.Resolve(Env().Null());
-  }
-  void OnError(const Napi::Error &e) {
-    this->deferred.Reject(
-        Napi::Error::New(Env(), std::string("Failed to close consumer: ") + 
e.Message()).Value());
-  }
-
- private:
-  Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  Consumer *consumer;
-};
-
-class ConsumerUnsubscribeWorker : public Napi::AsyncWorker {
- public:
-  ConsumerUnsubscribeWorker(const Napi::Promise::Deferred &deferred, 
pulsar_consumer_t *cConsumer,
-                            Consumer *consumer)
-      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const 
Napi::CallbackInfo &info) {})),
-        deferred(deferred),
-        cConsumer(cConsumer),
-        consumer(consumer) {}
-
-  ~ConsumerUnsubscribeWorker() {}
-  void Execute() {
-    pulsar_consumer_pause_message_listener(this->cConsumer);
-    pulsar_result result = pulsar_consumer_unsubscribe(this->cConsumer);
-    if (result != pulsar_result_Ok) {
-      SetError(pulsar_result_str(result));
-    }
-  }
-  void OnOK() {
-    this->consumer->Cleanup();
-    this->deferred.Resolve(Env().Null());
-  }
-  void OnError(const Napi::Error &e) {
-    this->deferred.Reject(
-        Napi::Error::New(Env(), std::string("Failed to unsubscribe consumer: 
") + e.Message()).Value());
-  }
-
- private:
-  Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  Consumer *consumer;
-};
-
 void Consumer::Cleanup() {
-  if (this->listener) {
-    this->CleanupListener();
+  if (this->listener != nullptr) {
+    pulsar_consumer_pause_message_listener(this->cConsumer.get());
+    this->listener->callback.Release();
+    this->listener = nullptr;
+    this->Unref();
   }
 }
 
-void Consumer::CleanupListener() {
-  pulsar_consumer_pause_message_listener(this->wrapper->cConsumer);
-  this->Unref();
-  this->listener->callback.Release();
-  this->listener = nullptr;
-}
-
 Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
-  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, 
this->wrapper->cConsumer, this);
-  wk->Queue();
-  return deferred.Promise();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_pause_message_listener(this->cConsumer.get());
+  pulsar_consumer_close_async(
+      this->cConsumer.get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to close consumer: ") + 
pulsar_result_str(result));
+        } else {
+          self->Cleanup();
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Napi::Value Consumer::Unsubscribe(const Napi::CallbackInfo &info) {
-  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ConsumerUnsubscribeWorker *wk = new ConsumerUnsubscribeWorker(deferred, 
this->wrapper->cConsumer, this);
-  wk->Queue();
-  return deferred.Promise();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_pause_message_listener(this->cConsumer.get());
+  pulsar_consumer_unsubscribe_async(
+      this->cConsumer.get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to unsubscribe consumer: ") + 
pulsar_result_str(result));
+        } else {
+          self->Cleanup();
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Consumer::~Consumer() {
-  if (this->listener) {
-    this->CleanupListener();
+  this->Cleanup();
+  while (this->Unref() != 0) {
+    // If Ref() > 0 then the process is shutting down. We must unref to prevent
+    // double free (once for the env shutdown and once for non-zero refs)
   }

Review comment:
       I'm glad you pointed that out because I think there is some 
inconsistency between versions. I've added a ref prior to the unref to ensure 
that the first unref doesn't throw. We really shouldn't need to do this at all 
but it's clearly a bug in the node api. The first destructor should reset the 
references. The reset method also frees so that doesn't help here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to