This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new fee97b2  fix: use async receive function (#252)
fee97b2 is described below

commit fee97b206bdb2583498e1af6b3b0282047289ad0
Author: Yuri Mizushima <[email protected]>
AuthorDate: Mon Dec 12 13:27:24 2022 +0900

    fix: use async receive function (#252)
---
 src/Consumer.cc | 27 +++++++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index e4e7898..91d7572 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -247,16 +247,35 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
 };
 
 Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
-  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
   if (info[0].IsUndefined()) {
-    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer);
-    wk->Queue();
+    auto deferred = ThreadSafeDeferred::New(Env());
+    auto ctx = new ExtDeferredContext(deferred);
+    pulsar_consumer_receive_async(
+        this->cConsumer.get(),
+        [](pulsar_result result, pulsar_message_t *rawMessage, void *ctx) {
+          auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+          auto deferred = deferredContext->deferred;
+          delete deferredContext;
+
+          if (result != pulsar_result_Ok) {
+            deferred->Reject(std::string("Failed to receive message: ") + pulsar_result_str(result));
+          } else {
+            deferred->Resolve([rawMessage](const Napi::Env env) {
+              Napi::Object obj = Message::NewInstance(
+                  {}, std::shared_ptr<pulsar_message_t>(rawMessage, pulsar_message_free));
+              return obj;
+            });
+          }
+        },
+        ctx);
+    return deferred->Promise();
   } else {
+    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
     Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
     ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
     wk->Queue();
+    return deferred.Promise();
   }
-  return deferred.Promise();
 }
 
 Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) {

Reply via email to