This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.8
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit d8fb3b7f576b2cff39243274ac53a22b48e8154a
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 5 18:58:52 2025 +0800

    Fix possible zombie consumer when closing after reconnection (#518)
    
    (cherry picked from commit 268aa4e2fdc621137a1a1fff9587ddd200bb0eda)
---
 lib/ConsumerImpl.cc   | 29 +++++++++++++++++++++++++++--
 lib/HandlerBase.h     |  5 +++++
 tests/ConsumerTest.cc | 22 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 92d25cb..3d5d294 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -293,13 +293,38 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
     Result handleResult = ResultOk;
 
     if (result == ResultOk) {
-        LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
         {
             Lock mutexLock(mutex_);
+            if (!changeToReadyState()) {
+                auto client = client_.lock();
+                if (client) {
+                    LOG_INFO(getName() << "Closing subscribed consumer since it was already closed");
+                    int requestId = client->newRequestId();
+                    auto name = getName();
+                    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
+                        .addListener([name](Result result, const ResponseData&) {
+                            if (result == ResultOk) {
+                                LOG_INFO(name << "Closed consumer successfully after subscribe completed");
+                            } else {
+                                LOG_WARN(name << "Failed to close consumer: " << strResult(result));
+                            }
+                        });
+                } else {
+                    // This should not happen normally because if client is destroyed, the connection pool
+                    // should also be closed, which means all connections should be closed. Close the
+                    // connection to let broker know this registered consumer is inactive.
+                    LOG_WARN(getName()
+                             << "Client already closed when subscribe completed, close the connection "
+                             << cnx->cnxString());
+                    cnx->close(ResultNotConnected);
+                }
+                return ResultAlreadyClosed;
+            }
+
+            LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
             setCnx(cnx);
             incomingMessages_.clear();
             possibleSendToDeadLetterTopicMessages_.clear();
-            state_ = Ready;
             backoff_.reset();
             if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
                 // Complicated logic since we don't have a isLocked() function for mutex
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index df7dc24..967322f 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -148,6 +148,11 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
         firstRequestIdAfterConnect_.store(requestId, std::memory_order_release);
     }
 
+    bool changeToReadyState() noexcept {
+        State expected = Pending;
+        return state_ == Ready || state_.compare_exchange_strong(expected, Ready);
+    }
+
    private:
     DeadlineTimerPtr timer_;
     DeadlineTimerPtr creationTimer_;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 3aa1dd3..f1bca77 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1573,4 +1573,26 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
     ASSERT_EQ(ResultOk, client.close());
 }
 
+TEST(ConsumerTest, testCloseAfterSeek) {
+    const auto topic = "test-close-after-seek-" + std::to_string(time(nullptr));
+    const auto subscription = "sub";
+    Client client(lookupUrl);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
+    ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis()));
+    consumer.closeAsync(nullptr);
+
+    // Test the previous consumer will be closed even after seek is done, at the moment the connection might
+    // not be established.
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
+
+    // Test creating a consumer from a different client should also work for this case
+    Client anotherClient(lookupUrl);
+    consumer.closeAsync(nullptr);
+    ASSERT_EQ(ResultOk, anotherClient.subscribe(topic, subscription, consumer));
+
+    client.close();
+    anotherClient.close();
+}
+
 }  // namespace pulsar

Reply via email to