This is an automated email from the ASF dual-hosted git repository.

shibd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d1002a  Fix consumer reconnect state after subscribe failure (#577)
7d1002a is described below

commit 7d1002ae4f203ac142a7ad8661c179cee2ffe6c8
Author: Baodi Shi <[email protected]>
AuthorDate: Mon May 18 11:55:35 2026 +0800

    Fix consumer reconnect state after subscribe failure (#577)
---
 lib/ConsumerImpl.cc   |  3 ++-
 tests/ConsumerTest.cc | 36 ++++++++++++++++++++++++++++++++++++
 tests/PulsarFriend.h  |  5 +++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d268484..85f4994 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -371,8 +371,9 @@ Result ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result
         }
 
         if (consumerCreatedPromise_.isComplete()) {
-            // Consumer had already been initially created, we need to retry 
connecting in any case
+            // Clear the connection set before SUBSCRIBE so the next reconnect 
is not skipped.
             LOG_WARN(getName() << "Failed to reconnect consumer: " << 
strResult(result));
+            resetCnx();
             handleResult = ResultRetryable;
         } else {
             // Consumer was not yet created, retry to connect to broker if 
it's possible
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 795613e..5de5c75 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1596,4 +1596,40 @@ TEST(ConsumerTest, testCloseAfterSeek) {
     anotherClient.close();
 }
 
+TEST(ConsumerTest, 
testIsConnectedFalsePositiveAfterSubscribeRejectedOnReconnect) {
+    // A reconnect SUBSCRIBE failure happens after the initial subscribe has 
already completed.
+    const std::string topic =
+        "persistent://public/default/test-false-positive-" + 
std::to_string(time(nullptr));
+    Client client(lookupUrl);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+    ASSERT_TRUE(consumer.isConnected()) << "Precondition: consumer should be 
connected";
+
+    auto& consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+
+    // Capture the current live connection.
+    auto cnx = consumerImpl.getCnx().lock();
+    ASSERT_TRUE(cnx != nullptr) << "Precondition: cnx should be non-null";
+    LOG_INFO("Step 1 passed: consumer subscribed, cnx=" << cnx);
+
+    // Simulate the broker rejecting the SUBSCRIBE command during reconnect.
+    Result handleResult = PulsarFriend::consumerHandleCreateConsumer(consumer, 
cnx, ResultAuthorizationError);
+    LOG_INFO("Step 2: handleCreateConsumer returned " << handleResult);
+    EXPECT_EQ(ResultRetryable, handleResult)
+        << "handleCreateConsumer should return ResultRetryable for an 
already-created consumer";
+
+    // The failed SUBSCRIBE must clear the connection set before SUBSCRIBE.
+    auto cnxAfter = consumerImpl.getCnx().lock();
+    LOG_INFO("Step 3: cnx after handleCreateConsumer failure = " << cnxAfter);
+    LOG_INFO("Step 3: isConnected() = " << consumer.isConnected());
+
+    EXPECT_EQ(nullptr, cnxAfter)
+        << "After fix: connection_ must be cleared by resetCnx() so grabCnx() 
can retry";
+    EXPECT_FALSE(consumer.isConnected())
+        << "After fix: isConnected() must return false after SUBSCRIBE 
rejection";
+
+    consumer.close();
+    client.close();
+}
+
 }  // namespace pulsar
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 8017e0b..0244743 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -102,6 +102,11 @@ class PulsarFriend {
         return *consumerImpl;
     }
 
+    static Result consumerHandleCreateConsumer(Consumer consumer, const 
ClientConnectionPtr& cnx,
+                                               Result result) {
+        return getConsumerImpl(consumer).handleCreateConsumer(cnx, result);
+    }
+
     static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) 
{
         return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
     }

Reply via email to