Copilot commented on code in PR #533:
URL: https://github.com/apache/pulsar-client-cpp/pull/533#discussion_r2693924290
##########
lib/ConsumerImpl.cc:
##########
@@ -234,18 +237,19 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const
ClientConnectionPtr& c
// Register consumer so that we can handle other incomming commands (e.g.
ACTIVE_CONSUMER_CHANGE) after
Review Comment:
Spelling error in comment: "incomming" should be "incoming".
```suggestion
// Register consumer so that we can handle other incoming commands (e.g.
ACTIVE_CONSUMER_CHANGE) after
```
##########
tests/ConsumerSeekTest.cc:
##########
@@ -200,6 +206,57 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) {
ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest()));
}
+static void assertSeekWithTimeout(Consumer& consumer) {
+ using namespace std::chrono_literals;
+ auto promise = std::make_shared<std::promise<Result>>();
+ std::weak_ptr<std::promise<Result>> weakPromise = promise;
+ consumer.seekAsync(0L, [weakPromise](Result result) {
+ if (auto promise = weakPromise.lock()) {
+ promise->set_value(result);
+ }
+ });
+ auto future = promise->get_future();
+ ASSERT_EQ(future.wait_for(5s), std::future_status::ready);
+ ASSERT_EQ(future.get(), ResultOk);
+}
+
+// Verify the `seek` method won't be blocked forever in any order of the
Subscribe response and Seek response
+TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) {
+ Client client(lookupUrl);
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub",
consumer));
+
+ auto connection = *PulsarFriend::getConnections(client).begin();
+ auto mockServer = std::make_shared<MockServer>(connection);
+ connection->attachMockServer(mockServer);
+
+ mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}});
+ assertSeekWithTimeout(consumer);
+
+ mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}});
+ assertSeekWithTimeout(consumer);
+
+ ASSERT_EQ(mockServer->close(), 0);
Review Comment:
The test expects that close() returns 0, but after the first
assertSeekWithTimeout completes, the timers should have been triggered and
removed from pendingTimers_. This means close() should return 0, but the
assertion may fail if there are leftover timers. Consider verifying this is the
expected behavior or add a comment explaining why 0 is expected.
##########
lib/MockServer.h:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include "ClientConnection.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+class MockServer : public std::enable_shared_from_this<MockServer> {
+ public:
+ using RequestDelayType = std::unordered_map<std::string, long /* delay in
milliseconds */>;
+
+ MockServer(const ClientConnectionPtr& connection) :
connection_(connection) {}
+
+ void setRequestDelay(std::initializer_list<typename
RequestDelayType::value_type> delays) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto&& delay : delays) {
+ requestDelays_[delay.first] = delay.second;
+ }
+ }
+
+ bool sendRequest(const std::string& request, uint64_t requestId) {
+ auto connection = connection_.lock();
+ if (!connection) {
+ return false;
+ }
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (auto iter = requestDelays_.find(request); iter !=
requestDelays_.end()) {
+ // Mock the `CLOSE_CONSUMER` command sent by broker, for
simplicity, disconnect all consumers
+ if (request == "SEEK") {
+ connection->executor_->postWork([connection] {
+ std::vector<uint64_t> consumerIds;
+ {
+ std::lock_guard<std::mutex> lock{connection->mutex_};
+ for (auto&& kv : connection->consumers_) {
+ if (auto consumer = kv.second.lock()) {
+
consumerIds.push_back(consumer->getConsumerId());
+ }
+ }
+ }
+ for (auto consumerId : consumerIds) {
+ proto::CommandCloseConsumer closeConsumerCmd;
+ closeConsumerCmd.set_consumer_id(consumerId);
+ connection->handleCloseConsumer(closeConsumerCmd);
+ }
+ });
+ }
+ long delayMs = iter->second;
+ auto timer = connection->executor_->createDeadlineTimer();
+ auto key = request + std::to_string(requestId);
+ pendingTimers_[key] = timer;
+ timer->expires_from_now(std::chrono::milliseconds(delayMs));
+
+ LOG_INFO("Mock sending request " << key << " with delay " <<
delayMs << " ms");
+ auto self = shared_from_this();
+ timer->async_wait([this, self, key, connection, requestId,
timer](const auto& ec) {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ pendingTimers_.erase(key);
+ }
+ if (ec) {
+ LOG_INFO("Timer cancelled for request " << key);
+ return;
+ }
+ if (connection->isClosed()) {
+ LOG_INFO("Connection is closed, not completing request "
<< key);
+ return;
+ }
+ LOG_INFO("Completing delayed request " << key);
+ proto::CommandSuccess success;
+ success.set_request_id(requestId);
+ connection->handleSuccess(success);
+ });
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // Return the number of pending timers cancelled
+ auto close() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto result = pendingTimers_.size();
+ for (auto&& kv : pendingTimers_) {
+ try {
+ LOG_INFO("Cancelling timer for " << kv.first);
+ kv.second->cancel();
+ } catch (...) {
+ LOG_WARN("Failed to cancel timer for " << kv.first);
+ }
+ }
Review Comment:
After cancelling timers in the close() method, pendingTimers_ should be
cleared to avoid holding references to cancelled timers. Without clearing the
map, the cancelled timers remain in memory until the MockServer is destroyed.
```suggestion
}
pendingTimers_.clear();
```
##########
lib/MockServer.h:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include "ClientConnection.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+class MockServer : public std::enable_shared_from_this<MockServer> {
+ public:
+ using RequestDelayType = std::unordered_map<std::string, long /* delay in
milliseconds */>;
+
+ MockServer(const ClientConnectionPtr& connection) :
connection_(connection) {}
+
+ void setRequestDelay(std::initializer_list<typename
RequestDelayType::value_type> delays) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto&& delay : delays) {
+ requestDelays_[delay.first] = delay.second;
+ }
+ }
+
+ bool sendRequest(const std::string& request, uint64_t requestId) {
+ auto connection = connection_.lock();
+ if (!connection) {
+ return false;
+ }
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (auto iter = requestDelays_.find(request); iter !=
requestDelays_.end()) {
+ // Mock the `CLOSE_CONSUMER` command sent by broker, for
simplicity, disconnect all consumers
+ if (request == "SEEK") {
+ connection->executor_->postWork([connection] {
+ std::vector<uint64_t> consumerIds;
+ {
+ std::lock_guard<std::mutex> lock{connection->mutex_};
+ for (auto&& kv : connection->consumers_) {
+ if (auto consumer = kv.second.lock()) {
+
consumerIds.push_back(consumer->getConsumerId());
+ }
+ }
+ }
+ for (auto consumerId : consumerIds) {
+ proto::CommandCloseConsumer closeConsumerCmd;
+ closeConsumerCmd.set_consumer_id(consumerId);
+ connection->handleCloseConsumer(closeConsumerCmd);
+ }
+ });
Review Comment:
The mutex_ is held while calling connection->executor_->postWork(), which
can potentially cause contention if the postWork call takes time. Consider
releasing the mutex before calling postWork or restructuring the code to
minimize the critical section. The postWork call doesn't need the mutex
protection since the connection is already locked from the weak_ptr.
##########
lib/ConsumerImpl.cc:
##########
@@ -259,14 +263,23 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const
ClientConnectionPtr& c
// Keep a reference to ensure object is kept alive.
auto self = get_shared_this_ptr();
setFirstRequestIdAfterConnect(requestId);
- cnx->sendRequestWithId(cmd, requestId)
+ cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE")
.addListener([this, self, cnx, promise](Result result, const
ResponseData& responseData) {
Result handleResult = handleCreateConsumer(cnx, result);
if (handleResult == ResultOk) {
promise.setSuccess();
} else {
promise.setFailed(handleResult);
}
+ // Complete the seek callback after completing `promise`,
otherwise `reconnectionPending_` will
+ // still be true when the seek operation is done.
+ LockGuard lock{mutex_};
+ if (seekStatus_ == SeekStatus::COMPLETED) {
+ executor_->postWork([seekCallback{std::exchange(seekCallback_,
std::nullopt).value()}]() {
+ seekCallback(ResultOk);
+ });
+ seekStatus_ = SeekStatus::NOT_STARTED;
+ }
Review Comment:
If the handleResult is not ResultOk (i.e., handleCreateConsumer fails), but
seekStatus_ is COMPLETED, the code will still try to execute the seek callback.
However, the promise is set to failed, which means reconnection should happen
again. The seekCallback should only be executed when handleResult is ResultOk
to ensure proper synchronization between subscription success and seek
completion.
```suggestion
// Complete the seek callback after completing `promise`,
otherwise `reconnectionPending_` will
// still be true when the seek operation is done.
LockGuard lock{mutex_};
if (seekStatus_ == SeekStatus::COMPLETED && seekCallback_) {
auto cb = std::exchange(seekCallback_, std::nullopt);
executor_->postWork([seekCallback = std::move(*cb)]() {
seekCallback(ResultOk);
});
seekStatus_ = SeekStatus::NOT_STARTED;
}
} else {
promise.setFailed(handleResult);
}
```
##########
lib/MockServer.h:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include "ClientConnection.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+class MockServer : public std::enable_shared_from_this<MockServer> {
+ public:
+ using RequestDelayType = std::unordered_map<std::string, long /* delay in
milliseconds */>;
+
+ MockServer(const ClientConnectionPtr& connection) :
connection_(connection) {}
+
+ void setRequestDelay(std::initializer_list<typename
RequestDelayType::value_type> delays) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto&& delay : delays) {
+ requestDelays_[delay.first] = delay.second;
+ }
+ }
+
+ bool sendRequest(const std::string& request, uint64_t requestId) {
+ auto connection = connection_.lock();
+ if (!connection) {
+ return false;
+ }
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (auto iter = requestDelays_.find(request); iter !=
requestDelays_.end()) {
+ // Mock the `CLOSE_CONSUMER` command sent by broker, for
simplicity, disconnect all consumers
+ if (request == "SEEK") {
+ connection->executor_->postWork([connection] {
+ std::vector<uint64_t> consumerIds;
+ {
+ std::lock_guard<std::mutex> lock{connection->mutex_};
+ for (auto&& kv : connection->consumers_) {
+ if (auto consumer = kv.second.lock()) {
+
consumerIds.push_back(consumer->getConsumerId());
+ }
+ }
+ }
+ for (auto consumerId : consumerIds) {
+ proto::CommandCloseConsumer closeConsumerCmd;
+ closeConsumerCmd.set_consumer_id(consumerId);
+ connection->handleCloseConsumer(closeConsumerCmd);
+ }
+ });
+ }
+ long delayMs = iter->second;
+ auto timer = connection->executor_->createDeadlineTimer();
+ auto key = request + std::to_string(requestId);
+ pendingTimers_[key] = timer;
+ timer->expires_from_now(std::chrono::milliseconds(delayMs));
+
+ LOG_INFO("Mock sending request " << key << " with delay " <<
delayMs << " ms");
+ auto self = shared_from_this();
+ timer->async_wait([this, self, key, connection, requestId,
timer](const auto& ec) {
Review Comment:
The timer is captured in the lambda but is not used within it. This creates
unnecessary reference counting overhead. The timer should be removed from the
capture list since it's not needed after the timer is started - the timer will
remain alive through the executor's internal reference.
```suggestion
timer->async_wait([this, self, key, connection, requestId](const
auto& ec) {
```
##########
lib/ConsumerImpl.cc:
##########
@@ -1723,78 +1747,88 @@ bool ConsumerImpl::isConnected() const { return
!getCnx().expired() && state_ ==
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ?
1 : 0; }
void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek,
const SeekArg& seekArg,
- const ResultCallback& callback) {
+ ResultCallback&& callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
callback(ResultNotConnected);
return;
}
-
- auto expected = SeekStatus::NOT_STARTED;
- if (!seekStatus_.compare_exchange_strong(expected,
SeekStatus::IN_PROGRESS)) {
- LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the
status is "
- << static_cast<int>(expected));
+ bool hasPendingSeek = false;
+ // Save the previous last seek arg in case seek failed
+ decltype(lastSeekArg_) previousLastSeekArg;
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (seekStatus_ != SeekStatus::NOT_STARTED) {
+ hasPendingSeek = true;
+ } else {
+ seekStatus_ = SeekStatus::IN_PROGRESS;
+ if (seekCallback_.has_value()) {
+ // This should never happen
+ LOG_ERROR(getName() << "Previous seek callback is not
triggered unexpectedly");
+ executor_->postWork([callback{std::exchange(seekCallback_,
std::nullopt).value()}] {
+ callback(ResultTimeout);
+ });
+ }
+ seekCallback_ = std::move(callback);
+ previousLastSeekArg = lastSeekArg_;
+ lastSeekArg_ = seekArg;
+ }
+ }
+ if (hasPendingSeek) {
+ std::visit(
+ [this](auto&& arg) {
+ LOG_ERROR(getName() << "Attempted to seek " << arg << " when
there is a pending seek");
+ },
+ seekArg);
callback(ResultNotAllowedError);
return;
}
- const auto originalSeekMessageId = seekMessageId_.get();
- if (boost::get<uint64_t>(&seekArg)) {
- hasSoughtByTimestamp_.store(true, std::memory_order_release);
- } else {
- seekMessageId_ = *boost::get<MessageId>(&seekArg);
- hasSoughtByTimestamp_.store(false, std::memory_order_release);
- }
- seekStatus_ = SeekStatus::IN_PROGRESS;
- seekCallback_ = callback;
- LOG_INFO(getName() << " Seeking subscription to " << seekArg);
+ std::visit([this](auto&& arg) { LOG_INFO(getName() << "Seeking
subscription to " << arg); }, seekArg);
auto weakSelf = weak_from_this();
- cnx->sendRequestWithId(seek, requestId)
- .addListener([this, weakSelf, callback, originalSeekMessageId](Result
result,
- const
ResponseData& responseData) {
+ cnx->sendRequestWithId(seek, requestId, "SEEK")
+ .addListener([this, weakSelf, previousLastSeekArg](Result result,
const ResponseData& responseData) {
auto self = weakSelf.lock();
if (!self) {
- callback(result);
return;
}
if (result == ResultOk) {
- LOG_INFO(getName() << "Seek successfully");
- ackGroupingTrackerPtr_->flushAndClean();
- incomingMessages_.clear();
- Lock lock(mutexForMessageId_);
- lastDequedMessageId_ = MessageId::earliest();
- lock.unlock();
- if (getCnx().expired()) {
+ LockGuard lock(mutex_);
+ if (getCnx().expired() || reconnectionPending_) {
// It's during reconnection, complete the seek future
after connection is established
seekStatus_ = SeekStatus::COMPLETED;
+ LOG_INFO(getName() << "Delay the seek future until the
reconnection is done");
} else {
- if (!hasSoughtByTimestamp()) {
- startMessageId_ = seekMessageId_.get();
+ LOG_INFO(getName() << "Seek successfully");
+ ackGroupingTrackerPtr_->flushAndClean();
+ incomingMessages_.clear();
+ if (lastSeekArg_.has_value() &&
std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+ startMessageId_ =
std::get<MessageId>(lastSeekArg_.value());
}
- seekCallback_.release()(result);
- }
+ if (!seekCallback_.has_value()) {
+ LOG_ERROR(getName() << "Seek callback is not set");
+ return;
+ }
+ executor_->postWork(
+ [self, callback{std::exchange(seekCallback_,
std::nullopt).value()}]() {
+ callback(ResultOk);
+ });
+ seekStatus_ = SeekStatus::NOT_STARTED;
+ } // else: complete the seek future after connection is
established
} else {
LOG_ERROR(getName() << "Failed to seek: " << result);
- seekMessageId_ = originalSeekMessageId;
- seekStatus_ = SeekStatus::NOT_STARTED;
- seekCallback_.release()(result);
+ LockGuard lock{mutex_};
+ lastSeekArg_ = previousLastSeekArg;
+ executor_->postWork([self,
callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
+ callback(ResultOk);
+ });
Review Comment:
When the seek operation fails, the seekStatus_ is not reset to NOT_STARTED,
which means subsequent seek attempts will be blocked. The seekStatus_ should be
reset to NOT_STARTED in the error handling path to allow future seek operations.
```suggestion
seekStatus_ = SeekStatus::NOT_STARTED;
executor_->postWork(
[self, result, callback{std::exchange(seekCallback_,
std::nullopt).value()}]() {
callback(result);
});
```
##########
lib/ConsumerImpl.cc:
##########
@@ -1723,78 +1747,88 @@ bool ConsumerImpl::isConnected() const { return
!getCnx().expired() && state_ ==
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ?
1 : 0; }
void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek,
const SeekArg& seekArg,
- const ResultCallback& callback) {
+ ResultCallback&& callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
callback(ResultNotConnected);
return;
}
-
- auto expected = SeekStatus::NOT_STARTED;
- if (!seekStatus_.compare_exchange_strong(expected,
SeekStatus::IN_PROGRESS)) {
- LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the
status is "
- << static_cast<int>(expected));
+ bool hasPendingSeek = false;
+ // Save the previous last seek arg in case seek failed
+ decltype(lastSeekArg_) previousLastSeekArg;
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (seekStatus_ != SeekStatus::NOT_STARTED) {
+ hasPendingSeek = true;
+ } else {
+ seekStatus_ = SeekStatus::IN_PROGRESS;
+ if (seekCallback_.has_value()) {
+ // This should never happen
+ LOG_ERROR(getName() << "Previous seek callback is not
triggered unexpectedly");
+ executor_->postWork([callback{std::exchange(seekCallback_,
std::nullopt).value()}] {
+ callback(ResultTimeout);
+ });
+ }
+ seekCallback_ = std::move(callback);
+ previousLastSeekArg = lastSeekArg_;
+ lastSeekArg_ = seekArg;
+ }
+ }
+ if (hasPendingSeek) {
+ std::visit(
+ [this](auto&& arg) {
+ LOG_ERROR(getName() << "Attempted to seek " << arg << " when
there is a pending seek");
+ },
+ seekArg);
callback(ResultNotAllowedError);
return;
}
- const auto originalSeekMessageId = seekMessageId_.get();
- if (boost::get<uint64_t>(&seekArg)) {
- hasSoughtByTimestamp_.store(true, std::memory_order_release);
- } else {
- seekMessageId_ = *boost::get<MessageId>(&seekArg);
- hasSoughtByTimestamp_.store(false, std::memory_order_release);
- }
- seekStatus_ = SeekStatus::IN_PROGRESS;
- seekCallback_ = callback;
- LOG_INFO(getName() << " Seeking subscription to " << seekArg);
+ std::visit([this](auto&& arg) { LOG_INFO(getName() << "Seeking
subscription to " << arg); }, seekArg);
auto weakSelf = weak_from_this();
- cnx->sendRequestWithId(seek, requestId)
- .addListener([this, weakSelf, callback, originalSeekMessageId](Result
result,
- const
ResponseData& responseData) {
+ cnx->sendRequestWithId(seek, requestId, "SEEK")
+ .addListener([this, weakSelf, previousLastSeekArg](Result result,
const ResponseData& responseData) {
auto self = weakSelf.lock();
if (!self) {
- callback(result);
return;
}
if (result == ResultOk) {
- LOG_INFO(getName() << "Seek successfully");
- ackGroupingTrackerPtr_->flushAndClean();
- incomingMessages_.clear();
- Lock lock(mutexForMessageId_);
- lastDequedMessageId_ = MessageId::earliest();
- lock.unlock();
- if (getCnx().expired()) {
+ LockGuard lock(mutex_);
+ if (getCnx().expired() || reconnectionPending_) {
// It's during reconnection, complete the seek future
after connection is established
seekStatus_ = SeekStatus::COMPLETED;
+ LOG_INFO(getName() << "Delay the seek future until the
reconnection is done");
} else {
- if (!hasSoughtByTimestamp()) {
- startMessageId_ = seekMessageId_.get();
+ LOG_INFO(getName() << "Seek successfully");
+ ackGroupingTrackerPtr_->flushAndClean();
+ incomingMessages_.clear();
+ if (lastSeekArg_.has_value() &&
std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+ startMessageId_ =
std::get<MessageId>(lastSeekArg_.value());
}
- seekCallback_.release()(result);
- }
+ if (!seekCallback_.has_value()) {
+ LOG_ERROR(getName() << "Seek callback is not set");
+ return;
+ }
+ executor_->postWork(
+ [self, callback{std::exchange(seekCallback_,
std::nullopt).value()}]() {
+ callback(ResultOk);
+ });
+ seekStatus_ = SeekStatus::NOT_STARTED;
+ } // else: complete the seek future after connection is
established
} else {
LOG_ERROR(getName() << "Failed to seek: " << result);
- seekMessageId_ = originalSeekMessageId;
- seekStatus_ = SeekStatus::NOT_STARTED;
- seekCallback_.release()(result);
+ LockGuard lock{mutex_};
+ lastSeekArg_ = previousLastSeekArg;
+ executor_->postWork([self,
callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
+ callback(ResultOk);
+ });
Review Comment:
In the error handling path when seek fails (result != ResultOk), the
callback is being invoked with ResultOk instead of the actual error result.
This means that even when the seek operation fails, the callback will report
success to the caller. The callback should be invoked with the actual error
result that was received.
```suggestion
executor_->postWork(
[self, result, callback{std::exchange(seekCallback_,
std::nullopt).value()}]() {
callback(result);
});
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]