This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b893a1  Fix the seek method being blocked forever when the subscribe
RPC is slower than the seek RPC (#533)
0b893a1 is described below

commit 0b893a1622aa7dd702d48afee33af321bb2c06dd
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jan 16 14:14:50 2026 +0800

    Fix the seek method being blocked forever when the subscribe RPC is slower
than the seek RPC (#533)
---
 lib/ClientConnection.cc   |  31 ++++--
 lib/ClientConnection.h    |  22 ++++-
 lib/ConsumerImpl.cc       | 246 ++++++++++++++++++++++++++--------------------
 lib/ConsumerImpl.h        |  63 ++++++------
 lib/HandlerBase.cc        |   2 +-
 lib/HandlerBase.h         |   2 +-
 lib/MockServer.h          | 139 ++++++++++++++++++++++++++
 lib/ProducerImpl.cc       |  10 +-
 lib/Synchronized.h        |  47 ---------
 tests/ConsumerSeekTest.cc |  58 +++++++++++
 10 files changed, 420 insertions(+), 200 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 86dfd9d..4f7a1dd 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -21,6 +21,7 @@
 #include <openssl/x509.h>
 #include <pulsar/MessageIdBuilder.h>
 
+#include <chrono>
 #include <fstream>
 
 #include "AsioDefines.h"
@@ -31,6 +32,7 @@
 #include "ConsumerImpl.h"
 #include "ExecutorService.h"
 #include "LogUtils.h"
+#include "MockServer.h"
 #include "OpSendMsg.h"
 #include "ProducerImpl.h"
 #include "PulsarApi.pb.h"
@@ -1005,15 +1007,17 @@ Future<Result, BrokerConsumerStatsImpl> 
ClientConnection::newConsumerStats(uint6
 void ClientConnection::newTopicLookup(const std::string& topicName, bool 
authoritative,
                                       const std::string& listenerName, 
uint64_t requestId,
                                       const LookupDataResultPromisePtr& 
promise) {
-    newLookup(Commands::newLookup(topicName, authoritative, requestId, 
listenerName), requestId, promise);
+    newLookup(Commands::newLookup(topicName, authoritative, requestId, 
listenerName), requestId, "LOOKUP",
+              promise);
 }
 
 void ClientConnection::newPartitionedMetadataLookup(const std::string& 
topicName, uint64_t requestId,
                                                     const 
LookupDataResultPromisePtr& promise) {
-    newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), 
requestId, promise);
+    newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), 
requestId, "PARTITIONED_METADATA",
+              promise);
 }
 
-void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
+void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, 
const char* requestType,
                                  const LookupDataResultPromisePtr& promise) {
     Lock lock(mutex_);
     std::shared_ptr<LookupDataResultPtr> lookupDataResult;
@@ -1042,6 +1046,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, 
uint64_t requestId,
     pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
     numOfPendingLookupRequest_++;
     lock.unlock();
+    LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << " 
(req_id: " << requestId << ")");
     sendCommand(cmd);
 }
 
@@ -1158,12 +1163,15 @@ void ClientConnection::sendPendingCommands() {
     }
 }
 
-Future<Result, ResponseData> ClientConnection::sendRequestWithId(const 
SharedBuffer& cmd, int requestId) {
+Future<Result, ResponseData> ClientConnection::sendRequestWithId(const 
SharedBuffer& cmd, int requestId,
+                                                                 const char* 
requestType) {
     Lock lock(mutex_);
 
     if (isClosed()) {
         lock.unlock();
         Promise<Result, ResponseData> promise;
+        LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " << 
requestId
+                             << ") to a closed connection");
         promise.setFailed(ResultNotConnected);
         return promise.getFuture();
     }
@@ -1182,7 +1190,17 @@ Future<Result, ResponseData> 
ClientConnection::sendRequestWithId(const SharedBuf
     pendingRequests_.insert(std::make_pair(requestId, requestData));
     lock.unlock();
 
-    sendCommand(cmd);
+    LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: " 
<< requestId << ")");
+    if (mockingRequests_.load(std::memory_order_acquire)) {
+        if (mockServer_ == nullptr) {
+            LOG_WARN(cnxString_ << "Mock server is unexpectedly null when 
processing " << requestType);
+            sendCommand(cmd);
+        } else if (!mockServer_->sendRequest(requestType, requestId)) {
+            sendCommand(cmd);
+        }
+    } else {
+        sendCommand(cmd);
+    }
     return requestData.promise.getFuture();
 }
 
@@ -1625,9 +1643,6 @@ void ClientConnection::handleConsumerStatsResponse(
 
 void ClientConnection::handleLookupTopicRespose(
     const proto::CommandLookupTopicResponse& lookupTopicResponse) {
-    LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
-                         << lookupTopicResponse.request_id());
-
     Lock lock(mutex_);
     auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
     if (it != pendingLookupRequests_.end()) {
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 18a7d84..aae53d2 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -115,6 +115,7 @@ struct ResponseData {
 
 typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
 
+class MockServer;
 class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<ClientConnection> {
     enum State : uint8_t
     {
@@ -123,6 +124,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
         Ready,
         Disconnected
     };
+    using RequestDelayType =
+        std::unordered_map<std::string /* request type */, long /* delay in 
milliseconds */>;
 
    public:
     typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
@@ -185,7 +188,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      * Send a request with a specific Id over the connection. The future will 
be
      * triggered when the response for this request is received
      */
-    Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, 
int requestId);
+    Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, 
int requestId,
+                                                   const char* requestType);
 
     const std::string& brokerAddress() const;
 
@@ -208,6 +212,13 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, 
const std::string& version,
                                             uint64_t requestId);
 
+    void attachMockServer(const std::shared_ptr<MockServer>& mockServer) {
+        mockServer_ = mockServer;
+        // Mark that requests will first go through the mock server, if the 
mock server cannot process it,
+        // fall back to the normal logic
+        mockingRequests_.store(true, std::memory_order_release);
+    }
+
    private:
     struct PendingRequestData {
         Promise<Result, ResponseData> promise;
@@ -264,7 +275,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
     void handleSendPair(const ASIO_ERROR& err);
     void sendPendingCommands();
-    void newLookup(const SharedBuffer& cmd, uint64_t requestId, const 
LookupDataResultPromisePtr& promise);
+    void newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* 
requestType,
+                   const LookupDataResultPromisePtr& promise);
 
     void handleRequestTimeout(const ASIO_ERROR& ec, const PendingRequestData& 
pendingRequestData);
 
@@ -308,6 +320,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
         }
     }
 
+    void mockSendCommand(const char* requestType, uint64_t requestId, const 
SharedBuffer& cmd);
+
     std::atomic<State> state_{Pending};
     TimeDuration operationsTimeout_;
     AuthenticationPtr authentication_;
@@ -391,6 +405,9 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     DeadlineTimerPtr keepAliveTimer_;
     DeadlineTimerPtr consumerStatsRequestTimer_;
 
+    std::atomic_bool mockingRequests_{false};
+    std::shared_ptr<MockServer> mockServer_;
+
     void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const 
std::vector<uint64_t>& consumerStatsRequests);
 
     void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
@@ -405,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     friend class PulsarFriend;
     friend class ConsumerTest;
+    friend class MockServer;
 
     void checkServerError(ServerError error, const std::string& message);
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 430b851..a645f58 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -23,6 +23,8 @@
 #include <pulsar/MessageIdBuilder.h>
 
 #include <algorithm>
+#include <utility>
+#include <variant>
 
 #include "AckGroupingTracker.h"
 #include "AckGroupingTrackerDisabled.h"
@@ -123,7 +125,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, 
const std::string& topic
       negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, 
*this, conf)),
       ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)),
       readCompacted_(conf.isReadCompacted()),
-      startMessageId_(getStartMessageId(startMessageId, 
conf.isStartMessageIdInclusive())),
+      startMessageId_(pulsar::getStartMessageId(startMessageId, 
conf.isStartMessageIdInclusive())),
       maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
       
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
       
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -189,7 +191,8 @@ ConsumerImpl::~ConsumerImpl() {
         ClientConnectionPtr cnx = getCnx().lock();
         if (cnx) {
             auto requestId = newRequestId();
-            cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, 
requestId), requestId);
+            cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, 
requestId), requestId,
+                                   "CLOSE_CONSUMER");
             cnx->removeConsumer(consumerId_);
             LOG_INFO(consumerStr_ << "Closed consumer for race condition: " << 
consumerId_);
         } else {
@@ -232,20 +235,19 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
         return promise.getFuture();
     }
 
-    // Register consumer so that we can handle other incomming commands (e.g. 
ACTIVE_CONSUMER_CHANGE) after
-    // sending the subscribe request.
-    cnx->registerConsumer(consumerId_, get_shared_this_ptr());
+    optional<MessageId> subscribeMessageId;
+    {
+        LockGuard lock{mutex_};
+        setCnx(cnx);
+        cnx->registerConsumer(consumerId_, get_shared_this_ptr());
+        LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_);
 
-    if (duringSeek()) {
-        ackGroupingTrackerPtr_->flushAndClean();
+        clearReceiveQueue();
+        subscribeMessageId =
+            (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? 
startMessageId_ : std::nullopt;
+        lastDequedMessageId_ = MessageId::earliest();
     }
 
-    Lock lockForMessageId(mutexForMessageId_);
-    clearReceiveQueue();
-    const auto subscribeMessageId =
-        (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? 
startMessageId_.get() : std::nullopt;
-    lockForMessageId.unlock();
-
     unAckedMessageTrackerPtr_->clear();
 
     ClientImplPtr client = client_.lock();
@@ -259,13 +261,22 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
     // Keep a reference to ensure object is kept alive.
     auto self = get_shared_this_ptr();
     setFirstRequestIdAfterConnect(requestId);
-    cnx->sendRequestWithId(cmd, requestId)
+    cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE")
         .addListener([this, self, cnx, promise](Result result, const 
ResponseData& responseData) {
             Result handleResult = handleCreateConsumer(cnx, result);
-            if (handleResult == ResultOk) {
-                promise.setSuccess();
-            } else {
+            if (handleResult != ResultOk) {
                 promise.setFailed(handleResult);
+                return;
+            }
+            promise.setSuccess();
+            // Complete the seek callback after completing `promise`, 
otherwise `reconnectionPending_` will
+            // still be true when the seek operation is done.
+            LockGuard lock{mutex_};
+            if (seekStatus_ == SeekStatus::COMPLETED) {
+                executor_->postWork([seekCallback{std::exchange(seekCallback_, 
std::nullopt).value()}]() {
+                    seekCallback(ResultOk);
+                });
+                seekStatus_ = SeekStatus::NOT_STARTED;
             }
         });
 
@@ -301,7 +312,8 @@ Result ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result
                     LOG_INFO(getName() << "Closing subscribed consumer since 
it was already closed");
                     int requestId = client->newRequestId();
                     auto name = getName();
-                    
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), 
requestId)
+                    
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), 
requestId,
+                                           "CLOSE_CONSUMER")
                         .addListener([name](Result result, const 
ResponseData&) {
                             if (result == ResultOk) {
                                 LOG_INFO(name << "Closed consumer successfully 
after subscribe completed");
@@ -321,8 +333,8 @@ Result ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result
                 return ResultAlreadyClosed;
             }
 
+            mutexLock.unlock();
             LOG_INFO(getName() << "Created consumer on broker " << 
cnx->cnxString());
-            setCnx(cnx);
             incomingMessages_.clear();
             possibleSendToDeadLetterTopicMessages_.clear();
             backoff_.reset();
@@ -354,7 +366,8 @@ Result ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result
             // in case it was indeed created, otherwise it might prevent new 
subscribe operation,
             // since we are not closing the connection
             auto requestId = newRequestId();
-            cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, 
requestId), requestId);
+            cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, 
requestId), requestId,
+                                   "CLOSE_CONSUMER");
         }
 
         if (consumerCreatedPromise_.isComplete()) {
@@ -408,7 +421,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& 
originalCallback) {
         auto requestId = newRequestId();
         SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
         auto self = get_shared_this_ptr();
-        cnx->sendRequestWithId(cmd, requestId)
+        cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE")
             .addListener([self, callback](Result result, const ResponseData&) 
{ callback(result); });
     } else {
         Result result = ResultNotConnected;
@@ -502,9 +515,10 @@ optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuffer& pay
 
     auto& chunkedMsgCtx = it->second;
     if (it == chunkedMessageCache_.end() || 
!chunkedMsgCtx.validateChunkId(chunkId)) {
-        auto startMessageId = 
startMessageId_.get().value_or(MessageId::earliest());
-        if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() 
== messageId.ledgerId() &&
-            startMessageId.entryId() == messageId.entryId()) {
+        auto startMessageId = getStartMessageId();
+        if (!config_.isStartMessageIdInclusive() && startMessageId &&
+            startMessageId->ledgerId() == messageId.ledgerId() &&
+            startMessageId->entryId() == messageId.entryId()) {
             // When the start message id is not inclusive, the last chunk of 
the previous chunked message will
             // be delivered, which is expected and we only need to filter it 
out.
             chunkedMessageCache_.remove(uuid);
@@ -621,17 +635,14 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
             words[i] = msg.ack_set(i);
         }
         BitSet ackSet{std::move(words)};
-        Lock lock(mutex_);
         numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, 
ackSet, msg.redelivery_count());
     } else {
         // try convert key value data.
         m.impl_->convertPayloadToKeyValue(config_.getSchema());
 
-        const auto startMessageId = startMessageId_.get();
-        if (isPersistent_ && startMessageId &&
-            m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
-            m.getMessageId().entryId() == startMessageId.value().entryId() &&
-            isPriorEntryIndex(m.getMessageId().entryId())) {
+        const auto startMessageId = getStartMessageId();
+        if (isPersistent_ && startMessageId && m.getMessageId().ledgerId() == 
startMessageId->ledgerId() &&
+            isPrior(m.getMessageId().entryId(), startMessageId->entryId())) {
             LOG_DEBUG(getName() << " Ignoring message from before the 
startMessageId: "
                                 << startMessageId.value());
             return;
@@ -753,7 +764,7 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
     auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
     LOG_DEBUG("Received Batch messages of size - " << batchSize
                                                    << " -- msgId: " << 
batchedMessage.getMessageId());
-    const auto startMessageId = startMessageId_.get();
+    const auto startMessageId = getStartMessageId();
 
     int skippedMessages = 0;
 
@@ -783,9 +794,9 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
 
             // If we are receiving a batch message, we need to discard 
messages that were prior
             // to the startMessageId
-            if (isPersistent_ && msgId.ledgerId() == 
startMessageId.value().ledgerId() &&
-                msgId.entryId() == startMessageId.value().entryId() &&
-                isPriorBatchIndex(msgId.batchIndex())) {
+            if (isPersistent_ && msgId.ledgerId() == 
startMessageId->ledgerId() &&
+                msgId.entryId() == startMessageId->entryId() &&
+                isPrior(msgId.batchIndex(), startMessageId->batchIndex())) {
                 LOG_DEBUG(getName() << "Ignoring message from before the 
startMessageId"
                                     << msg.getMessageId());
                 ++skippedMessages;
@@ -925,7 +936,7 @@ void ConsumerImpl::internalListener() {
     trackMessage(msg.getMessageId());
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
-        lastDequedMessageId_ = msg.getMessageId();
+        setLastDequedMessageId(msg.getMessageId());
         Consumer consumer{get_shared_this_ptr()};
         Message interceptMsg = 
interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
         messageListener_(consumer, interceptMsg);
@@ -1098,10 +1109,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int 
timeout) {
 }
 
 void ConsumerImpl::messageProcessed(Message& msg, bool track) {
-    Lock lock(mutexForMessageId_);
-    lastDequedMessageId_ = msg.getMessageId();
-    lock.unlock();
-
+    setLastDequedMessageId(msg.getMessageId());
     incomingMessagesSize_.fetch_sub(msg.getLength());
 
     ClientConnectionPtr currentCnx = getCnx().lock();
@@ -1123,20 +1131,22 @@ void ConsumerImpl::messageProcessed(Message& msg, bool 
track) {
  * was
  * not seen by the application
  * `startMessageId_` is updated so that we can discard messages after delivery 
restarts.
+ * NOTE: `mutex_` must be locked before calling this method.
  */
 void ConsumerImpl::clearReceiveQueue() {
-    if (duringSeek()) {
-        if (hasSoughtByTimestamp()) {
-            // Invalidate startMessageId_ so that isPriorBatchIndex and 
isPriorEntryIndex checks will be
-            // skipped, and hasMessageAvailableAsync won't use startMessageId_ 
in compare.
-            startMessageId_ = std::nullopt;
+    if (seekStatus_ != SeekStatus::NOT_STARTED) {
+        // Flush the pending ACKs in case newly arrived messages are filtered 
out by the previous pending ACKs
+        ackGroupingTrackerPtr_->flushAndClean();
+        if (lastSeekArg_.has_value()) {
+            if (std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+                startMessageId_ = std::get<MessageId>(lastSeekArg_.value());
+            } else {
+                // Invalidate startMessageId_ so that `isPrior` checks will be 
skipped, and
+                // `hasMessageAvailableAsync` won't use `startMessageId_` in 
compare.
+                startMessageId_ = std::nullopt;
+            }
         } else {
-            startMessageId_ = seekMessageId_.get();
-        }
-        SeekStatus expected = SeekStatus::COMPLETED;
-        if (seekStatus_.compare_exchange_strong(expected, 
SeekStatus::NOT_STARTED)) {
-            auto seekCallback = seekCallback_.release();
-            executor_->postWork([seekCallback] { seekCallback(ResultOk); });
+            LOG_ERROR(getName() << "SeekStatus is not NOT_STARTED but 
lastSeekArg_ is not set");
         }
         return;
     } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
@@ -1374,7 +1384,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& 
originalCallback) {
 
     auto requestId = newRequestId();
     auto self = get_shared_this_ptr();
-    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), 
requestId)
+    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), 
requestId, "CLOSE_CONSUMER")
         .addListener([self, callback](Result result, const ResponseData&) { 
callback(result); });
 }
 
@@ -1550,10 +1560,12 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, 
const ResultCallback& callb
     }
 
     const auto requestId = newRequestId();
-    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
msgId), SeekArg{msgId}, callback);
+    auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {};
+    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
msgId), SeekArg{msgId},
+                      std::move(nonNullCallback));
 }
 
-void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& 
callback) {
+void ConsumerImpl::seekAsync(SeekTimestampType timestamp, const 
ResultCallback& callback) {
     const auto state = state_.load();
     if (state == Closed || state == Closing) {
         LOG_ERROR(getName() << "Client connection already closed.");
@@ -1564,8 +1576,9 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const 
ResultCallback& callback)
     }
 
     const auto requestId = newRequestId();
+    auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {};
     seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
timestamp), SeekArg{timestamp},
-                      callback);
+                      std::move(nonNullCallback));
 }
 
 bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1577,16 +1590,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const 
HasMessageAvailableCallback& c
     }
     bool compareMarkDeletePosition;
     {
-        std::lock_guard<std::mutex> lock{mutexForMessageId_};
+        LockGuard lock{mutex_};
         compareMarkDeletePosition =
             // there is no message received by consumer, so we cannot compare 
the last position with the last
             // received position
             lastDequedMessageId_ == MessageId::earliest() &&
             // If the start message id is latest, we should seek to the actual 
last message first.
-            (startMessageId_.get().value_or(MessageId::earliest()) == 
MessageId::latest() ||
+            (startMessageId_.value_or(MessageId::earliest()) == 
MessageId::latest() ||
              // If there is a previous seek operation by timestamp, the start 
message id will be incorrect, so
              // we cannot compare the start position with the last position.
-             hasSoughtByTimestamp());
+             (lastSeekArg_.has_value() && 
std::holds_alternative<SeekTimestampType>(lastSeekArg_.value())));
     }
     if (compareMarkDeletePosition) {
         auto self = get_shared_this_ptr();
@@ -1607,7 +1620,15 @@ void ConsumerImpl::hasMessageAvailableAsync(const 
HasMessageAvailableCallback& c
                     callback(ResultOk, false);
                 }
             };
-            if (self->config_.isStartMessageIdInclusive() && 
!self->hasSoughtByTimestamp()) {
+            bool lastSeekIsByTimestamp = false;
+            {
+                LockGuard lock{self->mutex_};
+                if (self->lastSeekArg_.has_value() &&
+                    
std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
+                    lastSeekIsByTimestamp = true;
+                }
+            }
+            if (self->config_.isStartMessageIdInclusive() && 
!lastSeekIsByTimestamp) {
                 self->seekAsync(response.getLastMessageId(), [callback, 
handleResponse](Result result) {
                     if (result != ResultOk) {
                         callback(result, {});
@@ -1664,9 +1685,10 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const 
BackoffPtr& backoff, Time
                 .addListener([this, self, callback](Result result, const 
GetLastMessageIdResponse& response) {
                     if (result == ResultOk) {
                         LOG_DEBUG(getName() << "getLastMessageId: " << 
response);
-                        Lock lock(mutexForMessageId_);
-                        lastMessageIdInBroker_ = response.getLastMessageId();
-                        lock.unlock();
+                        {
+                            LockGuard lock{mutex_};
+                            lastMessageIdInBroker_ = 
response.getLastMessageId();
+                        }
                     } else {
                         LOG_ERROR(getName() << "Failed to getLastMessageId: " 
<< result);
                     }
@@ -1723,78 +1745,89 @@ bool ConsumerImpl::isConnected() const { return 
!getCnx().expired() && state_ ==
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 
1 : 0; }
 
 void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, 
const SeekArg& seekArg,
-                                     const ResultCallback& callback) {
+                                     ResultCallback&& callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         LOG_ERROR(getName() << " Client Connection not ready for Consumer");
         callback(ResultNotConnected);
         return;
     }
-
-    auto expected = SeekStatus::NOT_STARTED;
-    if (!seekStatus_.compare_exchange_strong(expected, 
SeekStatus::IN_PROGRESS)) {
-        LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the 
status is "
-                            << static_cast<int>(expected));
+    bool hasPendingSeek = false;
+    // Save the previous last seek arg in case seek failed
+    decltype(lastSeekArg_) previousLastSeekArg;
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (seekStatus_ != SeekStatus::NOT_STARTED) {
+            hasPendingSeek = true;
+        } else {
+            seekStatus_ = SeekStatus::IN_PROGRESS;
+            if (seekCallback_.has_value()) {
+                // This should never happen
+                LOG_ERROR(getName() << "Previous seek callback is not 
triggered unexpectedly");
+                executor_->postWork([callback{std::exchange(seekCallback_, 
std::nullopt).value()}] {
+                    callback(ResultTimeout);
+                });
+            }
+            seekCallback_ = std::move(callback);
+            previousLastSeekArg = lastSeekArg_;
+            lastSeekArg_ = seekArg;
+        }
+    }
+    if (hasPendingSeek) {
+        std::visit(
+            [this](auto&& arg) {
+                LOG_ERROR(getName() << "Attempted to seek " << arg << " when 
there is a pending seek");
+            },
+            seekArg);
         callback(ResultNotAllowedError);
         return;
     }
 
-    const auto originalSeekMessageId = seekMessageId_.get();
-    if (boost::get<uint64_t>(&seekArg)) {
-        hasSoughtByTimestamp_.store(true, std::memory_order_release);
-    } else {
-        seekMessageId_ = *boost::get<MessageId>(&seekArg);
-        hasSoughtByTimestamp_.store(false, std::memory_order_release);
-    }
-    seekStatus_ = SeekStatus::IN_PROGRESS;
-    seekCallback_ = callback;
-    LOG_INFO(getName() << " Seeking subscription to " << seekArg);
+    std::visit([this](auto&& arg) { LOG_INFO(getName() << "Seeking 
subscription to " << arg); }, seekArg);
 
     auto weakSelf = weak_from_this();
 
-    cnx->sendRequestWithId(seek, requestId)
-        .addListener([this, weakSelf, callback, originalSeekMessageId](Result 
result,
-                                                                       const 
ResponseData& responseData) {
+    cnx->sendRequestWithId(seek, requestId, "SEEK")
+        .addListener([this, weakSelf, previousLastSeekArg](Result result, 
const ResponseData& responseData) {
             auto self = weakSelf.lock();
             if (!self) {
-                callback(result);
                 return;
             }
             if (result == ResultOk) {
-                LOG_INFO(getName() << "Seek successfully");
-                ackGroupingTrackerPtr_->flushAndClean();
-                incomingMessages_.clear();
-                Lock lock(mutexForMessageId_);
-                lastDequedMessageId_ = MessageId::earliest();
-                lock.unlock();
-                if (getCnx().expired()) {
+                LockGuard lock(mutex_);
+                if (getCnx().expired() || reconnectionPending_) {
                     // It's during reconnection, complete the seek future 
after connection is established
                     seekStatus_ = SeekStatus::COMPLETED;
+                    LOG_INFO(getName() << "Delay the seek future until the 
reconnection is done");
                 } else {
-                    if (!hasSoughtByTimestamp()) {
-                        startMessageId_ = seekMessageId_.get();
+                    LOG_INFO(getName() << "Seek successfully");
+                    ackGroupingTrackerPtr_->flushAndClean();
+                    incomingMessages_.clear();
+                    if (lastSeekArg_.has_value() && 
std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+                        startMessageId_ = 
std::get<MessageId>(lastSeekArg_.value());
                     }
-                    seekCallback_.release()(result);
-                }
+                    if (!seekCallback_.has_value()) {
+                        LOG_ERROR(getName() << "Seek callback is not set");
+                        return;
+                    }
+                    executor_->postWork(
+                        [self, callback{std::exchange(seekCallback_, 
std::nullopt).value()}]() {
+                            callback(ResultOk);
+                        });
+                    seekStatus_ = SeekStatus::NOT_STARTED;
+                }  // else: complete the seek future after connection is 
established
             } else {
                 LOG_ERROR(getName() << "Failed to seek: " << result);
-                seekMessageId_ = originalSeekMessageId;
+                LockGuard lock{mutex_};
                 seekStatus_ = SeekStatus::NOT_STARTED;
-                seekCallback_.release()(result);
+                lastSeekArg_ = previousLastSeekArg;
+                // This branch handles a failed seek: hand the real error to the user callback instead of
+                // ResultOk, otherwise the caller would believe the seek succeeded
+                executor_->postWork(
+                    [self, result, callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
+                        callback(result);
+                    });
             }
         });
 }
 
-bool ConsumerImpl::isPriorBatchIndex(int32_t idx) {
-    return config_.isStartMessageIdInclusive() ? idx < 
startMessageId_.get().value().batchIndex()
-                                               : idx <= 
startMessageId_.get().value().batchIndex();
-}
-
-bool ConsumerImpl::isPriorEntryIndex(int64_t idx) {
-    return config_.isStartMessageIdInclusive() ? idx < 
startMessageId_.get().value().entryId()
-                                               : idx <= 
startMessageId_.get().value().entryId();
-}
-
 bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const {
     if (batchReceivePolicy_.getMaxNumMessages() <= 0 && 
batchReceivePolicy_.getMaxNumBytes() <= 0) {
         return false;
@@ -1928,7 +1961,7 @@ void ConsumerImpl::doImmediateAck(const 
ClientConnectionPtr& cnx, const MessageI
         auto requestId = newRequestId();
         cnx->sendRequestWithId(
                Commands::newAck(consumerId_, msgId.ledgerId(), 
msgId.entryId(), ackSet, ackType, requestId),
-               requestId)
+               requestId, "ACK")
             .addListener([callback](Result result, const ResponseData&) {
                 if (callback) {
                     callback(result);
@@ -1958,7 +1991,8 @@ void ConsumerImpl::doImmediateAck(const 
ClientConnectionPtr& cnx, const std::set
     if 
(Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion()))
 {
         if (config_.isAckReceiptEnabled()) {
             auto requestId = newRequestId();
-            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds, requestId), requestId)
+            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds, requestId), requestId,
+                                   "ACK")
                 .addListener([callback](Result result, const ResponseData&) {
                     if (callback) {
                         callback(result);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 63eb51d..0da82a2 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -21,13 +21,15 @@
 
 #include <pulsar/Reader.h>
 
-#include <boost/variant.hpp>
+#include <atomic>
 #include <cstdint>
 #include <functional>
 #include <list>
 #include <memory>
+#include <optional>
 #include <set>
 #include <utility>
+#include <variant>
 
 #include "BrokerConsumerStatsImpl.h"
 #include "Commands.h"
@@ -37,7 +39,6 @@
 #include "MapCache.h"
 #include "MessageIdImpl.h"
 #include "NegativeAcksTracker.h"
-#include "Synchronized.h"
 #include "TestUtil.h"
 #include "TimeUtils.h"
 #include "UnboundedBlockingQueue.h"
@@ -79,9 +80,9 @@ const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
 
 enum class SeekStatus : std::uint8_t
 {
-    NOT_STARTED,
-    IN_PROGRESS,
-    COMPLETED
+    NOT_STARTED,  // there is no pending seek RPC so that it's allowed to seek
+    IN_PROGRESS,  // the seek RPC is in progress
+    COMPLETED     // the seek RPC is done but the connection is not 
established yet
 };
 
 class ConsumerImpl : public ConsumerImplBase {
@@ -138,7 +139,9 @@ class ConsumerImpl : public ConsumerImplBase {
     void getBrokerConsumerStatsAsync(const BrokerConsumerStatsCallback& 
callback) override;
     void getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& callback) 
override;
     void seekAsync(const MessageId& msgId, const ResultCallback& callback) 
override;
-    void seekAsync(uint64_t timestamp, const ResultCallback& callback) 
override;
+    using SeekTimestampType = uint64_t;
+    using SeekArg = std::variant<SeekTimestampType, MessageId>;
+    void seekAsync(SeekTimestampType timestamp, const ResultCallback& 
callback) override;
     void negativeAcknowledge(const MessageId& msgId) override;
     bool isConnected() const override;
     uint64_t getNumberOfConnectedConsumer() override;
@@ -191,8 +194,10 @@ class ConsumerImpl : public ConsumerImplBase {
     void drainIncomingMessageQueue(size_t count);
     uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& 
cnx, Message& batchedMessage,
                                                 const BitSet& ackSet, int 
redeliveryCount);
-    bool isPriorBatchIndex(int32_t idx);
-    bool isPriorEntryIndex(int64_t idx);
+    // Tell whether `index` is located before the start position `startIndex`.
+    // When the start message id is inclusive, `startIndex` itself is not prior; otherwise it is.
+    template <typename T>
+    bool isPrior(T index, T startIndex) const noexcept {
+        if (config_.isStartMessageIdInclusive()) {
+            return index < startIndex;
+        }
+        return index <= startIndex;
+    }
     void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, const 
BrokerConsumerStatsCallback&);
 
     enum class DecryptionResult : uint8_t
@@ -218,19 +223,9 @@ class ConsumerImpl : public ConsumerImplBase {
                                        const BrokerGetLastMessageIdCallback& 
callback);
 
     void clearReceiveQueue();
-    using SeekArg = boost::variant<uint64_t, MessageId>;
-    friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) {
-        auto ptr = boost::get<uint64_t>(&seekArg);
-        if (ptr) {
-            os << *ptr;
-        } else {
-            os << *boost::get<MessageId>(&seekArg);
-        }
-        return os;
-    }
 
     void seekAsyncInternal(long requestId, const SharedBuffer& seek, const 
SeekArg& seekArg,
-                           const ResultCallback& callback);
+                           ResultCallback&& callback);
     void processPossibleToDLQ(const MessageId& messageId, const 
ProcessDLQCallBack& cb);
 
     std::mutex mutexForReceiveWithZeroQueueSize;
@@ -269,19 +264,13 @@ class ConsumerImpl : public ConsumerImplBase {
     std::shared_ptr<Promise<Result, Producer>> deadLetterProducer_;
     std::mutex createProducerLock_;
 
-    // Make the access to `lastDequedMessageId_` and `lastMessageIdInBroker_` 
thread safe
-    mutable std::mutex mutexForMessageId_;
     MessageId lastDequedMessageId_{MessageId::earliest()};
     MessageId lastMessageIdInBroker_{MessageId::earliest()};
+    optional<MessageId> startMessageId_;
 
-    std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
-    Synchronized<ResultCallback> seekCallback_{[](Result) {}};
-    Synchronized<optional<MessageId>> startMessageId_;
-    Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
-    std::atomic<bool> hasSoughtByTimestamp_{false};
-
-    bool hasSoughtByTimestamp() const { return 
hasSoughtByTimestamp_.load(std::memory_order_acquire); }
-    bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
+    SeekStatus seekStatus_{SeekStatus::NOT_STARTED};
+    optional<ResultCallback> seekCallback_;
+    optional<SeekArg> lastSeekArg_;
 
     class ChunkedMessageCtx {
        public:
@@ -380,7 +369,8 @@ class ConsumerImpl : public ConsumerImplBase {
                                                const ClientConnectionPtr& cnx, 
MessageId& messageId);
 
     bool hasMoreMessages() const {
-        std::lock_guard<std::mutex> lock{mutexForMessageId_};
+        LockGuard lock{mutex_};
+
         if (lastMessageIdInBroker_.entryId() == -1L) {
             return false;
         }
@@ -388,7 +378,7 @@ class ConsumerImpl : public ConsumerImplBase {
         const auto inclusive = config_.isStartMessageIdInclusive();
         if (lastDequedMessageId_ == MessageId::earliest()) {
             // If startMessageId_ is none, use latest so that this method will 
return false
-            const auto startMessageId = 
startMessageId_.get().value_or(MessageId::latest());
+            const auto startMessageId = 
startMessageId_.value_or(MessageId::latest());
             return inclusive ? (lastMessageIdInBroker_ >= startMessageId)
                              : (lastMessageIdInBroker_ > startMessageId);
         } else {
@@ -396,11 +386,22 @@ class ConsumerImpl : public ConsumerImplBase {
         }
     }
 
+    // Return a copy of `startMessageId_` that is taken while `mutex_` is held
+    optional<MessageId> getStartMessageId() const {
+        LockGuard lock{mutex_};
+        return startMessageId_;
+    }
+    // Overwrite `lastDequedMessageId_` while `mutex_` is held
+    void setLastDequedMessageId(const MessageId& messageId) {
+        LockGuard lock{mutex_};
+        lastDequedMessageId_ = messageId;
+    }
+
     void doImmediateAck(const ClientConnectionPtr& cnx, const MessageId& 
msgId, CommandAck_AckType ackType,
                         const ResultCallback& callback);
     void doImmediateAck(const ClientConnectionPtr& cnx, const 
std::set<MessageId>& msgIds,
                         const ResultCallback& callback);
 
+    using LockGuard = std::lock_guard<std::mutex>;
+
     friend class PulsarFriend;
     friend class MultiTopicsConsumerImpl;
 
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 37c6e2d..1a4f573 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -44,9 +44,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const 
std::string& topic,
       state_(NotStarted),
       backoff_(backoff),
       epoch_(0),
+      reconnectionPending_(false),
       timer_(executor_->createDeadlineTimer()),
       creationTimer_(executor_->createDeadlineTimer()),
-      reconnectionPending_(false),
       redirectedClusterURI_("") {}
 
 HandlerBase::~HandlerBase() {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index acce15d..0e733f0 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -143,6 +143,7 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
     std::atomic<State> state_;
     Backoff backoff_;
     uint64_t epoch_;
+    std::atomic<bool> reconnectionPending_;
 
     Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) 
const;
 
@@ -160,7 +161,6 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
     DeadlineTimerPtr creationTimer_;
 
     mutable std::mutex connectionMutex_;
-    std::atomic<bool> reconnectionPending_;
     ClientConnectionWeakPtr connection_;
     std::string redirectedClusterURI_;
     std::atomic<long> firstRequestIdAfterConnect_{-1L};
diff --git a/lib/MockServer.h b/lib/MockServer.h
new file mode 100644
index 0000000..8e4a213
--- /dev/null
+++ b/lib/MockServer.h
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "ClientConnection.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+/**
+ * A stand-in for the broker that answers selected requests of a `ClientConnection` after a delay.
+ *
+ * For each request name that has a configured delay, `sendRequest` arms a timer created from the
+ * connection's executor. When the timer fires, a `CommandSuccess` carrying the request id is passed to
+ * the connection. A mocked "SEEK" request additionally triggers a `CommandCloseConsumer` for every
+ * consumer registered on the connection.
+ *
+ * An instance must be owned by a `std::shared_ptr` because `schedule` calls `shared_from_this()`.
+ * `mutex_` guards `requestDelays_` and `pendingTimers_`.
+ */
+class MockServer : public std::enable_shared_from_this<MockServer> {
+   public:
+    using RequestDelayType = std::unordered_map<std::string, long /* delay in milliseconds */>;
+
+    // `explicit` so that a connection pointer is never converted into a MockServer implicitly.
+    // The mocked `CLOSE_CONSUMER` command is delayed by 1 ms unless `setRequestDelay` overrides it.
+    explicit MockServer(const ClientConnectionPtr& connection) : connection_(connection) {
+        requestDelays_["CLOSE_CONSUMER"] = 1;
+    }
+
+    // Add or overwrite the delays (in milliseconds) of the given request names
+    void setRequestDelay(std::initializer_list<typename RequestDelayType::value_type> delays) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        for (auto&& delay : delays) {
+            requestDelays_[delay.first] = delay.second;
+        }
+    }
+
+    // Schedule the delayed response of `request` if a delay is configured for that request name.
+    // Return true if the response has been scheduled, false if the connection has been destroyed or no
+    // delay is configured for `request`.
+    bool sendRequest(const std::string& request, uint64_t requestId) {
+        auto connection = connection_.lock();
+        if (!connection) {
+            return false;
+        }
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) {
+            // Mock the `CLOSE_CONSUMER` command sent by broker, for simplicity, disconnect all consumers
+            if (request == "SEEK") {
+                schedule(connection, "CLOSE_CONSUMER" + std::to_string(requestId),
+                         requestDelays_["CLOSE_CONSUMER"], [connection] {
+                             // Collect the ids first so that the connection's mutex is not held while
+                             // the close commands are handled
+                             std::vector<uint64_t> consumerIds;
+                             {
+                                 std::lock_guard<std::mutex> lock{connection->mutex_};
+                                 for (auto&& kv : connection->consumers_) {
+                                     if (auto consumer = kv.second.lock()) {
+                                         consumerIds.push_back(consumer->getConsumerId());
+                                     }
+                                 }
+                             }
+                             for (auto consumerId : consumerIds) {
+                                 proto::CommandCloseConsumer closeConsumerCmd;
+                                 closeConsumerCmd.set_consumer_id(consumerId);
+                                 connection->handleCloseConsumer(closeConsumerCmd);
+                             }
+                         });
+            }
+            schedule(connection, request + std::to_string(requestId), iter->second, [connection, requestId] {
+                proto::CommandSuccess success;
+                success.set_request_id(requestId);
+                connection->handleSuccess(success);
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // Cancel all pending timers and return the number of pending timers cancelled
+    auto close() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        auto result = pendingTimers_.size();
+        for (auto&& kv : pendingTimers_) {
+            try {
+                LOG_INFO("Cancelling timer for " << kv.first);
+                kv.second->cancel();
+            } catch (...) {
+                LOG_WARN("Failed to cancel timer for " << kv.first);
+            }
+        }
+        pendingTimers_.clear();
+        return result;
+    }
+
+   private:
+    mutable std::mutex mutex_;
+    RequestDelayType requestDelays_;
+    std::unordered_map<std::string, DeadlineTimerPtr> pendingTimers_;
+    ClientConnectionWeakPtr connection_;
+
+    // Run `task` after `delayMs` milliseconds unless the timer is cancelled or the connection is closed.
+    // The timer is registered in `pendingTimers_` under `key` until its handler runs.
+    // The caller must hold `mutex_` because `pendingTimers_` is modified here without locking.
+    void schedule(const ClientConnectionPtr& connection, const std::string& key, long delayMs,
+                  std::function<void()>&& task) {
+        auto timer = connection->executor_->createDeadlineTimer();
+        pendingTimers_[key] = timer;
+        timer->expires_from_now(std::chrono::milliseconds(delayMs));
+        LOG_INFO("Mock scheduling " << key << " with delay " << delayMs << " ms");
+        // Capture `self` to keep this object alive until the handler has run
+        auto self = shared_from_this();
+        timer->async_wait([this, self, key, connection, task{std::move(task)}](const auto& ec) {
+            {
+                std::lock_guard<std::mutex> lock(mutex_);
+                pendingTimers_.erase(key);
+            }
+            if (ec) {
+                LOG_INFO("Timer cancelled for " << key);
+                return;
+            }
+            if (connection->isClosed()) {
+                LOG_INFO("Connection is closed, not completing request " << key);
+                return;
+            }
+            LOG_INFO("Completing delayed request " << key);
+            task();
+        });
+    }
+
+    DECLARE_LOG_OBJECT()
+};
+
+}  // namespace pulsar
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 9d6a9a0..7fd14c7 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -160,7 +160,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const 
ClientConnectionPtr& c
     // Keep a reference to ensure object is kept alive.
     auto self = shared_from_this();
     setFirstRequestIdAfterConnect(requestId);
-    cnx->sendRequestWithId(cmd, requestId)
+    cnx->sendRequestWithId(cmd, requestId, "PRODUCER")
         .addListener([this, self, cnx, promise](Result result, const 
ResponseData& responseData) {
             Result handleResult = handleCreateProducer(cnx, result, 
responseData);
             if (handleResult == ResultOk) {
@@ -204,7 +204,8 @@ Result ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result
             auto client = client_.lock();
             if (client) {
                 int requestId = client->newRequestId();
-                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, 
requestId), requestId);
+                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, 
requestId), requestId,
+                                       "CLOSE_PRODUCER");
             }
         }
         if (!producerCreatedPromise_.isComplete()) {
@@ -266,7 +267,8 @@ Result ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result
             auto client = client_.lock();
             if (client) {
                 int requestId = client->newRequestId();
-                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, 
requestId), requestId);
+                cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, 
requestId), requestId,
+                                       "CLOSE_PRODUCER");
             }
         }
 
@@ -818,7 +820,7 @@ void ProducerImpl::closeAsync(CloseCallback 
originalCallback) {
 
     int requestId = client->newRequestId();
     auto self = shared_from_this();
-    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), 
requestId)
+    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), 
requestId, "CLOSE_PRODUCER")
         .addListener([self, callback](Result result, const ResponseData&) { 
callback(result); });
 }
 
diff --git a/lib/Synchronized.h b/lib/Synchronized.h
deleted file mode 100644
index 5449a9f..0000000
--- a/lib/Synchronized.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#pragma once
-
-#include <mutex>
-
-template <typename T>
-class Synchronized {
-   public:
-    explicit Synchronized(const T& value) : value_(value) {}
-
-    T get() const {
-        std::lock_guard<std::mutex> lock(mutex_);
-        return value_;
-    }
-
-    T&& release() {
-        std::lock_guard<std::mutex> lock(mutex_);
-        return std::move(value_);
-    }
-
-    Synchronized& operator=(const T& value) {
-        std::lock_guard<std::mutex> lock(mutex_);
-        value_ = value;
-        return *this;
-    }
-
-   private:
-    T value_;
-    mutable std::mutex mutex_;
-};
diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc
index f03ea5e..f66c27d 100644
--- a/tests/ConsumerSeekTest.cc
+++ b/tests/ConsumerSeekTest.cc
@@ -19,12 +19,18 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
+#include <chrono>
+#include <future>
+#include <memory>
 #include <set>
 #include <stdexcept>
 #include <string>
 
 #include "HttpHelper.h"
+#include "lib/ClientConnection.h"
 #include "lib/LogUtils.h"
+#include "lib/MockServer.h"
+#include "tests/PulsarFriend.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -200,6 +206,58 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) {
     ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest()));
 }
 
+// Seek `consumer` to timestamp 0 and assert that the seek completes with ResultOk within 5 seconds
+static void assertSeekWithTimeout(Consumer& consumer) {
+    using ResultPromise = std::promise<Result>;
+    const auto resultPromise = std::make_shared<ResultPromise>();
+    // The callback only holds a weak reference, so it does nothing if it is invoked after this function
+    // has returned and the promise has been destroyed
+    consumer.seekAsync(0L, [weakRef = std::weak_ptr<ResultPromise>(resultPromise)](Result seekResult) {
+        const auto strongRef = weakRef.lock();
+        if (strongRef) {
+            strongRef->set_value(seekResult);
+        }
+    });
+    auto seekFuture = resultPromise->get_future();
+    const auto waitStatus = seekFuture.wait_for(std::chrono::seconds(5));
+    ASSERT_EQ(waitStatus, std::future_status::ready);
+    ASSERT_EQ(seekFuture.get(), ResultOk);
+}
+
+// Verify the `seek` method is not blocked forever, whichever of the Subscribe response and the Seek response is delayed longer by the mock server
+TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) {
+    Client client(lookupUrl);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", 
consumer));
+
+    auto connection = *PulsarFriend::getConnections(client).begin();
+    auto mockServer = std::make_shared<MockServer>(connection);
+    connection->attachMockServer(mockServer);
+
+    mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}});  // SEEK is answered with the shorter delay
+    assertSeekWithTimeout(consumer);
+
+    mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}});  // SUBSCRIBE is answered with the shorter delay
+    assertSeekWithTimeout(consumer);
+
+    ASSERT_EQ(mockServer->close(), 0);  // no mocked request is still pending
+    client.close();
+}
+
+TEST_F(ConsumerSeekTest, testReconnectionSlow) {
+    Client client(lookupUrl, 
ClientConfiguration().setInitialBackoffIntervalMs(500));
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe("testReconnectionSlow", "sub", 
consumer));
+
+    auto connection = *PulsarFriend::getConnections(client).begin();
+    auto mockServer = std::make_shared<MockServer>(connection);
+    connection->attachMockServer(mockServer);
+
+    // Make the seek response arrive before `connectionOpened` is called
+    mockServer->setRequestDelay({{"SEEK", 500}, {"CLOSE_CONSUMER", 1000}});
+    assertSeekWithTimeout(consumer);
+
+    // The CLOSE_CONSUMER request is still in flight
+    ASSERT_EQ(mockServer->close(), 1);
+    client.close();
+}
+
 INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, 
false));
 
 }  // namespace pulsar

Reply via email to