equanz commented on a change in pull request #200:
URL: https://github.com/apache/pulsar-client-node/pull/200#discussion_r832988565



##########
File path: src/Client.cc
##########
@@ -196,33 +236,27 @@ void LogMessage(pulsar_logger_level_t level, const char 
*file, int line, const c
   logCallback->callback.Release();
 }
 
-class ClientCloseWorker : public Napi::AsyncWorker {
- public:
-  ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t 
*cClient)
-      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const 
Napi::CallbackInfo &info) {})),
-        deferred(deferred),
-        cClient(cClient) {}
-  ~ClientCloseWorker() {}
-  void Execute() {
-    pulsar_result result = pulsar_client_close(this->cClient);
-    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
-  }
-  void OnOK() { this->deferred.Resolve(Env().Null()); }
-  void OnError(const Napi::Error &e) {
-    this->deferred.Reject(
-        Napi::Error::New(Env(), std::string("Failed to close client: ") + 
e.Message()).Value());
-  }
+Napi::Value Client::Close(const Napi::CallbackInfo &info) {
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Client *>(this, deferred);
+  this->Ref();
 
- private:
-  Napi::Promise::Deferred deferred;
-  pulsar_client_t *cClient;
-};
+  pulsar_client_close_async(
+      this->cClient.get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Client *> 
*>(ctx);

Review comment:
       I'm not sure, but currently, `deferredContext` is not deleted at the 
callback. Is it okay?

##########
File path: src/Consumer.cc
##########
@@ -243,160 +235,225 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
 
  private:
   Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  pulsar_message_t *cMessage;
+  std::shared_ptr<pulsar_consumer_t> cConsumer;
+  std::shared_ptr<pulsar_message_t> cMessage;
   int64_t timeout;
 };
 
 Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
   Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   if (info[0].IsUndefined()) {
-    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->wrapper->cConsumer);
+    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->cConsumer);
     wk->Queue();
   } else {
     Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
-    ConsumerReceiveWorker *wk =
-        new ConsumerReceiveWorker(deferred, this->wrapper->cConsumer, 
timeout.Int64Value());
+    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, 
this->cConsumer, timeout.Int64Value());
     wk->Queue();
   }
   return deferred.Promise();
 }
 
-void Consumer::Acknowledge(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_acknowledge_async(this->wrapper->cConsumer, 
msg->GetCMessage(), NULL, NULL);
+Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto msg = Message::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_async(
+      this->cConsumer.get(), msg->GetCMessage().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge: ") + 
pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
-void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_acknowledge_async_id(this->wrapper->cConsumer, 
msgId->GetCMessageId(), NULL, NULL);
+Napi::Value Consumer::AcknowledgeId(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_async_id(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge id: ") + 
pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 void Consumer::NegativeAcknowledge(const Napi::CallbackInfo &info) {
   Napi::Object obj = info[0].As<Napi::Object>();
   Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_negative_acknowledge(this->wrapper->cConsumer, 
msg->GetCMessage());
+  std::shared_ptr<pulsar_message_t> cMessage = msg->GetCMessage();
+  pulsar_consumer_negative_acknowledge(this->cConsumer.get(), cMessage.get());
 }
 
 void Consumer::NegativeAcknowledgeId(const Napi::CallbackInfo &info) {
   Napi::Object obj = info[0].As<Napi::Object>();
   MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_negative_acknowledge_id(this->wrapper->cConsumer, 
msgId->GetCMessageId());
+  std::shared_ptr<pulsar_message_id_t> cMessageId = msgId->GetCMessageId();
+  pulsar_consumer_negative_acknowledge_id(this->cConsumer.get(), 
cMessageId.get());
 }
 
-void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  Message *msg = Message::Unwrap(obj);
-  pulsar_consumer_acknowledge_cumulative_async(this->wrapper->cConsumer, 
msg->GetCMessage(), NULL, NULL);
+Napi::Value Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msg = Message::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_cumulative_async(
+      this->cConsumer.get(), msg->GetCMessage().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge cumulatively: ") 
+ pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
-void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
-  Napi::Object obj = info[0].As<Napi::Object>();
-  MessageId *msgId = MessageId::Unwrap(obj);
-  pulsar_consumer_acknowledge_cumulative_async_id(this->wrapper->cConsumer, 
msgId->GetCMessageId(), NULL,
-                                                  NULL);
+Napi::Value Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) {
+  auto obj = info[0].As<Napi::Object>();
+  auto *msgId = MessageId::Unwrap(obj);
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_acknowledge_cumulative_async_id(
+      this->cConsumer.get(), msgId->GetCMessageId().get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to acknowledge cumulatively by 
id: ") +
+                           pulsar_result_str(result));
+        } else {
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Napi::Value Consumer::IsConnected(const Napi::CallbackInfo &info) {
   Napi::Env env = info.Env();
-  return Napi::Boolean::New(env, 
pulsar_consumer_is_connected(this->wrapper->cConsumer));
+  return Napi::Boolean::New(env, 
pulsar_consumer_is_connected(this->cConsumer.get()));
 }
 
-class ConsumerCloseWorker : public Napi::AsyncWorker {
- public:
-  ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, 
pulsar_consumer_t *cConsumer,
-                      Consumer *consumer)
-      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const 
Napi::CallbackInfo &info) {})),
-        deferred(deferred),
-        cConsumer(cConsumer),
-        consumer(consumer) {}
-
-  ~ConsumerCloseWorker() {}
-  void Execute() {
-    pulsar_consumer_pause_message_listener(this->cConsumer);
-    pulsar_result result = pulsar_consumer_close(this->cConsumer);
-    if (result != pulsar_result_Ok) {
-      SetError(pulsar_result_str(result));
-    }
-  }
-  void OnOK() {
-    this->consumer->Cleanup();
-    this->deferred.Resolve(Env().Null());
-  }
-  void OnError(const Napi::Error &e) {
-    this->deferred.Reject(
-        Napi::Error::New(Env(), std::string("Failed to close consumer: ") + 
e.Message()).Value());
-  }
-
- private:
-  Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  Consumer *consumer;
-};
-
-class ConsumerUnsubscribeWorker : public Napi::AsyncWorker {
- public:
-  ConsumerUnsubscribeWorker(const Napi::Promise::Deferred &deferred, 
pulsar_consumer_t *cConsumer,
-                            Consumer *consumer)
-      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const 
Napi::CallbackInfo &info) {})),
-        deferred(deferred),
-        cConsumer(cConsumer),
-        consumer(consumer) {}
-
-  ~ConsumerUnsubscribeWorker() {}
-  void Execute() {
-    pulsar_consumer_pause_message_listener(this->cConsumer);
-    pulsar_result result = pulsar_consumer_unsubscribe(this->cConsumer);
-    if (result != pulsar_result_Ok) {
-      SetError(pulsar_result_str(result));
-    }
-  }
-  void OnOK() {
-    this->consumer->Cleanup();
-    this->deferred.Resolve(Env().Null());
-  }
-  void OnError(const Napi::Error &e) {
-    this->deferred.Reject(
-        Napi::Error::New(Env(), std::string("Failed to unsubscribe consumer: 
") + e.Message()).Value());
-  }
-
- private:
-  Napi::Promise::Deferred deferred;
-  pulsar_consumer_t *cConsumer;
-  Consumer *consumer;
-};
-
 void Consumer::Cleanup() {
-  if (this->listener) {
-    this->CleanupListener();
+  if (this->listener != nullptr) {
+    pulsar_consumer_pause_message_listener(this->cConsumer.get());
+    this->listener->callback.Release();
+    this->listener = nullptr;
+    this->Unref();
   }
 }
 
-void Consumer::CleanupListener() {
-  pulsar_consumer_pause_message_listener(this->wrapper->cConsumer);
-  this->Unref();
-  this->listener->callback.Release();
-  this->listener = nullptr;
-}
-
 Napi::Value Consumer::Close(const Napi::CallbackInfo &info) {
-  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ConsumerCloseWorker *wk = new ConsumerCloseWorker(deferred, 
this->wrapper->cConsumer, this);
-  wk->Queue();
-  return deferred.Promise();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_pause_message_listener(this->cConsumer.get());
+  pulsar_consumer_close_async(
+      this->cConsumer.get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to close consumer: ") + 
pulsar_result_str(result));
+        } else {
+          self->Cleanup();
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Napi::Value Consumer::Unsubscribe(const Napi::CallbackInfo &info) {
-  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
-  ConsumerUnsubscribeWorker *wk = new ConsumerUnsubscribeWorker(deferred, 
this->wrapper->cConsumer, this);
-  wk->Queue();
-  return deferred.Promise();
+  auto deferred = ThreadSafeDeferred::New(Env());
+  auto ctx = new ExtDeferredContext<Consumer *>(this, deferred);
+  this->Ref();
+
+  pulsar_consumer_pause_message_listener(this->cConsumer.get());
+  pulsar_consumer_unsubscribe_async(
+      this->cConsumer.get(),
+      [](pulsar_result result, void *ctx) {
+        auto deferredContext = static_cast<ExtDeferredContext<Consumer *> 
*>(ctx);
+        auto deferred = deferredContext->deferred;
+        auto self = deferredContext->ref;
+        delete deferredContext;
+
+        if (result != pulsar_result_Ok) {
+          deferred->Reject(std::string("Failed to unsubscribe consumer: ") + 
pulsar_result_str(result));
+        } else {
+          self->Cleanup();
+          deferred->Resolve(THREADSAFE_DEFERRED_RESOLVER(env.Null()));
+        }
+
+        self->Unref();
+      },
+      ctx);
+
+  return deferred->Promise();
 }
 
 Consumer::~Consumer() {
-  if (this->listener) {
-    this->CleanupListener();
+  this->Cleanup();
+  while (this->Unref() != 0) {
+    // If Ref() > 0 then the process is shutting down. We must unref to prevent
+    // double free (once for the env shutdown and once for non-zero refs)
   }

Review comment:
       If the refcount is already 0, the Reference#Unref might throw an error.
   
   https://github.com/nodejs/node-addon-api/blob/v4.3.0/napi-inl.h#L2940-L2946
   
https://github.com/nodejs/node/blob/v16.14.2/src/js_native_api_v8.cc#L2549-L2572
   
   Are these lines okay?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to