This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-1.13 in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit b8c75de6a5c4202773cd67810b54b4dc0bf0f3c2 Author: Baodi Shi <[email protected]> AuthorDate: Sun Mar 2 21:03:42 2025 +0800 Fix message might lost when use listener (#406) * Fix message might lost when use listener * Remove todo * code format * Remove consumer if null judge (cherry picked from commit b17a2e12f677b059d610a3256371ddb0c9ea348a) --- src/Consumer.cc | 54 +++++++++++++++++++-------------------------------- src/MessageListener.h | 8 +++++--- 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/src/Consumer.cc b/src/Consumer.cc index e56f8ba..0b82436 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -77,32 +77,28 @@ void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListe Napi::Object msg = Message::NewInstance({}, data->cMessage); Consumer *consumer = data->consumer; - // `consumer` might be null in certain cases, segmentation fault might happend without this null check. We - // need to handle this rare case in future. - if (consumer) { - Napi::Value ret; - try { - ret = jsCallback.Call({msg, consumer->Value()}); - } catch (std::exception &exception) { - logMessageListenerError(consumer, exception.what()); - } + Napi::Value ret; + try { + ret = jsCallback.Call({msg, consumer->Value()}); + } catch (std::exception &exception) { + logMessageListenerError(consumer, exception.what()); + } - if (ret.IsPromise()) { - Napi::Promise promise = ret.As<Napi::Promise>(); - Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>(); + if (ret.IsPromise()) { + Napi::Promise promise = ret.As<Napi::Promise>(); + Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>(); - ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) { - Napi::Error error = info[0].As<Napi::Error>(); - logMessageListenerError(consumer, error.what()); - })}); + ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) { + Napi::Error error = info[0].As<Napi::Error>(); + logMessageListenerError(consumer, error.what()); + })}); - promise = ret.As<Napi::Promise>(); - Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>(); + promise = ret.As<Napi::Promise>(); + Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>(); - finallyFunc.Call( - promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })}); - return; - } + finallyFunc.Call( + promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })}); + return; } data->callback(); } @@ -111,7 +107,7 @@ void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessag std::shared_ptr<pulsar_message_t> cMessage(rawMessage, pulsar_message_free); MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx; - Consumer *consumer = (Consumer *)listenerCallback->consumer; + Consumer *consumer = static_cast<Consumer *>(listenerCallback->consumerFuture.get()); if (listenerCallback->callback.Acquire() != napi_ok) { return; @@ -135,7 +131,7 @@ void Consumer::SetListenerCallback(MessageListenerCallback *listener) { } if (listener != nullptr) { - listener->consumer = this; + listener->consumerPromise.set_value(this); // If a consumer listener is set, the Consumer instance is kept alive even if it goes out of scope in JS // code. this->Ref(); @@ -168,11 +164,6 @@ struct ConsumerNewInstanceContext { auto cConsumer = std::shared_ptr<pulsar_consumer_t>(rawConsumer, pulsar_consumer_free); auto listener = consumerConfig->GetListenerCallback(); - if (listener) { - // pause, will resume in OnOK, to prevent MessageListener get a nullptr of consumer - pulsar_consumer_pause_message_listener(cConsumer.get()); - } - deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env env) { Napi::Object obj = Consumer::constructor.New({}); Consumer *consumer = Consumer::Unwrap(obj); @@ -180,11 +171,6 @@ struct ConsumerNewInstanceContext { consumer->SetCConsumer(cConsumer); consumer->SetListenerCallback(listener); - if (listener) { - // resume to enable MessageListener function callback - resume_message_listener(cConsumer.get()); - } - return obj; }); } diff --git a/src/MessageListener.h b/src/MessageListener.h index ff4efee..704c8ec 100644 --- a/src/MessageListener.h +++ b/src/MessageListener.h @@ -21,14 +21,16 @@ #define MESSAGELISTENER_H #include <napi.h> +#include <future> struct MessageListenerCallback { Napi::ThreadSafeFunction callback; - // Using consumer as void* since the ListenerCallback is shared between Config and Consumer. - void *consumer; + // Use future store consumer point, because need ensure sync. + std::promise<void *> consumerPromise; + std::shared_future<void *> consumerFuture; - MessageListenerCallback() : consumer(nullptr) {} + MessageListenerCallback() : consumerPromise(), consumerFuture(consumerPromise.get_future()) {} }; #endif
