This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 0b893a1 Fix the seek method being blocked forever when the subscribe RPC is slower than the seek RPC (#533)
0b893a1 is described below
commit 0b893a1622aa7dd702d48afee33af321bb2c06dd
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jan 16 14:14:50 2026 +0800
Fix the seek method being blocked forever when the subscribe RPC is slower than the seek RPC (#533)
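
For context: a seek that overlaps a reconnection has two RPCs in flight, SEEK and SUBSCRIBE, and their responses can arrive in either order. If the SEEK response came back while the subscribe was still pending, the old code could leave the stored seek callback stranded, so a synchronous seek() never returned. The patch serializes both completion paths on the consumer's mutex and treats seek completion as a small state machine. The following is a minimal standalone sketch of that idea, not code from the patch itself; the names (SeekStateMachine, onSeekResponse, onSubscribeResponse) are illustrative only.

#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <utility>

enum class SeekStatus { NOT_STARTED, IN_PROGRESS, COMPLETED };

// Hypothetical model of the fixed logic, not part of this patch.
struct SeekStateMachine {
    std::mutex mutex;
    SeekStatus status{SeekStatus::NOT_STARTED};
    std::optional<std::function<void(bool)>> callback;
    bool connectionReady{false};

    // SEEK response handler: if the (re)subscribe has not finished yet, only
    // record COMPLETED and let the subscribe path finish the job later.
    void onSeekResponse() {
        std::lock_guard<std::mutex> lock(mutex);
        if (!connectionReady) {
            status = SeekStatus::COMPLETED;
            return;
        }
        complete();
    }

    // SUBSCRIBE response handler: the reconnection is done, so fire the seek
    // callback that onSeekResponse() deferred.
    void onSubscribeResponse() {
        std::lock_guard<std::mutex> lock(mutex);
        connectionReady = true;
        if (status == SeekStatus::COMPLETED) {
            complete();
        }
    }

    // Caller must hold the mutex and have set the callback.
    void complete() {
        auto cb = std::exchange(callback, std::nullopt).value();
        status = SeekStatus::NOT_STARTED;
        cb(true);
    }
};

int main() {
    SeekStateMachine m;
    m.status = SeekStatus::IN_PROGRESS;
    m.callback = [](bool ok) { std::cout << "seek completed: " << ok << "\n"; };
    m.onSeekResponse();       // the SEEK response arrives first
    m.onSubscribeResponse();  // the deferred callback fires here instead of never
}

Because both handlers run under one mutex, the SEEK-before-SUBSCRIBE ordering that used to strand the callback now just defers it through SeekStatus::COMPLETED.
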
---
lib/ClientConnection.cc | 31 ++++--
lib/ClientConnection.h | 22 ++++-
lib/ConsumerImpl.cc | 246 ++++++++++++++++++++++++++--------------------
lib/ConsumerImpl.h | 63 ++++++------
lib/HandlerBase.cc | 2 +-
lib/HandlerBase.h | 2 +-
lib/MockServer.h | 139 ++++++++++++++++++++++++++
lib/ProducerImpl.cc | 10 +-
lib/Synchronized.h | 47 ---------
tests/ConsumerSeekTest.cc | 58 +++++++++++
10 files changed, 420 insertions(+), 200 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 86dfd9d..4f7a1dd 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -21,6 +21,7 @@
#include <openssl/x509.h>
#include <pulsar/MessageIdBuilder.h>
+#include <chrono>
#include <fstream>
#include "AsioDefines.h"
@@ -31,6 +32,7 @@
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
+#include "MockServer.h"
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
@@ -1005,15 +1007,17 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative,
const std::string& listenerName, uint64_t requestId,
const LookupDataResultPromisePtr& promise) {
-    newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, promise);
+    newLookup(Commands::newLookup(topicName, authoritative, requestId, listenerName), requestId, "LOOKUP",
+              promise);
}
void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName, uint64_t requestId,
const LookupDataResultPromisePtr& promise) {
-    newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, promise);
+    newLookup(Commands::newPartitionMetadataRequest(topicName, requestId), requestId, "PARTITIONED_METADATA",
+              promise);
}
-void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
+void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
const LookupDataResultPromisePtr& promise) {
Lock lock(mutex_);
std::shared_ptr<LookupDataResultPtr> lookupDataResult;
@@ -1042,6 +1046,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId,
pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
numOfPendingLookupRequest_++;
lock.unlock();
+ LOG_DEBUG(cnxString_ << "Inserted lookup request " << requestType << "
(req_id: " << requestId << ")");
sendCommand(cmd);
}
@@ -1158,12 +1163,15 @@ void ClientConnection::sendPendingCommands() {
}
}
-Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId) {
+Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuffer& cmd, int requestId,
+                                                                 const char* requestType) {
Lock lock(mutex_);
if (isClosed()) {
lock.unlock();
Promise<Result, ResponseData> promise;
+ LOG_DEBUG(cnxString_ << "Fail " << requestType << "(req_id: " <<
requestId
+ << ") to a closed connection");
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
@@ -1182,7 +1190,17 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
pendingRequests_.insert(std::make_pair(requestId, requestData));
lock.unlock();
- sendCommand(cmd);
+ LOG_DEBUG(cnxString_ << "Inserted request " << requestType << " (req_id: "
<< requestId << ")");
+ if (mockingRequests_.load(std::memory_order_acquire)) {
+ if (mockServer_ == nullptr) {
+            LOG_WARN(cnxString_ << "Mock server is unexpectedly null when processing " << requestType);
+ sendCommand(cmd);
+ } else if (!mockServer_->sendRequest(requestType, requestId)) {
+ sendCommand(cmd);
+ }
+ } else {
+ sendCommand(cmd);
+ }
return requestData.promise.getFuture();
}
@@ -1625,9 +1643,6 @@ void ClientConnection::handleConsumerStatsResponse(
void ClientConnection::handleLookupTopicRespose(
const proto::CommandLookupTopicResponse& lookupTopicResponse) {
- LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
- << lookupTopicResponse.request_id());
-
Lock lock(mutex_);
auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
if (it != pendingLookupRequests_.end()) {
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 18a7d84..aae53d2 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -115,6 +115,7 @@ struct ResponseData {
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
+class MockServer;
class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<ClientConnection> {
enum State : uint8_t
{
@@ -123,6 +124,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
Ready,
Disconnected
};
+ using RequestDelayType =
+        std::unordered_map<std::string /* request type */, long /* delay in milliseconds */>;
public:
typedef std::shared_ptr<ASIO::ip::tcp::socket> SocketPtr;
@@ -185,7 +188,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
* Send a request with a specific Id over the connection. The future will be
* triggered when the response for this request is received
*/
-    Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId);
+    Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int requestId,
+ const char* requestType);
const std::string& brokerAddress() const;
@@ -208,6 +212,13 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, const std::string& version,
uint64_t requestId);
+ void attachMockServer(const std::shared_ptr<MockServer>& mockServer) {
+ mockServer_ = mockServer;
+        // Mark that requests will first go through the mock server; if the mock server cannot process it,
+ // fall back to the normal logic
+ mockingRequests_.store(true, std::memory_order_release);
+ }
+
private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
@@ -264,7 +275,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
void handleSendPair(const ASIO_ERROR& err);
void sendPendingCommands();
-    void newLookup(const SharedBuffer& cmd, uint64_t requestId, const LookupDataResultPromisePtr& promise);
+    void newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
+                   const LookupDataResultPromisePtr& promise);
void handleRequestTimeout(const ASIO_ERROR& ec, const PendingRequestData& pendingRequestData);
@@ -308,6 +320,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
}
}
+    void mockSendCommand(const char* requestType, uint64_t requestId, const SharedBuffer& cmd);
+
std::atomic<State> state_{Pending};
TimeDuration operationsTimeout_;
AuthenticationPtr authentication_;
@@ -391,6 +405,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
DeadlineTimerPtr keepAliveTimer_;
DeadlineTimerPtr consumerStatsRequestTimer_;
+ std::atomic_bool mockingRequests_{false};
+ std::shared_ptr<MockServer> mockServer_;
+
void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector<uint64_t>& consumerStatsRequests);
void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
@@ -405,6 +422,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
friend class PulsarFriend;
friend class ConsumerTest;
+ friend class MockServer;
void checkServerError(ServerError error, const std::string& message);
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 430b851..a645f58 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -23,6 +23,8 @@
#include <pulsar/MessageIdBuilder.h>
#include <algorithm>
+#include <utility>
+#include <variant>
#include "AckGroupingTracker.h"
#include "AckGroupingTrackerDisabled.h"
@@ -123,7 +125,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr& client, const std::string& topic
negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this, conf)),
ackGroupingTrackerPtr_(newAckGroupingTracker(topic, conf, client)),
readCompacted_(conf.isReadCompacted()),
-      startMessageId_(getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())),
+      startMessageId_(pulsar::getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())),
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -189,7 +191,8 @@ ConsumerImpl::~ConsumerImpl() {
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
auto requestId = newRequestId();
-        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
+        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId,
+ "CLOSE_CONSUMER");
cnx->removeConsumer(consumerId_);
LOG_INFO(consumerStr_ << "Closed consumer for race condition: " <<
consumerId_);
} else {
@@ -232,20 +235,19 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
return promise.getFuture();
}
-    // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
- // sending the subscribe request.
- cnx->registerConsumer(consumerId_, get_shared_this_ptr());
+ optional<MessageId> subscribeMessageId;
+ {
+ LockGuard lock{mutex_};
+ setCnx(cnx);
+ cnx->registerConsumer(consumerId_, get_shared_this_ptr());
+ LOG_DEBUG(cnx->cnxString() << "Registered consumer " << consumerId_);
- if (duringSeek()) {
- ackGroupingTrackerPtr_->flushAndClean();
+ clearReceiveQueue();
+ subscribeMessageId =
+            (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_ : std::nullopt;
+ lastDequedMessageId_ = MessageId::earliest();
}
- Lock lockForMessageId(mutexForMessageId_);
- clearReceiveQueue();
- const auto subscribeMessageId =
-        (subscriptionMode_ == Commands::SubscriptionModeNonDurable) ? startMessageId_.get() : std::nullopt;
- lockForMessageId.unlock();
-
unAckedMessageTrackerPtr_->clear();
ClientImplPtr client = client_.lock();
@@ -259,13 +261,22 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
// Keep a reference to ensure object is kept alive.
auto self = get_shared_this_ptr();
setFirstRequestIdAfterConnect(requestId);
- cnx->sendRequestWithId(cmd, requestId)
+ cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE")
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
Result handleResult = handleCreateConsumer(cnx, result);
- if (handleResult == ResultOk) {
- promise.setSuccess();
- } else {
+ if (handleResult != ResultOk) {
promise.setFailed(handleResult);
+ return;
+ }
+ promise.setSuccess();
+            // Complete the seek callback after completing `promise`, otherwise `reconnectionPending_` will
+ // still be true when the seek operation is done.
+ LockGuard lock{mutex_};
+ if (seekStatus_ == SeekStatus::COMPLETED) {
+                executor_->postWork([seekCallback{std::exchange(seekCallback_, std::nullopt).value()}]() {
+ seekCallback(ResultOk);
+ });
+ seekStatus_ = SeekStatus::NOT_STARTED;
}
});
@@ -301,7 +312,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
LOG_INFO(getName() << "Closing subscribed consumer since it was already closed");
int requestId = client->newRequestId();
auto name = getName();
-                        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
+                        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId,
+                            "CLOSE_CONSUMER")
.addListener([name](Result result, const ResponseData&) {
if (result == ResultOk) {
LOG_INFO(name << "Closed consumer successfully after subscribe completed");
@@ -321,8 +333,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
return ResultAlreadyClosed;
}
+ mutexLock.unlock();
LOG_INFO(getName() << "Created consumer on broker " <<
cnx->cnxString());
- setCnx(cnx);
incomingMessages_.clear();
possibleSendToDeadLetterTopicMessages_.clear();
backoff_.reset();
@@ -354,7 +366,8 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
// in case it was indeed created, otherwise it might prevent new subscribe operation,
// since we are not closing the connection
auto requestId = newRequestId();
-        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
+        cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId,
+                               "CLOSE_CONSUMER");
}
if (consumerCreatedPromise_.isComplete()) {
@@ -408,7 +421,7 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& originalCallback) {
auto requestId = newRequestId();
SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
auto self = get_shared_this_ptr();
- cnx->sendRequestWithId(cmd, requestId)
+ cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE")
.addListener([self, callback](Result result, const ResponseData&) { callback(result); });
} else {
Result result = ResultNotConnected;
@@ -502,9 +515,10 @@ optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
auto& chunkedMsgCtx = it->second;
if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) {
-        auto startMessageId = startMessageId_.get().value_or(MessageId::earliest());
-        if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() == messageId.ledgerId() &&
- startMessageId.entryId() == messageId.entryId()) {
+ auto startMessageId = getStartMessageId();
+ if (!config_.isStartMessageIdInclusive() && startMessageId &&
+ startMessageId->ledgerId() == messageId.ledgerId() &&
+ startMessageId->entryId() == messageId.entryId()) {
// When the start message id is not inclusive, the last chunk of the previous chunked message will
// be delivered, which is expected and we only need to filter it out.
chunkedMessageCache_.remove(uuid);
@@ -621,17 +635,14 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
words[i] = msg.ack_set(i);
}
BitSet ackSet{std::move(words)};
- Lock lock(mutex_);
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count());
} else {
// try convert key value data.
m.impl_->convertPayloadToKeyValue(config_.getSchema());
- const auto startMessageId = startMessageId_.get();
- if (isPersistent_ && startMessageId &&
- m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
- m.getMessageId().entryId() == startMessageId.value().entryId() &&
- isPriorEntryIndex(m.getMessageId().entryId())) {
+ const auto startMessageId = getStartMessageId();
+        if (isPersistent_ && startMessageId && m.getMessageId().ledgerId() == startMessageId->ledgerId() &&
+            isPrior(m.getMessageId().entryId(), startMessageId->entryId())) {
LOG_DEBUG(getName() << " Ignoring message from before the startMessageId: "
<< startMessageId.value());
return;
@@ -753,7 +764,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
LOG_DEBUG("Received Batch messages of size - " << batchSize
<< " -- msgId: " <<
batchedMessage.getMessageId());
- const auto startMessageId = startMessageId_.get();
+ const auto startMessageId = getStartMessageId();
int skippedMessages = 0;
@@ -783,9 +794,9 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
-            if (isPersistent_ && msgId.ledgerId() == startMessageId.value().ledgerId() &&
- msgId.entryId() == startMessageId.value().entryId() &&
- isPriorBatchIndex(msgId.batchIndex())) {
+            if (isPersistent_ && msgId.ledgerId() == startMessageId->ledgerId() &&
+ msgId.entryId() == startMessageId->entryId() &&
+ isPrior(msgId.batchIndex(), startMessageId->batchIndex())) {
LOG_DEBUG(getName() << "Ignoring message from before the
startMessageId"
<< msg.getMessageId());
++skippedMessages;
@@ -925,7 +936,7 @@ void ConsumerImpl::internalListener() {
trackMessage(msg.getMessageId());
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
- lastDequedMessageId_ = msg.getMessageId();
+ setLastDequedMessageId(msg.getMessageId());
Consumer consumer{get_shared_this_ptr()};
Message interceptMsg = interceptors_->beforeConsume(Consumer(shared_from_this()), msg);
messageListener_(consumer, interceptMsg);
@@ -1098,10 +1109,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
}
void ConsumerImpl::messageProcessed(Message& msg, bool track) {
- Lock lock(mutexForMessageId_);
- lastDequedMessageId_ = msg.getMessageId();
- lock.unlock();
-
+ setLastDequedMessageId(msg.getMessageId());
incomingMessagesSize_.fetch_sub(msg.getLength());
ClientConnectionPtr currentCnx = getCnx().lock();
@@ -1123,20 +1131,22 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
* was
* not seen by the application
* `startMessageId_` is updated so that we can discard messages after delivery restarts.
+ * NOTE: `mutex_` must be locked before calling this method.
*/
void ConsumerImpl::clearReceiveQueue() {
- if (duringSeek()) {
- if (hasSoughtByTimestamp()) {
- // Invalidate startMessageId_ so that isPriorBatchIndex and
isPriorEntryIndex checks will be
- // skipped, and hasMessageAvailableAsync won't use startMessageId_
in compare.
- startMessageId_ = std::nullopt;
+ if (seekStatus_ != SeekStatus::NOT_STARTED) {
+        // Flush the pending ACKs in case newly arrived messages are filtered out by the previous pending ACKs
+ ackGroupingTrackerPtr_->flushAndClean();
+ if (lastSeekArg_.has_value()) {
+ if (std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+ startMessageId_ = std::get<MessageId>(lastSeekArg_.value());
+ } else {
+                // Invalidate startMessageId_ so that `isPrior` checks will be skipped, and
+                // `hasMessageAvailableAsync` won't use `startMessageId_` in compare.
+ startMessageId_ = std::nullopt;
+ }
} else {
- startMessageId_ = seekMessageId_.get();
- }
- SeekStatus expected = SeekStatus::COMPLETED;
-        if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
- auto seekCallback = seekCallback_.release();
- executor_->postWork([seekCallback] { seekCallback(ResultOk); });
+ LOG_ERROR(getName() << "SeekStatus is not NOT_STARTED but
lastSeekArg_ is not set");
}
return;
} else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
@@ -1374,7 +1384,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& originalCallback) {
auto requestId = newRequestId();
auto self = get_shared_this_ptr();
-    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
+    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId, "CLOSE_CONSUMER")
.addListener([self, callback](Result result, const ResponseData&) { callback(result); });
}
@@ -1550,10 +1560,12 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, const ResultCallback& callb
}
const auto requestId = newRequestId();
-    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
+    auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {};
+    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId},
+                      std::move(nonNullCallback));
}
-void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback) {
+void ConsumerImpl::seekAsync(SeekTimestampType timestamp, const ResultCallback& callback) {
const auto state = state_.load();
if (state == Closed || state == Closing) {
LOG_ERROR(getName() << "Client connection already closed.");
@@ -1564,8 +1576,9 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback)
}
const auto requestId = newRequestId();
+ auto nonNullCallback = (callback != nullptr) ? callback : [](Result) {};
seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
- callback);
+ std::move(nonNullCallback));
}
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1577,16 +1590,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
}
bool compareMarkDeletePosition;
{
- std::lock_guard<std::mutex> lock{mutexForMessageId_};
+ LockGuard lock{mutex_};
compareMarkDeletePosition =
// there is no message received by consumer, so we cannot compare the last position with the last
// received position
lastDequedMessageId_ == MessageId::earliest() &&
// If the start message id is latest, we should seek to the actual last message first.
-            (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest() ||
+            (startMessageId_.value_or(MessageId::earliest()) == MessageId::latest() ||
// If there is a previous seek operation by timestamp, the start message id will be incorrect, so
// we cannot compare the start position with the last position.
- hasSoughtByTimestamp());
+             (lastSeekArg_.has_value() && std::holds_alternative<SeekTimestampType>(lastSeekArg_.value())));
}
if (compareMarkDeletePosition) {
auto self = get_shared_this_ptr();
@@ -1607,7 +1620,15 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
callback(ResultOk, false);
}
};
-            if (self->config_.isStartMessageIdInclusive() && !self->hasSoughtByTimestamp()) {
+ bool lastSeekIsByTimestamp = false;
+ {
+ LockGuard lock{self->mutex_};
+ if (self->lastSeekArg_.has_value() &&
+                    std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
+ lastSeekIsByTimestamp = true;
+ }
+ }
+            if (self->config_.isStartMessageIdInclusive() && !lastSeekIsByTimestamp) {
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
if (result != ResultOk) {
callback(result, {});
@@ -1664,9 +1685,10 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
.addListener([this, self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "getLastMessageId: " <<
response);
- Lock lock(mutexForMessageId_);
- lastMessageIdInBroker_ = response.getLastMessageId();
- lock.unlock();
+ {
+ LockGuard lock{mutex_};
+                        lastMessageIdInBroker_ = response.getLastMessageId();
+ }
} else {
LOG_ERROR(getName() << "Failed to getLastMessageId: "
<< result);
}
@@ -1723,78 +1745,89 @@ bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ ==
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg,
- const ResultCallback& callback) {
+ ResultCallback&& callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
callback(ResultNotConnected);
return;
}
-
- auto expected = SeekStatus::NOT_STARTED;
-    if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
-        LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
- << static_cast<int>(expected));
+ bool hasPendingSeek = false;
+ // Save the previous last seek arg in case seek failed
+ decltype(lastSeekArg_) previousLastSeekArg;
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (seekStatus_ != SeekStatus::NOT_STARTED) {
+ hasPendingSeek = true;
+ } else {
+ seekStatus_ = SeekStatus::IN_PROGRESS;
+ if (seekCallback_.has_value()) {
+ // This should never happen
+ LOG_ERROR(getName() << "Previous seek callback is not
triggered unexpectedly");
+ executor_->postWork([callback{std::exchange(seekCallback_,
std::nullopt).value()}] {
+ callback(ResultTimeout);
+ });
+ }
+ seekCallback_ = std::move(callback);
+ previousLastSeekArg = lastSeekArg_;
+ lastSeekArg_ = seekArg;
+ }
+ }
+ if (hasPendingSeek) {
+ std::visit(
+ [this](auto&& arg) {
+ LOG_ERROR(getName() << "Attempted to seek " << arg << " when
there is a pending seek");
+ },
+ seekArg);
callback(ResultNotAllowedError);
return;
}
- const auto originalSeekMessageId = seekMessageId_.get();
- if (boost::get<uint64_t>(&seekArg)) {
- hasSoughtByTimestamp_.store(true, std::memory_order_release);
- } else {
- seekMessageId_ = *boost::get<MessageId>(&seekArg);
- hasSoughtByTimestamp_.store(false, std::memory_order_release);
- }
- seekStatus_ = SeekStatus::IN_PROGRESS;
- seekCallback_ = callback;
- LOG_INFO(getName() << " Seeking subscription to " << seekArg);
+    std::visit([this](auto&& arg) { LOG_INFO(getName() << "Seeking subscription to " << arg); }, seekArg);
auto weakSelf = weak_from_this();
-    cnx->sendRequestWithId(seek, requestId)
-        .addListener([this, weakSelf, callback, originalSeekMessageId](Result result,
-                                                                       const ResponseData& responseData) {
+    cnx->sendRequestWithId(seek, requestId, "SEEK")
+        .addListener([this, weakSelf, previousLastSeekArg](Result result, const ResponseData& responseData) {
auto self = weakSelf.lock();
if (!self) {
- callback(result);
return;
}
if (result == ResultOk) {
- LOG_INFO(getName() << "Seek successfully");
- ackGroupingTrackerPtr_->flushAndClean();
- incomingMessages_.clear();
- Lock lock(mutexForMessageId_);
- lastDequedMessageId_ = MessageId::earliest();
- lock.unlock();
- if (getCnx().expired()) {
+ LockGuard lock(mutex_);
+ if (getCnx().expired() || reconnectionPending_) {
// It's during reconnection, complete the seek future after connection is established
seekStatus_ = SeekStatus::COMPLETED;
+ LOG_INFO(getName() << "Delay the seek future until the
reconnection is done");
} else {
- if (!hasSoughtByTimestamp()) {
- startMessageId_ = seekMessageId_.get();
+ LOG_INFO(getName() << "Seek successfully");
+ ackGroupingTrackerPtr_->flushAndClean();
+ incomingMessages_.clear();
+                    if (lastSeekArg_.has_value() && std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+                        startMessageId_ = std::get<MessageId>(lastSeekArg_.value());
}
- seekCallback_.release()(result);
- }
+ if (!seekCallback_.has_value()) {
+ LOG_ERROR(getName() << "Seek callback is not set");
+ return;
+ }
+ executor_->postWork(
+                        [self, callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
+ callback(ResultOk);
+ });
+ seekStatus_ = SeekStatus::NOT_STARTED;
+                } // else: complete the seek future after connection is established
} else {
LOG_ERROR(getName() << "Failed to seek: " << result);
- seekMessageId_ = originalSeekMessageId;
+ LockGuard lock{mutex_};
seekStatus_ = SeekStatus::NOT_STARTED;
- seekCallback_.release()(result);
+ lastSeekArg_ = previousLastSeekArg;
+                executor_->postWork(
+                    [self, result, callback{std::exchange(seekCallback_, std::nullopt).value()}]() {
+                        callback(result);
+ });
}
});
}
-bool ConsumerImpl::isPriorBatchIndex(int32_t idx) {
-    return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().batchIndex()
-                                               : idx <= startMessageId_.get().value().batchIndex();
-}
-
-bool ConsumerImpl::isPriorEntryIndex(int64_t idx) {
-    return config_.isStartMessageIdInclusive() ? idx < startMessageId_.get().value().entryId()
-                                               : idx <= startMessageId_.get().value().entryId();
-}
-
bool ConsumerImpl::hasEnoughMessagesForBatchReceive() const {
if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) {
return false;
@@ -1928,7 +1961,7 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const MessageI
auto requestId = newRequestId();
cnx->sendRequestWithId(
Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId),
- requestId)
+ requestId, "ACK")
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
@@ -1958,7 +1991,8 @@ void ConsumerImpl::doImmediateAck(const ClientConnectionPtr& cnx, const std::set
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
if (config_.isAckReceiptEnabled()) {
auto requestId = newRequestId();
-        cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
+        cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId,
+ "ACK")
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 63eb51d..0da82a2 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -21,13 +21,15 @@
#include <pulsar/Reader.h>
-#include <boost/variant.hpp>
+#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
+#include <optional>
#include <set>
#include <utility>
+#include <variant>
#include "BrokerConsumerStatsImpl.h"
#include "Commands.h"
@@ -37,7 +39,6 @@
#include "MapCache.h"
#include "MessageIdImpl.h"
#include "NegativeAcksTracker.h"
-#include "Synchronized.h"
#include "TestUtil.h"
#include "TimeUtils.h"
#include "UnboundedBlockingQueue.h"
@@ -79,9 +80,9 @@ const static std::string DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";
enum class SeekStatus : std::uint8_t
{
- NOT_STARTED,
- IN_PROGRESS,
- COMPLETED
+ NOT_STARTED, // there is no pending seek RPC so that it's allowed to seek
+ IN_PROGRESS, // the seek RPC is in progress
+    COMPLETED    // the seek RPC is done but the connection is not established yet
};
class ConsumerImpl : public ConsumerImplBase {
@@ -138,7 +139,9 @@ class ConsumerImpl : public ConsumerImplBase {
void getBrokerConsumerStatsAsync(const BrokerConsumerStatsCallback& callback) override;
void getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& callback) override;
void seekAsync(const MessageId& msgId, const ResultCallback& callback) override;
-    void seekAsync(uint64_t timestamp, const ResultCallback& callback) override;
+    using SeekTimestampType = uint64_t;
+    using SeekArg = std::variant<SeekTimestampType, MessageId>;
+    void seekAsync(SeekTimestampType timestamp, const ResultCallback& callback) override;
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;
@@ -191,8 +194,10 @@ class ConsumerImpl : public ConsumerImplBase {
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
const BitSet& ackSet, int redeliveryCount);
- bool isPriorBatchIndex(int32_t idx);
- bool isPriorEntryIndex(int64_t idx);
+ template <typename T>
+ bool isPrior(T index, T startIndex) const noexcept {
+        return config_.isStartMessageIdInclusive() ? (index < startIndex) : (index <= startIndex);
+ }
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, const BrokerConsumerStatsCallback&);
enum class DecryptionResult : uint8_t
@@ -218,19 +223,9 @@ class ConsumerImpl : public ConsumerImplBase {
const BrokerGetLastMessageIdCallback& callback);
void clearReceiveQueue();
- using SeekArg = boost::variant<uint64_t, MessageId>;
- friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) {
- auto ptr = boost::get<uint64_t>(&seekArg);
- if (ptr) {
- os << *ptr;
- } else {
- os << *boost::get<MessageId>(&seekArg);
- }
- return os;
- }
void seekAsyncInternal(long requestId, const SharedBuffer& seek, const SeekArg& seekArg,
- const ResultCallback& callback);
+ ResultCallback&& callback);
void processPossibleToDLQ(const MessageId& messageId, const ProcessDLQCallBack& cb);
std::mutex mutexForReceiveWithZeroQueueSize;
@@ -269,19 +264,13 @@ class ConsumerImpl : public ConsumerImplBase {
std::shared_ptr<Promise<Result, Producer>> deadLetterProducer_;
std::mutex createProducerLock_;
-    // Make the access to `lastDequedMessageId_` and `lastMessageIdInBroker_` thread safe
- mutable std::mutex mutexForMessageId_;
MessageId lastDequedMessageId_{MessageId::earliest()};
MessageId lastMessageIdInBroker_{MessageId::earliest()};
+ optional<MessageId> startMessageId_;
- std::atomic<SeekStatus> seekStatus_{SeekStatus::NOT_STARTED};
- Synchronized<ResultCallback> seekCallback_{[](Result) {}};
- Synchronized<optional<MessageId>> startMessageId_;
- Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
- std::atomic<bool> hasSoughtByTimestamp_{false};
-
-    bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); }
- bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
+ SeekStatus seekStatus_{SeekStatus::NOT_STARTED};
+ optional<ResultCallback> seekCallback_;
+ optional<SeekArg> lastSeekArg_;
class ChunkedMessageCtx {
public:
@@ -380,7 +369,8 @@ class ConsumerImpl : public ConsumerImplBase {
const ClientConnectionPtr& cnx, MessageId& messageId);
bool hasMoreMessages() const {
- std::lock_guard<std::mutex> lock{mutexForMessageId_};
+ LockGuard lock{mutex_};
+
if (lastMessageIdInBroker_.entryId() == -1L) {
return false;
}
@@ -388,7 +378,7 @@ class ConsumerImpl : public ConsumerImplBase {
const auto inclusive = config_.isStartMessageIdInclusive();
if (lastDequedMessageId_ == MessageId::earliest()) {
// If startMessageId_ is none, use latest so that this method will return false
-            const auto startMessageId = startMessageId_.get().value_or(MessageId::latest());
+            const auto startMessageId = startMessageId_.value_or(MessageId::latest());
return inclusive ? (lastMessageIdInBroker_ >= startMessageId)
: (lastMessageIdInBroker_ > startMessageId);
} else {
@@ -396,11 +386,22 @@ class ConsumerImpl : public ConsumerImplBase {
}
}
+ auto getStartMessageId() const {
+ LockGuard lock{mutex_};
+ return startMessageId_;
+ }
+ auto setLastDequedMessageId(const MessageId& messageId) {
+ LockGuard lock{mutex_};
+ lastDequedMessageId_ = messageId;
+ }
+
void doImmediateAck(const ClientConnectionPtr& cnx, const MessageId& msgId, CommandAck_AckType ackType,
const ResultCallback& callback);
void doImmediateAck(const ClientConnectionPtr& cnx, const std::set<MessageId>& msgIds,
const ResultCallback& callback);
+ using LockGuard = std::lock_guard<std::mutex>;
+
friend class PulsarFriend;
friend class MultiTopicsConsumerImpl;
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 37c6e2d..1a4f573 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -44,9 +44,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
state_(NotStarted),
backoff_(backoff),
epoch_(0),
+ reconnectionPending_(false),
timer_(executor_->createDeadlineTimer()),
creationTimer_(executor_->createDeadlineTimer()),
- reconnectionPending_(false),
redirectedClusterURI_("") {}
HandlerBase::~HandlerBase() {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index acce15d..0e733f0 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -143,6 +143,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
std::atomic<State> state_;
Backoff backoff_;
uint64_t epoch_;
+ std::atomic<bool> reconnectionPending_;
Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const;
@@ -160,7 +161,6 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
DeadlineTimerPtr creationTimer_;
mutable std::mutex connectionMutex_;
- std::atomic<bool> reconnectionPending_;
ClientConnectionWeakPtr connection_;
std::string redirectedClusterURI_;
std::atomic<long> firstRequestIdAfterConnect_{-1L};
diff --git a/lib/MockServer.h b/lib/MockServer.h
new file mode 100644
index 0000000..8e4a213
--- /dev/null
+++ b/lib/MockServer.h
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "ClientConnection.h"
+#include "ConsumerImpl.h"
+#include "ExecutorService.h"
+#include "LogUtils.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+class MockServer : public std::enable_shared_from_this<MockServer> {
+ public:
+    using RequestDelayType = std::unordered_map<std::string, long /* delay in milliseconds */>;
+
+    MockServer(const ClientConnectionPtr& connection) : connection_(connection) {
+ requestDelays_["CLOSE_CONSUMER"] = 1;
+ }
+
+    void setRequestDelay(std::initializer_list<typename RequestDelayType::value_type> delays) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto&& delay : delays) {
+ requestDelays_[delay.first] = delay.second;
+ }
+ }
+
+ bool sendRequest(const std::string& request, uint64_t requestId) {
+ auto connection = connection_.lock();
+ if (!connection) {
+ return false;
+ }
+ std::lock_guard<std::mutex> lock(mutex_);
+        if (auto iter = requestDelays_.find(request); iter != requestDelays_.end()) {
+            // Mock the `CLOSE_CONSUMER` command sent by the broker; for simplicity, disconnect all consumers
+ if (request == "SEEK") {
+ schedule(connection, "CLOSE_CONSUMER" +
std::to_string(requestId),
+ requestDelays_["CLOSE_CONSUMER"], [connection] {
+ std::vector<uint64_t> consumerIds;
+ {
+                                 std::lock_guard<std::mutex> lock{connection->mutex_};
+ for (auto&& kv : connection->consumers_) {
+ if (auto consumer = kv.second.lock()) {
+                                         consumerIds.push_back(consumer->getConsumerId());
+ }
+ }
+ }
+ for (auto consumerId : consumerIds) {
+ proto::CommandCloseConsumer closeConsumerCmd;
+ closeConsumerCmd.set_consumer_id(consumerId);
+                                 connection->handleCloseConsumer(closeConsumerCmd);
+ }
+ });
+ }
+            schedule(connection, request + std::to_string(requestId), iter->second, [connection, requestId] {
+ proto::CommandSuccess success;
+ success.set_request_id(requestId);
+ connection->handleSuccess(success);
+ });
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ // Return the number of pending timers cancelled
+ auto close() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto result = pendingTimers_.size();
+ for (auto&& kv : pendingTimers_) {
+ try {
+ LOG_INFO("Cancelling timer for " << kv.first);
+ kv.second->cancel();
+ } catch (...) {
+ LOG_WARN("Failed to cancel timer for " << kv.first);
+ }
+ }
+ pendingTimers_.clear();
+ return result;
+ }
+
+ private:
+ mutable std::mutex mutex_;
+ std::unordered_map<std::string, long> requestDelays_;
+ std::unordered_map<std::string, DeadlineTimerPtr> pendingTimers_;
+ ClientConnectionWeakPtr connection_;
+
+    void schedule(ClientConnectionPtr& connection, const std::string& key, long delayMs,
+ std::function<void()>&& task) {
+ auto timer = connection->executor_->createDeadlineTimer();
+ pendingTimers_[key] = timer;
+ timer->expires_from_now(std::chrono::milliseconds(delayMs));
+ LOG_INFO("Mock scheduling " << key << " with delay " << delayMs << "
ms");
+ auto self = shared_from_this();
+        timer->async_wait([this, self, key, connection, task{std::move(task)}](const auto& ec) {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ pendingTimers_.erase(key);
+ }
+ if (ec) {
+ LOG_INFO("Timer cancelled for " << key);
+ return;
+ }
+ if (connection->isClosed()) {
+ LOG_INFO("Connection is closed, not completing request " <<
key);
+ return;
+ }
+ LOG_INFO("Completing delayed request " << key);
+ task();
+ });
+ }
+
+ DECLARE_LOG_OBJECT()
+};
+
+} // namespace pulsar
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 9d6a9a0..7fd14c7 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -160,7 +160,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c
// Keep a reference to ensure object is kept alive.
auto self = shared_from_this();
setFirstRequestIdAfterConnect(requestId);
- cnx->sendRequestWithId(cmd, requestId)
+ cnx->sendRequestWithId(cmd, requestId, "PRODUCER")
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
Result handleResult = handleCreateProducer(cnx, result, responseData);
if (handleResult == ResultOk) {
@@ -204,7 +204,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
auto client = client_.lock();
if (client) {
int requestId = client->newRequestId();
-            cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
+            cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId,
+ "CLOSE_PRODUCER");
}
}
if (!producerCreatedPromise_.isComplete()) {
@@ -266,7 +267,8 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
auto client = client_.lock();
if (client) {
int requestId = client->newRequestId();
-            cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
+            cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId,
+ "CLOSE_PRODUCER");
}
}
@@ -818,7 +820,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) {
int requestId = client->newRequestId();
auto self = shared_from_this();
-    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
+    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId, "CLOSE_PRODUCER")
.addListener([self, callback](Result result, const ResponseData&) { callback(result); });
}
diff --git a/lib/Synchronized.h b/lib/Synchronized.h
deleted file mode 100644
index 5449a9f..0000000
--- a/lib/Synchronized.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#pragma once
-
-#include <mutex>
-
-template <typename T>
-class Synchronized {
- public:
- explicit Synchronized(const T& value) : value_(value) {}
-
- T get() const {
- std::lock_guard<std::mutex> lock(mutex_);
- return value_;
- }
-
- T&& release() {
- std::lock_guard<std::mutex> lock(mutex_);
- return std::move(value_);
- }
-
- Synchronized& operator=(const T& value) {
- std::lock_guard<std::mutex> lock(mutex_);
- value_ = value;
- return *this;
- }
-
- private:
- T value_;
- mutable std::mutex mutex_;
-};
diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc
index f03ea5e..f66c27d 100644
--- a/tests/ConsumerSeekTest.cc
+++ b/tests/ConsumerSeekTest.cc
@@ -19,12 +19,18 @@
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include <chrono>
+#include <future>
+#include <memory>
#include <set>
#include <stdexcept>
#include <string>
#include "HttpHelper.h"
+#include "lib/ClientConnection.h"
#include "lib/LogUtils.h"
+#include "lib/MockServer.h"
+#include "tests/PulsarFriend.h"
DECLARE_LOG_OBJECT()
@@ -200,6 +206,58 @@ TEST_F(ConsumerSeekTest, testNoInternalConsumer) {
ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest()));
}
+static void assertSeekWithTimeout(Consumer& consumer) {
+ using namespace std::chrono_literals;
+ auto promise = std::make_shared<std::promise<Result>>();
+ std::weak_ptr<std::promise<Result>> weakPromise = promise;
+ consumer.seekAsync(0L, [weakPromise](Result result) {
+ if (auto promise = weakPromise.lock()) {
+ promise->set_value(result);
+ }
+ });
+ auto future = promise->get_future();
+ ASSERT_EQ(future.wait_for(5s), std::future_status::ready);
+ ASSERT_EQ(future.get(), ResultOk);
+}
+
+// Verify the `seek` method won't be blocked forever, regardless of the order of the SUBSCRIBE and SEEK responses
+TEST_F(ConsumerSeekTest, testSubscribeSeekRaces) {
+ Client client(lookupUrl);
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe("testSubscribeSeekRaces", "sub",
consumer));
+
+ auto connection = *PulsarFriend::getConnections(client).begin();
+ auto mockServer = std::make_shared<MockServer>(connection);
+ connection->attachMockServer(mockServer);
+
+ mockServer->setRequestDelay({{"SUBSCRIBE", 1000}, {"SEEK", 500}});
+ assertSeekWithTimeout(consumer);
+
+ mockServer->setRequestDelay({{"SUBSCRIBE", 500}, {"SEEK", 1000}});
+ assertSeekWithTimeout(consumer);
+
+ ASSERT_EQ(mockServer->close(), 0);
+ client.close();
+}
+
+TEST_F(ConsumerSeekTest, testReconnectionSlow) {
+    Client client(lookupUrl, ClientConfiguration().setInitialBackoffIntervalMs(500));
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe("testReconnectionSlow", "sub",
consumer));
+
+ auto connection = *PulsarFriend::getConnections(client).begin();
+ auto mockServer = std::make_shared<MockServer>(connection);
+ connection->attachMockServer(mockServer);
+
+ // Make seek response received before `connectionOpened` is called
+ mockServer->setRequestDelay({{"SEEK", 500}, {"CLOSE_CONSUMER", 1000}});
+ assertSeekWithTimeout(consumer);
+
+    // The CLOSE_CONSUMER request is still in flight
+ ASSERT_EQ(mockServer->close(), 1);
+ client.close();
+}
+
INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
} // namespace pulsar