This is an automated email from the ASF dual-hosted git repository.

nkurihar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 661e965  fix: use pause and resume api to ensure orderly init 
listener->consumer (#182) (#183)
661e965 is described below

commit 661e965d0089613a3b14ff5d51f589a6c7f9c5bb
Author: Masahiro Sakamoto <massa...@yahoo-corp.jp>
AuthorDate: Tue Dec 7 13:13:32 2021 +0900

    fix: use pause and resume api to ensure orderly init listener->consumer 
(#182) (#183)
    
    Co-authored-by: lipeining <lipein...@joyy.com>
    
    Co-authored-by: lipeining <39306888+lipein...@users.noreply.github.com>
    Co-authored-by: lipeining <lipein...@joyy.com>
---
 src/Consumer.cc | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index 7dbd1f1..983f10f 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -164,6 +164,12 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker 
{
 
     consumer->SetCConsumer(this->consumerWrapper);
     consumer->SetListenerCallback(this->listener);
+
+    if (this->listener) {
+      // resume to enable MessageListener function callback
+      resume_message_listener(this->consumerWrapper->cConsumer);
+    }
+
     this->deferred.Resolve(obj);
   }
   void OnError(const Napi::Error &e) { 
this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
@@ -183,6 +189,11 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker 
{
     } else {
       worker->consumerWrapper->cConsumer = consumer;
       worker->listener = worker->consumerConfig->GetListenerCallback();
+
+      if (worker->listener) {
+        // pause, will resume in OnOK, to prevent MessageListener get a 
nullptr of consumer
+        pulsar_consumer_pause_message_listener(consumer);
+      }
     }
 
     delete worker->consumerConfig;

Reply via email to