This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new fee97b2 fix: use async receive function (#252)
fee97b2 is described below
commit fee97b206bdb2583498e1af6b3b0282047289ad0
Author: Yuri Mizushima <[email protected]>
AuthorDate: Mon Dec 12 13:27:24 2022 +0900
fix: use async receive function (#252)
---
src/Consumer.cc | 27 +++++++++++++++++++++++----
1 file changed, 23 insertions(+), 4 deletions(-)
diff --git a/src/Consumer.cc b/src/Consumer.cc
index e4e7898..91d7572 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -247,16 +247,35 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
};
Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
- Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
if (info[0].IsUndefined()) {
- ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer);
- wk->Queue();
+ auto deferred = ThreadSafeDeferred::New(Env());
+ auto ctx = new ExtDeferredContext(deferred);
+ pulsar_consumer_receive_async(
+ this->cConsumer.get(),
+ [](pulsar_result result, pulsar_message_t *rawMessage, void *ctx) {
+ auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
+ auto deferred = deferredContext->deferred;
+ delete deferredContext;
+
+ if (result != pulsar_result_Ok) {
+ deferred->Reject(std::string("Failed to receive message: ") + pulsar_result_str(result));
+ } else {
+ deferred->Resolve([rawMessage](const Napi::Env env) {
+ Napi::Object obj = Message::NewInstance(
+ {}, std::shared_ptr<pulsar_message_t>(rawMessage, pulsar_message_free));
+ return obj;
+ });
+ }
+ },
+ ctx);
+ return deferred->Promise();
} else {
+ Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
wk->Queue();
+ return deferred.Promise();
}
- return deferred.Promise();
}
Napi::Value Consumer::Acknowledge(const Napi::CallbackInfo &info) {