Copilot commented on code in PR #533:
URL: https://github.com/apache/pulsar-client-cpp/pull/533#discussion_r2693924290


##########
lib/ConsumerImpl.cc:
##########
@@ -234,18 +237,19 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
 
     // Register consumer so that we can handle other incomming commands (e.g. 
ACTIVE_CONSUMER_CHANGE) after

Review Comment:
   Spelling error in comment: "incomming" should be "incoming".
   ```suggestion
       // Register consumer so that we can handle other incoming commands (e.g. 
ACTIVE_CONSUMER_CHANGE) after
   ```



##########
tests/ConsumerSeekTest.cc:
##########
@@ -200,6 +206,57 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) {
     ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest()));
 }
 
+static void assertSeekWithTimeout(Consumer& consumer) {
+    using namespace std::chrono_literals;
+    auto promise = std::make_shared<std::promise<Result>>();
+    std::weak_ptr<std::promise<Result>> weakPromise = promise;
+    consumer.seekAsync(0L, [weakPromise](Result result) {
+        if (auto promise = weakPromise.lock()) {
+            promise->set_value(result);
+        }
+    });
+    auto future = promise->get_future();
+    ASSERT_EQ(future.wait_for(5s), std::future_status::ready);
+    ASSERT_EQ(future.get(), ResultOk);
+}
+
+// Verify the `seek` method won't be blocked forever in any order of the 
Subscribe response and Seek response
+TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) {
+    Client client(lookupUrl);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub", 
consumer));
+
+    auto connection = *PulsarFriend::getConnections(client).begin();
+    auto mockServer = std::make_shared<MockServer>(connection);
+    connection->attachMockServer(mockServer);
+
+    mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}});
+    assertSeekWithTimeout(consumer);
+
+    mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}});
+    assertSeekWithTimeout(consumer);
+
+    ASSERT_EQ(mockServer->close(), 0);

Review Comment:
   The test expects that close() returns 0, but after the first 
assertSeekWithTimeout completes, the timers should have been triggered and 
removed from pendingTimers_. This means close() should return 0, but the 
assertion may fail if there are leftover timers. Consider verifying this is the 
expected behavior or add a comment explaining why 0 is expected.



##########
lib/MockServer.h:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include "ClientConnection.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+class MockServer : public std::enable_shared_from_this<MockServer> {
+   public:
+    using RequestDelayType = std::unordered_map<std::string, long /* delay in 
milliseconds */>;
+
+    MockServer(const ClientConnectionPtr& connection) : 
connection_(connection) {}
+
+    void setRequestDelay(std::initializer_list<typename 
RequestDelayType::value_type> delays) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        for (auto&& delay : delays) {
+            requestDelays_[delay.first] = delay.second;
+        }
+    }
+
+    bool sendRequest(const std::string& request, uint64_t requestId) {
+        auto connection = connection_.lock();
+        if (!connection) {
+            return false;
+        }
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (auto iter = requestDelays_.find(request); iter != 
requestDelays_.end()) {
+            // Mock the `CLOSE_CONSUMER` command sent by broker, for 
simplicity, disconnect all consumers
+            if (request == "SEEK") {
+                connection->executor_->postWork([connection] {
+                    std::vector<uint64_t> consumerIds;
+                    {
+                        std::lock_guard<std::mutex> lock{connection->mutex_};
+                        for (auto&& kv : connection->consumers_) {
+                            if (auto consumer = kv.second.lock()) {
+                                
consumerIds.push_back(consumer->getConsumerId());
+                            }
+                        }
+                    }
+                    for (auto consumerId : consumerIds) {
+                        proto::CommandCloseConsumer closeConsumerCmd;
+                        closeConsumerCmd.set_consumer_id(consumerId);
+                        connection->handleCloseConsumer(closeConsumerCmd);
+                    }
+                });
+            }
+            long delayMs = iter->second;
+            auto timer = connection->executor_->createDeadlineTimer();
+            auto key = request + std::to_string(requestId);
+            pendingTimers_[key] = timer;
+            timer->expires_from_now(std::chrono::milliseconds(delayMs));
+
+            LOG_INFO("Mock sending request " << key << " with delay " << 
delayMs << " ms");
+            auto self = shared_from_this();
+            timer->async_wait([this, self, key, connection, requestId, 
timer](const auto& ec) {
+                {
+                    std::lock_guard<std::mutex> lock(mutex_);
+                    pendingTimers_.erase(key);
+                }
+                if (ec) {
+                    LOG_INFO("Timer cancelled for request " << key);
+                    return;
+                }
+                if (connection->isClosed()) {
+                    LOG_INFO("Connection is closed, not completing request " 
<< key);
+                    return;
+                }
+                LOG_INFO("Completing delayed request " << key);
+                proto::CommandSuccess success;
+                success.set_request_id(requestId);
+                connection->handleSuccess(success);
+            });
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // Return the number of pending timers cancelled
+    auto close() {
+        std::lock_guard<std::mutex> lock(mutex_);
+        auto result = pendingTimers_.size();
+        for (auto&& kv : pendingTimers_) {
+            try {
+                LOG_INFO("Cancelling timer for " << kv.first);
+                kv.second->cancel();
+            } catch (...) {
+                LOG_WARN("Failed to cancel timer for " << kv.first);
+            }
+        }

Review Comment:
   After cancelling timers in the close() method, pendingTimers_ should be 
cleared to avoid holding references to cancelled timers. Without clearing the 
map, the cancelled timers remain in memory until the MockServer is destroyed.
   ```suggestion
           }
           pendingTimers_.clear();
   ```



##########
lib/MockServer.h:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include "ClientConnection.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+class MockServer : public std::enable_shared_from_this<MockServer> {
+   public:
+    using RequestDelayType = std::unordered_map<std::string, long /* delay in 
milliseconds */>;
+
+    MockServer(const ClientConnectionPtr& connection) : 
connection_(connection) {}
+
+    void setRequestDelay(std::initializer_list<typename 
RequestDelayType::value_type> delays) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        for (auto&& delay : delays) {
+            requestDelays_[delay.first] = delay.second;
+        }
+    }
+
+    bool sendRequest(const std::string& request, uint64_t requestId) {
+        auto connection = connection_.lock();
+        if (!connection) {
+            return false;
+        }
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (auto iter = requestDelays_.find(request); iter != 
requestDelays_.end()) {
+            // Mock the `CLOSE_CONSUMER` command sent by broker, for 
simplicity, disconnect all consumers
+            if (request == "SEEK") {
+                connection->executor_->postWork([connection] {
+                    std::vector<uint64_t> consumerIds;
+                    {
+                        std::lock_guard<std::mutex> lock{connection->mutex_};
+                        for (auto&& kv : connection->consumers_) {
+                            if (auto consumer = kv.second.lock()) {
+                                
consumerIds.push_back(consumer->getConsumerId());
+                            }
+                        }
+                    }
+                    for (auto consumerId : consumerIds) {
+                        proto::CommandCloseConsumer closeConsumerCmd;
+                        closeConsumerCmd.set_consumer_id(consumerId);
+                        connection->handleCloseConsumer(closeConsumerCmd);
+                    }
+                });

Review Comment:
   The mutex_ is held while calling connection->executor_->postWork(), which 
can potentially cause contention if the postWork call takes time. Consider 
releasing the mutex before calling postWork or restructuring the code to 
minimize the critical section. The postWork call doesn't need the mutex 
protection since the connection is already locked from the weak_ptr.



##########
lib/ConsumerImpl.cc:
##########
@@ -259,14 +263,23 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
     // Keep a reference to ensure object is kept alive.
     auto self = get_shared_this_ptr();
     setFirstRequestIdAfterConnect(requestId);
-    cnx->sendRequestWithId(cmd, requestId)
+    cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE")
         .addListener([this, self, cnx, promise](Result result, const 
ResponseData& responseData) {
             Result handleResult = handleCreateConsumer(cnx, result);
             if (handleResult == ResultOk) {
                 promise.setSuccess();
             } else {
                 promise.setFailed(handleResult);
             }
+            // Complete the seek callback after completing `promise`, 
otherwise `reconnectionPending_` will
+            // still be true when the seek operation is done.
+            LockGuard lock{mutex_};
+            if (seekStatus_ == SeekStatus::COMPLETED) {
+                executor_->postWork([seekCallback{std::exchange(seekCallback_, 
std::nullopt).value()}]() {
+                    seekCallback(ResultOk);
+                });
+                seekStatus_ = SeekStatus::NOT_STARTED;
+            }

Review Comment:
   If the handleResult is not ResultOk (i.e., handleCreateConsumer fails), but 
seekStatus_ is COMPLETED, the code will still try to execute the seek callback. 
However, the promise is set to failed, which means reconnection should happen 
again. The seekCallback should only be executed when handleResult is ResultOk 
to ensure proper synchronization between subscription success and seek 
completion.
   ```suggestion
                   // Complete the seek callback after completing `promise`, 
otherwise `reconnectionPending_` will
                   // still be true when the seek operation is done.
                   LockGuard lock{mutex_};
                   if (seekStatus_ == SeekStatus::COMPLETED && seekCallback_) {
                       auto cb = std::exchange(seekCallback_, std::nullopt);
                       executor_->postWork([seekCallback = std::move(*cb)]() {
                           seekCallback(ResultOk);
                       });
                       seekStatus_ = SeekStatus::NOT_STARTED;
                   }
               } else {
                   promise.setFailed(handleResult);
               }
   ```



##########
lib/MockServer.h:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include "ClientConnection.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+class MockServer : public std::enable_shared_from_this<MockServer> {
+   public:
+    using RequestDelayType = std::unordered_map<std::string, long /* delay in 
milliseconds */>;
+
+    MockServer(const ClientConnectionPtr& connection) : 
connection_(connection) {}
+
+    void setRequestDelay(std::initializer_list<typename 
RequestDelayType::value_type> delays) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        for (auto&& delay : delays) {
+            requestDelays_[delay.first] = delay.second;
+        }
+    }
+
+    bool sendRequest(const std::string& request, uint64_t requestId) {
+        auto connection = connection_.lock();
+        if (!connection) {
+            return false;
+        }
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (auto iter = requestDelays_.find(request); iter != 
requestDelays_.end()) {
+            // Mock the `CLOSE_CONSUMER` command sent by broker, for 
simplicity, disconnect all consumers
+            if (request == "SEEK") {
+                connection->executor_->postWork([connection] {
+                    std::vector<uint64_t> consumerIds;
+                    {
+                        std::lock_guard<std::mutex> lock{connection->mutex_};
+                        for (auto&& kv : connection->consumers_) {
+                            if (auto consumer = kv.second.lock()) {
+                                
consumerIds.push_back(consumer->getConsumerId());
+                            }
+                        }
+                    }
+                    for (auto consumerId : consumerIds) {
+                        proto::CommandCloseConsumer closeConsumerCmd;
+                        closeConsumerCmd.set_consumer_id(consumerId);
+                        connection->handleCloseConsumer(closeConsumerCmd);
+                    }
+                });
+            }
+            long delayMs = iter->second;
+            auto timer = connection->executor_->createDeadlineTimer();
+            auto key = request + std::to_string(requestId);
+            pendingTimers_[key] = timer;
+            timer->expires_from_now(std::chrono::milliseconds(delayMs));
+
+            LOG_INFO("Mock sending request " << key << " with delay " << 
delayMs << " ms");
+            auto self = shared_from_this();
+            timer->async_wait([this, self, key, connection, requestId, 
timer](const auto& ec) {

Review Comment:
   The timer is captured in the lambda but is not used within it. This creates 
unnecessary reference counting overhead. The timer should be removed from the 
capture list since it's not needed after the timer is started - the timer will 
remain alive through the executor's internal reference.
   ```suggestion
               timer->async_wait([this, self, key, connection, requestId](const 
auto& ec) {
   ```



##########
lib/ConsumerImpl.cc:
##########
@@ -1723,78 +1747,88 @@ bool ConsumerImpl::isConnected() const { return 
!getCnx().expired() && state_ ==
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 
1 : 0; }
 
 void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, 
const SeekArg& seekArg,
-                                     const ResultCallback& callback) {
+                                     ResultCallback&& callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         LOG_ERROR(getName() << " Client Connection not ready for Consumer");
         callback(ResultNotConnected);
         return;
     }
-
-    auto expected = SeekStatus::NOT_STARTED;
-    if (!seekStatus_.compare_exchange_strong(expected, 
SeekStatus::IN_PROGRESS)) {
-        LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the 
status is "
-                            << static_cast<int>(expected));
+    bool hasPendingSeek = false;
+    // Save the previous last seek arg in case seek failed
+    decltype(lastSeekArg_) previousLastSeekArg;
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (seekStatus_ != SeekStatus::NOT_STARTED) {
+            hasPendingSeek = true;
+        } else {
+            seekStatus_ = SeekStatus::IN_PROGRESS;
+            if (seekCallback_.has_value()) {
+                // This should never happen
+                LOG_ERROR(getName() << "Previous seek callback is not 
triggered unexpectedly");
+                executor_->postWork([callback{std::exchange(seekCallback_, 
std::nullopt).value()}] {
+                    callback(ResultTimeout);
+                });
+            }
+            seekCallback_ = std::move(callback);
+            previousLastSeekArg = lastSeekArg_;
+            lastSeekArg_ = seekArg;
+        }
+    }
+    if (hasPendingSeek) {
+        std::visit(
+            [this](auto&& arg) {
+                LOG_ERROR(getName() << "Attempted to seek " << arg << " when 
there is a pending seek");
+            },
+            seekArg);
         callback(ResultNotAllowedError);
         return;
     }
 
-    const auto originalSeekMessageId = seekMessageId_.get();
-    if (boost::get<uint64_t>(&seekArg)) {
-        hasSoughtByTimestamp_.store(true, std::memory_order_release);
-    } else {
-        seekMessageId_ = *boost::get<MessageId>(&seekArg);
-        hasSoughtByTimestamp_.store(false, std::memory_order_release);
-    }
-    seekStatus_ = SeekStatus::IN_PROGRESS;
-    seekCallback_ = callback;
-    LOG_INFO(getName() << " Seeking subscription to " << seekArg);
+    std::visit([this](auto&& arg) { LOG_INFO(getName() << "Seeking 
subscription to " << arg); }, seekArg);
 
     auto weakSelf = weak_from_this();
 
-    cnx->sendRequestWithId(seek, requestId)
-        .addListener([this, weakSelf, callback, originalSeekMessageId](Result 
result,
-                                                                       const 
ResponseData& responseData) {
+    cnx->sendRequestWithId(seek, requestId, "SEEK")
+        .addListener([this, weakSelf, previousLastSeekArg](Result result, 
const ResponseData& responseData) {
             auto self = weakSelf.lock();
             if (!self) {
-                callback(result);
                 return;
             }
             if (result == ResultOk) {
-                LOG_INFO(getName() << "Seek successfully");
-                ackGroupingTrackerPtr_->flushAndClean();
-                incomingMessages_.clear();
-                Lock lock(mutexForMessageId_);
-                lastDequedMessageId_ = MessageId::earliest();
-                lock.unlock();
-                if (getCnx().expired()) {
+                LockGuard lock(mutex_);
+                if (getCnx().expired() || reconnectionPending_) {
                     // It's during reconnection, complete the seek future 
after connection is established
                     seekStatus_ = SeekStatus::COMPLETED;
+                    LOG_INFO(getName() << "Delay the seek future until the 
reconnection is done");
                 } else {
-                    if (!hasSoughtByTimestamp()) {
-                        startMessageId_ = seekMessageId_.get();
+                    LOG_INFO(getName() << "Seek successfully");
+                    ackGroupingTrackerPtr_->flushAndClean();
+                    incomingMessages_.clear();
+                    if (lastSeekArg_.has_value() && 
std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+                        startMessageId_ = 
std::get<MessageId>(lastSeekArg_.value());
                     }
-                    seekCallback_.release()(result);
-                }
+                    if (!seekCallback_.has_value()) {
+                        LOG_ERROR(getName() << "Seek callback is not set");
+                        return;
+                    }
+                    executor_->postWork(
+                        [self, callback{std::exchange(seekCallback_, 
std::nullopt).value()}]() {
+                            callback(ResultOk);
+                        });
+                    seekStatus_ = SeekStatus::NOT_STARTED;
+                }  // else: complete the seek future after connection is 
established
             } else {
                 LOG_ERROR(getName() << "Failed to seek: " << result);
-                seekMessageId_ = originalSeekMessageId;
-                seekStatus_ = SeekStatus::NOT_STARTED;
-                seekCallback_.release()(result);
+                LockGuard lock{mutex_};
+                lastSeekArg_ = previousLastSeekArg;
+                executor_->postWork([self, 
callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
+                    callback(ResultOk);
+                });

Review Comment:
   When the seek operation fails, the seekStatus_ is not reset to NOT_STARTED, 
which means subsequent seek attempts will be blocked. The seekStatus_ should be 
reset to NOT_STARTED in the error handling path to allow future seek operations.
   ```suggestion
                   seekStatus_ = SeekStatus::NOT_STARTED;
                   executor_->postWork(
                       [self, result, callback{std::exchange(seekCallback_, 
std::nullopt).value()}]() {
                           callback(result);
                       });
   ```



##########
lib/ConsumerImpl.cc:
##########
@@ -1723,78 +1747,88 @@ bool ConsumerImpl::isConnected() const { return 
!getCnx().expired() && state_ ==
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 
1 : 0; }
 
 void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, 
const SeekArg& seekArg,
-                                     const ResultCallback& callback) {
+                                     ResultCallback&& callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         LOG_ERROR(getName() << " Client Connection not ready for Consumer");
         callback(ResultNotConnected);
         return;
     }
-
-    auto expected = SeekStatus::NOT_STARTED;
-    if (!seekStatus_.compare_exchange_strong(expected, 
SeekStatus::IN_PROGRESS)) {
-        LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the 
status is "
-                            << static_cast<int>(expected));
+    bool hasPendingSeek = false;
+    // Save the previous last seek arg in case seek failed
+    decltype(lastSeekArg_) previousLastSeekArg;
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (seekStatus_ != SeekStatus::NOT_STARTED) {
+            hasPendingSeek = true;
+        } else {
+            seekStatus_ = SeekStatus::IN_PROGRESS;
+            if (seekCallback_.has_value()) {
+                // This should never happen
+                LOG_ERROR(getName() << "Previous seek callback is not 
triggered unexpectedly");
+                executor_->postWork([callback{std::exchange(seekCallback_, 
std::nullopt).value()}] {
+                    callback(ResultTimeout);
+                });
+            }
+            seekCallback_ = std::move(callback);
+            previousLastSeekArg = lastSeekArg_;
+            lastSeekArg_ = seekArg;
+        }
+    }
+    if (hasPendingSeek) {
+        std::visit(
+            [this](auto&& arg) {
+                LOG_ERROR(getName() << "Attempted to seek " << arg << " when 
there is a pending seek");
+            },
+            seekArg);
         callback(ResultNotAllowedError);
         return;
     }
 
-    const auto originalSeekMessageId = seekMessageId_.get();
-    if (boost::get<uint64_t>(&seekArg)) {
-        hasSoughtByTimestamp_.store(true, std::memory_order_release);
-    } else {
-        seekMessageId_ = *boost::get<MessageId>(&seekArg);
-        hasSoughtByTimestamp_.store(false, std::memory_order_release);
-    }
-    seekStatus_ = SeekStatus::IN_PROGRESS;
-    seekCallback_ = callback;
-    LOG_INFO(getName() << " Seeking subscription to " << seekArg);
+    std::visit([this](auto&& arg) { LOG_INFO(getName() << "Seeking 
subscription to " << arg); }, seekArg);
 
     auto weakSelf = weak_from_this();
 
-    cnx->sendRequestWithId(seek, requestId)
-        .addListener([this, weakSelf, callback, originalSeekMessageId](Result 
result,
-                                                                       const 
ResponseData& responseData) {
+    cnx->sendRequestWithId(seek, requestId, "SEEK")
+        .addListener([this, weakSelf, previousLastSeekArg](Result result, 
const ResponseData& responseData) {
             auto self = weakSelf.lock();
             if (!self) {
-                callback(result);
                 return;
             }
             if (result == ResultOk) {
-                LOG_INFO(getName() << "Seek successfully");
-                ackGroupingTrackerPtr_->flushAndClean();
-                incomingMessages_.clear();
-                Lock lock(mutexForMessageId_);
-                lastDequedMessageId_ = MessageId::earliest();
-                lock.unlock();
-                if (getCnx().expired()) {
+                LockGuard lock(mutex_);
+                if (getCnx().expired() || reconnectionPending_) {
                     // It's during reconnection, complete the seek future 
after connection is established
                     seekStatus_ = SeekStatus::COMPLETED;
+                    LOG_INFO(getName() << "Delay the seek future until the 
reconnection is done");
                 } else {
-                    if (!hasSoughtByTimestamp()) {
-                        startMessageId_ = seekMessageId_.get();
+                    LOG_INFO(getName() << "Seek successfully");
+                    ackGroupingTrackerPtr_->flushAndClean();
+                    incomingMessages_.clear();
+                    if (lastSeekArg_.has_value() && 
std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+                        startMessageId_ = 
std::get<MessageId>(lastSeekArg_.value());
                     }
-                    seekCallback_.release()(result);
-                }
+                    if (!seekCallback_.has_value()) {
+                        LOG_ERROR(getName() << "Seek callback is not set");
+                        return;
+                    }
+                    executor_->postWork(
+                        [self, callback{std::exchange(seekCallback_, 
std::nullopt).value()}]() {
+                            callback(ResultOk);
+                        });
+                    seekStatus_ = SeekStatus::NOT_STARTED;
+                }  // else: complete the seek future after connection is 
established
             } else {
                 LOG_ERROR(getName() << "Failed to seek: " << result);
-                seekMessageId_ = originalSeekMessageId;
-                seekStatus_ = SeekStatus::NOT_STARTED;
-                seekCallback_.release()(result);
+                LockGuard lock{mutex_};
+                lastSeekArg_ = previousLastSeekArg;
+                executor_->postWork([self, 
callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
+                    callback(ResultOk);
+                });

Review Comment:
   In the error handling path when seek fails (result != ResultOk), the 
callback is being invoked with ResultOk instead of the actual error result. 
This means that even when the seek operation fails, the callback will report 
success to the caller. The callback should be invoked with the actual error 
result that was received.
   ```suggestion
                   executor_->postWork(
                       [self, result, callback{std::exchange(seekCallback_, 
std::nullopt).value()}]() {
                           callback(result);
                       });
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to