This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-1.13
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git

commit b8c75de6a5c4202773cd67810b54b4dc0bf0f3c2
Author: Baodi Shi <[email protected]>
AuthorDate: Sun Mar 2 21:03:42 2025 +0800

    Fix message might lost when use listener (#406)
    
    * Fix message might lost when use listener
    
    * Remove todo
    
    * code format
    
    * Remove consumer if null judge
    
    (cherry picked from commit b17a2e12f677b059d610a3256371ddb0c9ea348a)
---
 src/Consumer.cc       | 54 +++++++++++++++++++--------------------------------
 src/MessageListener.h |  8 +++++---
 2 files changed, 25 insertions(+), 37 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index e56f8ba..0b82436 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -77,32 +77,28 @@ void MessageListenerProxy(Napi::Env env, Napi::Function 
jsCallback, MessageListe
   Napi::Object msg = Message::NewInstance({}, data->cMessage);
   Consumer *consumer = data->consumer;
 
-  // `consumer` might be null in certain cases, segmentation fault might 
happend without this null check. We
-  // need to handle this rare case in future.
-  if (consumer) {
-    Napi::Value ret;
-    try {
-      ret = jsCallback.Call({msg, consumer->Value()});
-    } catch (std::exception &exception) {
-      logMessageListenerError(consumer, exception.what());
-    }
+  Napi::Value ret;
+  try {
+    ret = jsCallback.Call({msg, consumer->Value()});
+  } catch (std::exception &exception) {
+    logMessageListenerError(consumer, exception.what());
+  }
 
-    if (ret.IsPromise()) {
-      Napi::Promise promise = ret.As<Napi::Promise>();
-      Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();
+  if (ret.IsPromise()) {
+    Napi::Promise promise = ret.As<Napi::Promise>();
+    Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();
 
-      ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const 
Napi::CallbackInfo &info) {
-                             Napi::Error error = info[0].As<Napi::Error>();
-                             logMessageListenerError(consumer, error.what());
-                           })});
+    ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const 
Napi::CallbackInfo &info) {
+                           Napi::Error error = info[0].As<Napi::Error>();
+                           logMessageListenerError(consumer, error.what());
+                         })});
 
-      promise = ret.As<Napi::Promise>();
-      Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();
+    promise = ret.As<Napi::Promise>();
+    Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();
 
-      finallyFunc.Call(
-          promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo 
&info) { data->callback(); })});
-      return;
-    }
+    finallyFunc.Call(
+        promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo 
&info) { data->callback(); })});
+    return;
   }
   data->callback();
 }
@@ -111,7 +107,7 @@ void MessageListener(pulsar_consumer_t *rawConsumer, 
pulsar_message_t *rawMessag
   std::shared_ptr<pulsar_message_t> cMessage(rawMessage, pulsar_message_free);
   MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx;
 
-  Consumer *consumer = (Consumer *)listenerCallback->consumer;
+  Consumer *consumer = static_cast<Consumer 
*>(listenerCallback->consumerFuture.get());
 
   if (listenerCallback->callback.Acquire() != napi_ok) {
     return;
@@ -135,7 +131,7 @@ void Consumer::SetListenerCallback(MessageListenerCallback 
*listener) {
   }
 
   if (listener != nullptr) {
-    listener->consumer = this;
+    listener->consumerPromise.set_value(this);
     // If a consumer listener is set, the Consumer instance is kept alive even 
if it goes out of scope in JS
     // code.
     this->Ref();
@@ -168,11 +164,6 @@ struct ConsumerNewInstanceContext {
     auto cConsumer = std::shared_ptr<pulsar_consumer_t>(rawConsumer, 
pulsar_consumer_free);
     auto listener = consumerConfig->GetListenerCallback();
 
-    if (listener) {
-      // pause, will resume in OnOK, to prevent MessageListener get a nullptr 
of consumer
-      pulsar_consumer_pause_message_listener(cConsumer.get());
-    }
-
     deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env 
env) {
       Napi::Object obj = Consumer::constructor.New({});
       Consumer *consumer = Consumer::Unwrap(obj);
@@ -180,11 +171,6 @@ struct ConsumerNewInstanceContext {
       consumer->SetCConsumer(cConsumer);
       consumer->SetListenerCallback(listener);
 
-      if (listener) {
-        // resume to enable MessageListener function callback
-        resume_message_listener(cConsumer.get());
-      }
-
       return obj;
     });
   }
diff --git a/src/MessageListener.h b/src/MessageListener.h
index ff4efee..704c8ec 100644
--- a/src/MessageListener.h
+++ b/src/MessageListener.h
@@ -21,14 +21,16 @@
 #define MESSAGELISTENER_H
 
 #include <napi.h>
+#include <future>
 
 struct MessageListenerCallback {
   Napi::ThreadSafeFunction callback;
 
-  // Using consumer as void* since the ListenerCallback is shared between 
Config and Consumer.
-  void *consumer;
+  // Use future store consumer point, because need ensure sync.
+  std::promise<void *> consumerPromise;
+  std::shared_future<void *> consumerFuture;
 
-  MessageListenerCallback() : consumer(nullptr) {}
+  MessageListenerCallback() : consumerPromise(), 
consumerFuture(consumerPromise.get_future()) {}
 };
 
 #endif

Reply via email to