This is an automated email from the ASF dual-hosted git repository. nkurihar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push: new 661e965 fix: use pause and resume api to ensure orderly init listener->consumer (#182) (#183) 661e965 is described below commit 661e965d0089613a3b14ff5d51f589a6c7f9c5bb Author: Masahiro Sakamoto <massa...@yahoo-corp.jp> AuthorDate: Tue Dec 7 13:13:32 2021 +0900 fix: use pause and resume api to ensure orderly init listener->consumer (#182) (#183) Co-authored-by: lipeining <lipein...@joyy.com> Co-authored-by: lipeining <39306888+lipein...@users.noreply.github.com> Co-authored-by: lipeining <lipein...@joyy.com> --- src/Consumer.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/Consumer.cc b/src/Consumer.cc index 7dbd1f1..983f10f 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -164,6 +164,12 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker { consumer->SetCConsumer(this->consumerWrapper); consumer->SetListenerCallback(this->listener); + + if (this->listener) { + // resume to enable MessageListener function callback + resume_message_listener(this->consumerWrapper->cConsumer); + } + this->deferred.Resolve(obj); } void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); } @@ -183,6 +189,11 @@ class ConsumerNewInstanceWorker : public Napi::AsyncWorker { } else { worker->consumerWrapper->cConsumer = consumer; worker->listener = worker->consumerConfig->GetListenerCallback(); + + if (worker->listener) { + // pause, will resume in OnOK, to prevent MessageListener get a nullptr of consumer + pulsar_consumer_pause_message_listener(consumer); + } } delete worker->consumerConfig;