This is an automated email from the ASF dual-hosted git repository.
BewareMyPower pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2ca2eac feat: introduce a v2 createProducer API to carry error
message when fail (#579)
2ca2eac is described below
commit 2ca2eacbb781280316249dc166212de6610d3b87
Author: Yunze Xu <[email protected]>
AuthorDate: Wed May 27 12:10:37 2026 +0800
feat: introduce a v2 createProducer API to carry error message when fail
(#579)
---
include/pulsar/Client.h | 20 +++-
include/pulsar/Result.h | 16 +++
lib/BinaryProtoLookupService.cc | 72 +++++++------
lib/BinaryProtoLookupService.h | 14 +--
lib/Client.cc | 37 ++++++-
lib/ClientConnection.cc | 193 ++++++++++++++++++++---------------
lib/ClientConnection.h | 15 +--
lib/ClientConnectionAdaptor.h | 4 +-
lib/ClientImpl.cc | 121 ++++++++++++----------
lib/ClientImpl.h | 11 +-
lib/ConnectionPool.cc | 26 ++---
lib/ConnectionPool.h | 13 +--
lib/ConsumerImpl.cc | 53 +++++-----
lib/ConsumerImpl.h | 2 +-
lib/ConsumerImplBase.h | 5 +-
lib/HTTPLookupService.cc | 97 ++++++++++--------
lib/HTTPLookupService.h | 12 +--
lib/HandlerBase.cc | 7 +-
lib/HandlerBase.h | 2 +-
lib/LookupDataResult.h | 2 +-
lib/LookupService.h | 7 +-
lib/MultiTopicsConsumerImpl.cc | 13 +--
lib/PartitionedProducerImpl.cc | 25 ++---
lib/PartitionedProducerImpl.h | 6 +-
lib/PendingRequest.h | 8 +-
lib/ProducerImpl.cc | 47 ++++-----
lib/ProducerImpl.h | 10 +-
lib/ProducerImplBase.h | 2 +-
lib/RetryableLookupService.h | 47 +++++++--
lib/RetryableOperation.h | 24 ++---
lib/RetryableOperationCache.h | 12 +--
tests/AuthTokenTest.cc | 28 +++++
tests/ClientTest.cc | 8 +-
tests/ConnectionTest.cc | 4 +-
tests/LookupServiceTest.cc | 26 ++---
tests/MockClientImpl.h | 9 +-
tests/MultiTopicsConsumerTest.cc | 2 +-
tests/RetryableOperationCacheTest.cc | 14 +--
tests/SchemaTest.cc | 16 ++-
tests/VariantHelper.h | 28 +++++
40 files changed, 633 insertions(+), 425 deletions(-)
diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index e9813e3..4114075 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -36,6 +36,7 @@
#include <memory>
#include <string>
+#include <variant>
namespace pulsar {
typedef std::function<void(Result, Producer)> CreateProducerCallback;
@@ -45,6 +46,8 @@ typedef std::function<void(Result, TableView)>
TableViewCallback;
typedef std::function<void(Result, const std::vector<std::string>&)>
GetPartitionsCallback;
typedef std::function<void(Result)> CloseCallback;
+using CreateProducerV2Callback = std::function<void(std::variant<Error,
Producer>)>;
+
class ClientImpl;
class PulsarFriend;
class PulsarWrapper;
@@ -108,7 +111,9 @@ class PULSAR_PUBLIC Client {
* @return ResultOk if the producer has been successfully created
* @return ResultError if there was an error
*/
- Result createProducer(const std::string& topic, const
ProducerConfiguration& conf, Producer& producer);
+ [[deprecated("use createProducerV2")]] Result createProducer(const
std::string& topic,
+ const
ProducerConfiguration& conf,
+ Producer&
producer);
/**
* Asynchronously create a producer with the default ProducerConfiguration
for publishing on a specific
@@ -118,7 +123,8 @@ class PULSAR_PUBLIC Client {
* @param callback the callback that is triggered when the producer is
created successfully or not
* @param callback Callback function that is invoked when the operation is
completed
*/
- void createProducerAsync(const std::string& topic, const
CreateProducerCallback& callback);
+ [[deprecated("use createProducerAsyncV2")]] void createProducerAsync(
+ const std::string& topic, const CreateProducerCallback& callback);
/**
* Asynchronously create a producer with the customized
ProducerConfiguration for publishing on a specific
@@ -127,8 +133,14 @@ class PULSAR_PUBLIC Client {
* @param topic the name of the topic where to produce
* @param conf the customized ProducerConfiguration
*/
- void createProducerAsync(const std::string& topic, const
ProducerConfiguration& conf,
- const CreateProducerCallback& callback);
+ [[deprecated("use createProducerAsyncV2")]] void createProducerAsync(
+ const std::string& topic, const ProducerConfiguration& conf, const
CreateProducerCallback& callback);
+
+ void createProducerAsyncV2(const std::string& topic, const
ProducerConfiguration& conf,
+ CreateProducerV2Callback callback);
+
+ std::variant<Error, Producer> createProducerV2(const std::string& topic,
+ const
ProducerConfiguration& conf);
/**
* Subscribe to a given topic and subscription combination with the
default ConsumerConfiguration
diff --git a/include/pulsar/Result.h b/include/pulsar/Result.h
index a6c30d4..b94c08c 100644
--- a/include/pulsar/Result.h
+++ b/include/pulsar/Result.h
@@ -23,6 +23,8 @@
#include <cstdint>
#include <iosfwd>
+#include <ostream>
+#include <string>
namespace pulsar {
@@ -101,6 +103,20 @@ enum Result : int8_t
PULSAR_PUBLIC const char* strResult(Result result);
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result);
+
+struct PULSAR_PUBLIC Error {
+ Result result;
+ std::string message;
+};
+
+inline std::ostream& operator<<(std::ostream& os, const Error& error) {
+ os << error.result;
+ if (!error.message.empty()) {
+ os << " " << error.message;
+ }
+ return os;
+}
+
} // namespace pulsar
#endif /* ERROR_HPP_ */
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index a110e96..7712e76 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -47,10 +47,10 @@ auto BinaryProtoLookupService::findBroker(const
std::string& address, bool autho
// NOTE: we can use move capture for topic since C++14
cnxPool_.getConnectionAsync(address).addListener([this, promise, topic,
address, authoritative,
- redirectCount](Result
result,
+ redirectCount](const
auto& error,
const
ClientConnectionWeakPtr& weakCnx) {
- if (result != ResultOk) {
- promise->setFailed(result);
+ if (error.result != ResultOk) {
+ promise->setFailed(error.result);
return;
}
auto cnx = weakCnx.lock();
@@ -62,10 +62,10 @@ auto BinaryProtoLookupService::findBroker(const
std::string& address, bool autho
auto lookupPromise = std::make_shared<LookupDataResultPromise>();
cnx->newTopicLookup(topic, authoritative, listenerName_,
newRequestId(), lookupPromise);
lookupPromise->getFuture().addListener([this, cnx, promise, topic,
address, redirectCount](
- Result result, const
LookupDataResultPtr& data) {
- if (result != ResultOk || !data) {
- LOG_ERROR("Lookup failed for " << topic << ", result " <<
result);
- promise->setFailed(result);
+ const Error& error, const
LookupDataResultPtr& data) {
+ if (error.result != ResultOk || !data) {
+ LOG_ERROR("Lookup failed for " << topic << ", result " <<
error);
+ promise->setFailed(error.result);
return;
}
@@ -96,15 +96,11 @@ auto BinaryProtoLookupService::findBroker(const
std::string& address, bool autho
return promise->getFuture();
}
-/*
- * @param topicName topic to get number of partitions.
- *
- */
-Future<Result, LookupDataResultPtr>
BinaryProtoLookupService::getPartitionMetadataAsync(
+Future<Error, LookupDataResultPtr>
BinaryProtoLookupService::getPartitionMetadataAsync(
const TopicNamePtr& topicName) {
LookupDataResultPromisePtr promise =
std::make_shared<LookupDataResultPromise>();
if (!topicName) {
- promise->setFailed(ResultInvalidTopicName);
+ promise->setFailed(Error{ResultInvalidTopicName, ""});
return promise->getFuture();
}
std::string lookupName = topicName->toString();
@@ -115,16 +111,17 @@ Future<Result, LookupDataResultPtr>
BinaryProtoLookupService::getPartitionMetada
return promise->getFuture();
}
-void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const
std::string& topicName, Result result,
+void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const
std::string& topicName,
+ const Error&
error,
const
ClientConnectionWeakPtr& clientCnx,
const
LookupDataResultPromisePtr& promise) {
- if (result != ResultOk) {
- promise->setFailed(result);
+ if (error.result != ResultOk) {
+ promise->setFailed(error);
return;
}
auto conn = clientCnx.lock();
if (!conn) {
- promise->setFailed(ResultConnectError);
+ promise->setFailed(Error{ResultConnectError, ""});
return;
}
LookupDataResultPromisePtr lookupPromise =
std::make_shared<LookupDataResultPromise>();
@@ -135,7 +132,7 @@ void
BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
std::placeholders::_2,
clientCnx, promise));
}
-void BinaryProtoLookupService::handlePartitionMetadataLookup(const
std::string& topicName, Result result,
+void BinaryProtoLookupService::handlePartitionMetadataLookup(const
std::string& topicName, const Error& error,
const
LookupDataResultPtr& data,
const
ClientConnectionWeakPtr& clientCnx,
const
LookupDataResultPromisePtr& promise) {
@@ -144,8 +141,8 @@ void
BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string&
<<
data->getBrokerUrl());
promise->setValue(data);
} else {
- LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ",
result " << result);
- promise->setFailed(result);
+ LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ",
result " << error);
+ promise->setFailed(error);
}
}
@@ -168,26 +165,28 @@ Future<Result, NamespaceTopicsPtr>
BinaryProtoLookupService::getTopicsOfNamespac
return promise->getFuture();
}
-Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const
TopicNamePtr& topicName,
- const
std::string& version) {
- GetSchemaPromisePtr promise = std::make_shared<Promise<Result,
SchemaInfo>>();
-
+Future<Error, SchemaInfo> BinaryProtoLookupService::getSchema(const
TopicNamePtr& topicName,
+ const
std::string& version) {
+ GetSchemaPromisePtr promise = std::make_shared<Promise<Error,
SchemaInfo>>();
if (!topicName) {
- promise->setFailed(ResultInvalidTopicName);
+ promise->setFailed(Error{ResultInvalidTopicName, ""});
return promise->getFuture();
}
- cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
+
+ const auto topic = topicName->toString();
+ const auto address = serviceNameResolver_.resolveHost();
+ cnxPool_.getConnectionAsync(address, address)
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this,
topicName->toString(),
version, std::placeholders::_1,
std::placeholders::_2, promise));
-
return promise->getFuture();
}
void BinaryProtoLookupService::sendGetSchemaRequest(const std::string&
topicName, const std::string& version,
- Result result, const
ClientConnectionWeakPtr& clientCnx,
+ const Error& error,
+ const
ClientConnectionWeakPtr& clientCnx,
const GetSchemaPromisePtr&
promise) {
- if (result != ResultOk) {
- promise->setFailed(result);
+ if (error.result != ResultOk) {
+ promise->setFailed(error);
return;
}
@@ -195,11 +194,10 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const
std::string& topicName
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName:
" << topicName
<< " version: " << version);
-
conn->newGetSchema(topicName, version, requestId)
- .addListener([promise](Result result, const SchemaInfo& schemaInfo) {
- if (result != ResultOk) {
- promise->setFailed(result);
+ .addListener([promise](const auto& error, const SchemaInfo&
schemaInfo) {
+ if (error.result != ResultOk) {
+ promise->setFailed(error);
return;
}
promise->setValue(schemaInfo);
@@ -208,11 +206,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const
std::string& topicName
void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const
std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode,
- Result result,
+ const Error&
error,
const
ClientConnectionWeakPtr& clientCnx,
const
NamespaceTopicsPromisePtr& promise) {
- if (result != ResultOk) {
- promise->setFailed(result);
+ if (error.result != ResultOk) {
+ promise->setFailed(error.result);
return;
}
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index 35dcb16..b5adc42 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -35,7 +35,7 @@ class ConnectionPool;
class LookupDataResult;
class ServiceNameResolver;
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result,
NamespaceTopicsPtr>>;
-using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
+using GetSchemaPromisePtr = std::shared_ptr<Promise<Error, SchemaInfo>>;
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
public:
@@ -48,12 +48,12 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public
LookupService {
LookupResultFuture getBroker(const TopicName& topicName) override;
- Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override;
+ Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override;
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode)
override;
- Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const
std::string& version) override;
+ Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const
std::string& version) override;
ServiceNameResolver& getServiceNameResolver() override { return
serviceNameResolver_; }
@@ -71,20 +71,20 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public
LookupService {
std::string listenerName_;
const int32_t maxLookupRedirects_;
- void sendPartitionMetadataLookupRequest(const std::string& topicName,
Result result,
+ void sendPartitionMetadataLookupRequest(const std::string& topicName,
const Error& error,
const ClientConnectionWeakPtr&
clientCnx,
const LookupDataResultPromisePtr&
promise);
- void handlePartitionMetadataLookup(const std::string& topicName, Result
result,
+ void handlePartitionMetadataLookup(const std::string& topicName, const
Error& error,
const LookupDataResultPtr& data,
const ClientConnectionWeakPtr&
clientCnx,
const LookupDataResultPromisePtr&
promise);
void sendGetTopicsOfNamespaceRequest(const std::string& nsName,
CommandGetTopicsOfNamespace_Mode mode,
- Result result, const
ClientConnectionWeakPtr& clientCnx,
+ const Error& error, const
ClientConnectionWeakPtr& clientCnx,
const NamespaceTopicsPromisePtr&
promise);
- void sendGetSchemaRequest(const std::string& topicName, const std::string&
version, Result result,
+ void sendGetSchemaRequest(const std::string& topicName, const std::string&
version, const Error& error,
const ClientConnectionWeakPtr& clientCnx, const
GetSchemaPromisePtr& promise);
void getTopicsOfNamespaceListener(Result result, const NamespaceTopicsPtr&
topicsPtr,
diff --git a/lib/Client.cc b/lib/Client.cc
index 48e92dd..32ab87c 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -19,6 +19,7 @@
#include <pulsar/Client.h>
#include <pulsar/ServiceInfoProvider.h>
+#include <future>
#include <iostream>
#include <memory>
#include <utility>
@@ -65,7 +66,31 @@ void Client::createProducerAsync(const std::string& topic,
const CreateProducerC
void Client::createProducerAsync(const std::string& topic, const
ProducerConfiguration& conf,
const CreateProducerCallback& callback) {
- impl_->createProducerAsync(topic, conf, callback);
+ impl_->createProducerAsync(topic, conf, [callback](const auto& v) {
+ if (const auto* error = std::get_if<Error>(&v)) {
+ callback(error->result, Producer());
+ } else {
+ callback(ResultOk, std::get<Producer>(v));
+ }
+ });
+}
+
+void Client::createProducerAsyncV2(const std::string& topic, const
ProducerConfiguration& conf,
+ CreateProducerV2Callback callback) {
+ impl_->createProducerAsync(topic, conf, std::move(callback));
+}
+
+std::variant<Error, Producer> Client::createProducerV2(const std::string&
topic,
+ const
ProducerConfiguration& conf) {
+ std::promise<std::variant<Error, Producer>> promise;
+ createProducerAsyncV2(topic, conf, [&promise](const auto& v) mutable {
+ if (const auto* error = std::get_if<Error>(&v)) {
+ promise.set_value(*error);
+ } else {
+ promise.set_value(std::get<Producer>(v));
+ }
+ });
+ return promise.get_future().get();
}
Result Client::subscribe(const std::string& topic, const std::string&
subscriptionName, Consumer& consumer) {
@@ -169,9 +194,9 @@ void Client::createTableViewAsync(const std::string& topic,
const TableViewConfi
}
Result Client::getPartitionsForTopic(const std::string& topic,
std::vector<std::string>& partitions) {
- Promise<Result, std::vector<std::string> > promise;
- getPartitionsForTopicAsync(topic,
WaitForCallbackValue<std::vector<std::string> >(promise));
- Future<Result, std::vector<std::string> > future = promise.getFuture();
+ Promise<Result, std::vector<std::string>> promise;
+ getPartitionsForTopicAsync(topic,
WaitForCallbackValue<std::vector<std::string>>(promise));
+ Future<Result, std::vector<std::string>> future = promise.getFuture();
return future.get(partitions);
}
@@ -199,7 +224,9 @@ uint64_t Client::getNumberOfConsumers() { return
impl_->getNumberOfConsumers();
void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
std::function<void(Result, const SchemaInfo&)>
callback) {
impl_->getSchema(TopicName::get(topic), (version >= 0) ?
toBigEndianBytes(version) : "")
- .addListener(std::move(callback));
+ .addListener([callback{std::move(callback)}](const Error& error, const
SchemaInfo& schemaInfo) {
+ callback(error.result, schemaInfo);
+ });
}
ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index a135ade..9559703 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -209,8 +209,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
LOG_INFO(cnxString() << "Create ClientConnection, timeout="
<< clientConfiguration.getConnectionTimeout());
if (!authentication_) {
- LOG_ERROR("Invalid authentication plugin");
- throw ResultAuthenticationError;
+ throw Error{ResultAuthenticationError, "Invalid authentication
plugin"};
return;
}
@@ -248,7 +247,7 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
ctx.load_verify_file(trustCertFilePath);
} else {
LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
- throw ResultAuthenticationError;
+ throw Error{ResultAuthenticationError, trustCertFilePath +
": No such trustCertFile"};
}
} else {
ctx.set_default_verify_paths();
@@ -265,11 +264,11 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
tlsPrivateKey = authData->getTlsPrivateKey();
if (!file_exists(tlsCertificates)) {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
- throw ResultAuthenticationError;
+ throw Error{ResultAuthenticationError, tlsCertificates + ": No
such tlsCertificates"};
}
if (!file_exists(tlsCertificates)) {
LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
- throw ResultAuthenticationError;
+ throw Error{ResultAuthenticationError, tlsCertificates + ": No
such tlsCertificates"};
}
ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem);
ctx.use_certificate_chain_file(tlsCertificates);
@@ -304,7 +303,7 @@ ClientConnection::~ClientConnection() {
void ClientConnection::handlePulsarConnected(const proto::CommandConnected&
cmdConnected) {
if (!cmdConnected.has_server_version()) {
LOG_ERROR(cnxString() << "Server version is not set");
- close();
+ close(Error{ResultConnectError, cnxString() + "Server version is not
set"});
return;
}
@@ -414,7 +413,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR&
err, const tcp::endp
std::atomic_store(&cnxStringPtr_,
std::make_shared<std::string>(cnxStringStream.str()));
} catch (const ASIO_SYSTEM_ERROR& e) {
LOG_ERROR("Failed to get endpoint: " << e.what());
- close(ResultRetryable);
+ close(Error{ResultRetryable, ""});
return;
}
if (logicalAddress_ == physicalAddress_) {
@@ -470,7 +469,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR&
err, const tcp::endp
Url service_url;
if (!Url::parse(physicalAddress_, service_url)) {
LOG_ERROR(cnxString() << "Invalid Url, unable to parse: "
<< err << " " << err.message());
- close();
+ close(Error{ResultConnectError, "Invalid Url, unable to
parse: " + err.message()});
return;
}
}
@@ -492,9 +491,9 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR&
err, const tcp::endp
cancelTimer(*connectTimer_);
}
if (err == ASIO::error::operation_aborted) {
- close();
+ close(Error{ResultConnectError, "Connection attempt was
canceled"});
} else {
- close(ResultRetryable);
+ close(Error{ResultRetryable, ""});
}
}
}
@@ -503,10 +502,10 @@ void ClientConnection::handleHandshake(const ASIO_ERROR&
err) {
if (err) {
if (err.value() == ASIO::ssl::error::stream_truncated) {
LOG_WARN(cnxString() << "Handshake failed: " << err.message());
- close(ResultRetryable);
+ close(Error{ResultRetryable, ""});
} else {
LOG_ERROR(cnxString() << "Handshake failed: " << err.message());
- close();
+ close(Error{ResultConnectError, "Handshake failed: " +
err.message()});
}
return;
}
@@ -515,16 +514,17 @@ void ClientConnection::handleHandshake(const ASIO_ERROR&
err) {
Result result = ResultOk;
SharedBuffer buffer;
try {
+ // TODO: pass error instead of result
buffer = Commands::newConnect(authentication_, logicalAddress_,
connectingThroughProxy,
clientVersion_, result);
} catch (const std::exception& e) {
LOG_ERROR(cnxString() << "Failed to create Connect command: " <<
e.what());
- close(ResultAuthenticationError);
+ close(Error{ResultAuthenticationError, "Failed to create Connect
command: " + std::string(e.what())});
return;
}
if (result != ResultOk) {
LOG_ERROR(cnxString() << "Failed to establish connection: " << result);
- close(result);
+ close(Error{result, ""});
return;
}
// Send CONNECT command to broker
@@ -541,7 +541,7 @@ void ClientConnection::handleSentPulsarConnect(const
ASIO_ERROR& err, const Shar
}
if (err) {
LOG_ERROR(cnxString() << "Failed to establish connection: " <<
err.message());
- close();
+ close(Error{ResultConnectError, "Failed to establish connection: " +
err.message()});
return;
}
@@ -555,7 +555,7 @@ void ClientConnection::handleSentAuthResponse(const
ASIO_ERROR& err, const Share
}
if (err) {
LOG_WARN(cnxString() << "Failed to send auth response: " <<
err.message());
- close();
+ close(Error{ResultRetryable, "Failed to send auth response: " +
err.message()});
return;
}
}
@@ -576,14 +576,15 @@ void ClientConnection::tcpConnectAsync() {
std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
if (!Url::parse(hostUrl, service_url)) {
LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << "
" << err.message());
- close();
+ close(Error{ResultConnectError, "Invalid Url, unable to parse: " +
err.message()});
return;
}
if (service_url.protocol() != "pulsar" && service_url.protocol() !=
"pulsar+ssl") {
LOG_ERROR(cnxString() << "Invalid Url protocol '" <<
service_url.protocol()
<< "'. Valid values are 'pulsar' and
'pulsar+ssl'");
- close();
+ close(Error{ResultConnectError, "Invalid Url protocol '" +
service_url.protocol() +
+ "'. Valid values are 'pulsar' and
'pulsar+ssl'"});
return;
}
@@ -598,7 +599,7 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const
tcp::resolver::result
if (err) {
std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_;
LOG_ERROR(hostUrl << "Resolve error: " << err << " : " <<
err.message());
- close();
+ close(Error{ResultConnectError, hostUrl + " Resolve error: " +
err.message()});
return;
}
@@ -625,7 +626,8 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const
tcp::resolver::result
LOG_ERROR(cnxString() << "Connection to " << results << " was not
established in "
<< connectTimeout_.count() << " ms");
lock.unlock();
- close();
+ close(Error{ResultConnectError, "Connection was not established in
" +
+
std::to_string(connectTimeout_.count()) + " ms"});
} // else: the connection is closed or already established
});
@@ -660,7 +662,7 @@ void ClientConnection::handleRead(const ASIO_ERROR& err,
size_t bytesTransferred
} else {
LOG_ERROR(cnxString() << "Read operation failed: " <<
err.message());
}
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
} else if (bytesTransferred < minReadSize) {
// Read the remaining part, use a slice of buffer to write on the next
// region
@@ -711,7 +713,7 @@ void ClientConnection::processIncomingBuffer() {
proto::BaseCommand incomingCmd;
if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
LOG_ERROR(cnxString() << "Error parsing protocol buffer command");
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
return;
}
@@ -735,7 +737,7 @@ void ClientConnection::processIncomingBuffer() {
<< ", message ledger id " <<
incomingCmd.message().message_id().ledgerid()
<< ", entry id " <<
incomingCmd.message().message_id().entryid()
<< "] Error parsing broker entry metadata");
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
return;
}
incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 +
brokerEntryMetadataSize);
@@ -753,7 +755,7 @@ void ClientConnection::processIncomingBuffer() {
<< ", message ledger id " <<
incomingCmd.message().message_id().ledgerid() //
<< ", entry id " <<
incomingCmd.message().message_id().entryid()
<< "] Error parsing message metadata");
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
return;
}
@@ -886,7 +888,8 @@ void ClientConnection::handleIncomingCommand(BaseCommand&
incomingCmd) {
// Handle Pulsar Connected
if (incomingCmd.type() != BaseCommand::CONNECTED) {
// Wrong cmd
- close();
+ close(Error{ResultDisconnected, cnxString() + "Expected
CONNECTED command but received " +
+
Commands::messageType(incomingCmd.type())});
} else {
handlePulsarConnected(incomingCmd.connected());
}
@@ -987,7 +990,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand&
incomingCmd) {
default:
LOG_WARN(cnxString() << "Received invalid message from
server");
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, cnxString() + "Received
invalid message from server"});
break;
}
}
@@ -1032,11 +1035,11 @@ void ClientConnection::newLookup(const SharedBuffer&
cmd, uint64_t requestId, co
Lock lock(mutex_);
if (isClosed()) {
lock.unlock();
- promise->setFailed(ResultNotConnected);
+ promise->setFailed(Error{ResultNotConnected, ""});
return;
} else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) {
lock.unlock();
- promise->setFailed(ResultTooManyLookupRequestException);
+ promise->setFailed(Error{ResultTooManyLookupRequestException, ""});
return;
}
@@ -1048,13 +1051,14 @@ void ClientConnection::newLookup(const SharedBuffer&
cmd, uint64_t requestId, co
self->numOfPendingLookupRequest_--;
}
});
- request->getFuture().addListener([promise](Result result, const
LookupDataResultPtr& lookupDataResult) {
- if (result == ResultOk) {
- promise->setValue(lookupDataResult);
- } else {
- promise->setFailed(result);
- }
- });
+ request->getFuture().addListener(
+ [promise](const auto& error, const LookupDataResultPtr&
lookupDataResult) {
+ if (error.result == ResultOk) {
+ promise->setValue(lookupDataResult);
+ } else {
+ promise->setFailed(error);
+ }
+ });
numOfPendingLookupRequest_++;
lock.unlock();
@@ -1112,7 +1116,7 @@ void ClientConnection::handleSend(const ASIO_ERROR& err,
const SharedBuffer&) {
}
if (err) {
LOG_WARN(cnxString() << "Could not send message on connection: " <<
err << " " << err.message());
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
} else {
sendPendingCommands();
}
@@ -1124,7 +1128,7 @@ void ClientConnection::handleSendPair(const ASIO_ERROR&
err) {
}
if (err) {
LOG_WARN(cnxString() << "Could not send pair message on connection: "
<< err << " " << err.message());
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
} else {
sendPendingCommands();
}
@@ -1164,16 +1168,16 @@ void ClientConnection::sendPendingCommands() {
}
}
-Future<Result, ResponseData> ClientConnection::sendRequestWithId(const
SharedBuffer& cmd, int requestId,
- const char*
requestType) {
+Future<Error, ResponseData> ClientConnection::sendRequestWithId(const
SharedBuffer& cmd, int requestId,
+ const char*
requestType) {
Lock lock(mutex_);
if (isClosed()) {
lock.unlock();
LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " <<
requestId
<< ") to a closed connection");
- Promise<Result, ResponseData> promise;
- promise.setFailed(ResultNotConnected);
+ Promise<Error, ResponseData> promise;
+ promise.setFailed(Error{ResultNotConnected, ""});
return promise.getFuture();
}
@@ -1206,7 +1210,7 @@ void ClientConnection::handleKeepAliveTimeout(const
ASIO_ERROR& ec) {
if (havePendingPingRequest_) {
LOG_WARN(cnxString() << "Forcing connection to close after keep-alive
timeout");
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
} else {
// Send keep alive probe to peer
LOG_DEBUG(cnxString() << "Sending ping message");
@@ -1234,10 +1238,10 @@ void ClientConnection::handleConsumerStatsTimeout(const
ASIO_ERROR& ec,
startConsumerStatsTimer(consumerStatsRequests);
}
-const std::future<void>& ClientConnection::close(Result result, bool
switchCluster) {
+const std::future<void>& ClientConnection::close(Error&& error, bool
switchCluster) {
Lock lock(mutex_);
if (closeFuture_) {
- connectPromise_.setFailed(result);
+ connectPromise_.setFailed(std::move(error));
return *closeFuture_;
}
auto promise = std::make_shared<std::promise<void>>();
@@ -1279,6 +1283,7 @@ const std::future<void>& ClientConnection::close(Result
result, bool switchClust
cancelTimer(*connectTimer_);
lock.unlock();
int refCount = weak_from_this().use_count();
+ auto result = error.result;
if (result != ResultAlreadyClosed /* closed by the pool */ &&
!isResultRetryable(result)) {
LOG_ERROR(cnxString() << "Connection closed with " << result << "
(refCnt: " << refCount << ")");
} else {
@@ -1327,37 +1332,33 @@ const std::future<void>& ClientConnection::close(Result
result, bool switchClust
}
self.reset();
- connectPromise_.setFailed(result);
+ connectPromise_.setFailed(error);
// Fail all pending requests after releasing the lock.
for (auto& kv : pendingRequests) {
- kv.second->fail(result);
+ kv.second->fail(error);
}
for (auto& kv : pendingLookupRequests) {
- kv.second->fail(result);
+ kv.second->fail(error);
}
for (auto& kv : pendingConsumerStatsMap) {
LOG_ERROR(cnxString() << " Closing Client Connection, please try again
later");
kv.second.setFailed(result);
}
for (auto& kv : pendingGetLastMessageIdRequests) {
- kv.second->fail(result);
+ kv.second->fail(error);
}
for (auto& kv : pendingGetNamespaceTopicsRequests) {
- kv.second->fail(result);
+ kv.second->fail(error);
}
for (auto& kv : pendingGetSchemaRequests) {
- kv.second->fail(result);
+ kv.second->fail(error);
}
return *closeFuture_;
}
bool ClientConnection::isClosed() const { return state_ == Disconnected; }
-Future<Result, ClientConnectionWeakPtr> ClientConnection::getConnectFuture() {
- return connectPromise_.getFuture();
-}
-
void ClientConnection::registerProducer(int producerId, const ProducerImplPtr&
producer) {
Lock lock(mutex_);
producers_.insert(std::make_pair(producerId, producer));
@@ -1405,12 +1406,21 @@ Future<Result, GetLastMessageIdResponse>
ClientConnection::newGetLastMessageId(u
});
lock.unlock();
+ // TODO: return Error instead
+ Promise<Result, GetLastMessageIdResponse> promise;
+ request->getFuture().addListener([promise](const auto& error, const auto&
response) {
+ if (error.result == ResultOk) {
+ promise.setValue(response);
+ } else {
+ promise.setFailed(error.result);
+ }
+ });
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ !=
nullptr &&
mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) {
- return request->getFuture();
+ return promise.getFuture();
}
sendCommand(Commands::newGetLastMessageId(consumerId, requestId));
- return request->getFuture();
+ return promise.getFuture();
}
Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
@@ -1429,23 +1439,34 @@ Future<Result, NamespaceTopicsPtr>
ClientConnection::newGetTopicsOfNamespace(
LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to
broker, req_id: " << requestId);
});
lock.unlock();
+
+ // TODO: return Error instead
+ Promise<Result, NamespaceTopicsPtr> promise;
+ request->getFuture().addListener([promise](const auto& error, const auto&
response) {
+ if (error.result == ResultOk) {
+ promise.setValue(response);
+ } else {
+ promise.setFailed(error.result);
+ }
+ });
+
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ !=
nullptr &&
mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) {
- return request->getFuture();
+ return promise.getFuture();
}
sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
- return request->getFuture();
+ return promise.getFuture();
}
-Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string&
topicName,
- const std::string&
version, uint64_t requestId) {
+Future<Error, SchemaInfo> ClientConnection::newGetSchema(const std::string&
topicName,
+ const std::string&
version, uint64_t requestId) {
Lock lock(mutex_);
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString() << "Client is not connected to the broker");
- Promise<Result, SchemaInfo> promise;
- promise.setFailed(ResultNotConnected);
+ Promise<Error, SchemaInfo> promise;
+ promise.setFailed(Error{ResultNotConnected, ""});
return promise.getFuture();
}
@@ -1453,14 +1474,15 @@ Future<Result, SchemaInfo>
ClientConnection::newGetSchema(const std::string& top
insertRequest(pendingGetSchemaRequests_, requestId, [cnxString =
cnxString(), requestId]() {
LOG_WARN(cnxString << "GetSchema request timeout to broker,
req_id: " << requestId);
});
+ auto future = request->getFuture();
lock.unlock();
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ !=
nullptr &&
mockServer_->sendRequest("GET_SCHEMA", requestId)) {
- return request->getFuture();
+ return future;
}
sendCommand(Commands::newGetSchema(topicName, version, requestId));
- return request->getFuture();
+ return future;
}
void ClientConnection::checkServerError(ServerError error, const std::string&
message) {
@@ -1486,7 +1508,7 @@ void ClientConnection::handleSendReceipt(const
proto::CommandSendReceipt& sendRe
if (!producer->ackReceived(sequenceId, messageId)) {
// If the producer fails to process the ack, we need to close
the connection
// to give it a chance to recover from there
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
}
}
} else {
@@ -1510,12 +1532,12 @@ void ClientConnection::handleSendError(const
proto::CommandSendError& error) {
if (!producer->removeCorruptMessage(sequenceId)) {
// If the producer fails to remove corrupt msg, we need to
close the
// connection to give it a chance to recover from there
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
}
}
}
} else {
- close(ResultDisconnected);
+ close(Error{ResultDisconnected, ""});
}
}
@@ -1554,13 +1576,16 @@ void
ClientConnection::handlePartitionedMetadataResponse(
<< partitionMetadataResponse.request_id()
<< " error: " <<
partitionMetadataResponse.error()
<< " msg: " <<
partitionMetadataResponse.message());
- checkServerError(partitionMetadataResponse.error(),
partitionMetadataResponse.message());
- request->fail(
- getResult(partitionMetadataResponse.error(),
partitionMetadataResponse.message()));
+ auto msg = partitionMetadataResponse.message();
+ checkServerError(partitionMetadataResponse.error(), msg);
+ auto result = getResult(partitionMetadataResponse.error(),
msg);
+ request->fail(Error{result, std::move(msg)});
} else {
LOG_ERROR(cnxString() << "Failed partition-metadata lookup
req_id: "
<<
partitionMetadataResponse.request_id() << " with empty response: ");
- request->fail(ResultConnectError);
+ request->fail(
+ Error{ResultConnectError, "Empty response from broker for
request " +
+
std::to_string(partitionMetadataResponse.request_id())});
}
} else {
LookupDataResultPtr lookupResultPtr =
std::make_shared<LookupDataResult>();
@@ -1627,12 +1652,16 @@ void ClientConnection::handleLookupTopicRespose(
LOG_ERROR(cnxString() << "Failed lookup req_id: " <<
lookupTopicResponse.request_id()
<< " error: " <<
lookupTopicResponse.error()
<< " msg: " <<
lookupTopicResponse.message());
- checkServerError(lookupTopicResponse.error(),
lookupTopicResponse.message());
- request->fail(getResult(lookupTopicResponse.error(),
lookupTopicResponse.message()));
+ auto msg = lookupTopicResponse.message();
+ checkServerError(lookupTopicResponse.error(), msg);
+ auto result = getResult(lookupTopicResponse.error(),
lookupTopicResponse.message());
+ request->fail(Error{result, std::move(msg)});
} else {
LOG_ERROR(cnxString() << "Failed lookup req_id: " <<
lookupTopicResponse.request_id()
<< " with empty response: ");
- request->fail(ResultConnectError);
+ request->fail(
+ Error{ResultConnectError, "Empty response from broker for
request " +
+
std::to_string(lookupTopicResponse.request_id())});
}
} else {
LOG_DEBUG(cnxString() << "Received lookup response from server.
req_id: "
@@ -1702,6 +1731,7 @@ void ClientConnection::handleError(const
proto::CommandError& error) {
LOG_WARN(cnxString() << "Received error response from server: " << result
<< (error.has_message() ? (" (" + error.message() +
")") : "")
<< " -- req_id: " << error.request_id());
+ Error requestError{result, error.message()};
Lock lock(mutex_);
@@ -1711,7 +1741,7 @@ void ClientConnection::handleError(const
proto::CommandError& error) {
pendingRequests_.erase(it);
lock.unlock();
- request->fail(result);
+ request->fail(requestError);
} else {
auto it = pendingGetLastMessageIdRequests_.find(error.request_id());
if (it != pendingGetLastMessageIdRequests_.end()) {
@@ -1719,7 +1749,7 @@ void ClientConnection::handleError(const
proto::CommandError& error) {
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();
- request->fail(result);
+ request->fail(requestError);
} else {
auto it =
pendingGetNamespaceTopicsRequests_.find(error.request_id());
if (it != pendingGetNamespaceTopicsRequests_.end()) {
@@ -1727,7 +1757,7 @@ void ClientConnection::handleError(const
proto::CommandError& error) {
pendingGetNamespaceTopicsRequests_.erase(it);
lock.unlock();
- request->fail(result);
+ request->fail(requestError);
} else {
lock.unlock();
}
@@ -1857,10 +1887,11 @@ void ClientConnection::handleAuthChallenge() {
LOG_DEBUG(cnxString() << "Received auth challenge from broker");
Result result;
+ // TODO: use Error instead
SharedBuffer buffer = Commands::newAuthResponse(authentication_, result);
if (result != ResultOk) {
LOG_ERROR(cnxString() << "Failed to send auth response: " << result);
- close(result);
+ close(Error{result, "Failed to auth response"});
return;
}
auto self = shared_from_this();
@@ -1955,7 +1986,7 @@ void ClientConnection::handleGetSchemaResponse(const
proto::CommandGetSchemaResp
: "")
<< " -- req_id: " <<
response.request_id());
}
- request->fail(result);
+ request->fail(Error{result, response.error_message()});
return;
}
@@ -1992,7 +2023,7 @@ void ClientConnection::handleAckResponse(const
proto::CommandAckResponse& respon
lock.unlock();
if (response.has_error()) {
- request->fail(getResult(response.error(), ""));
+ request->fail(Error{getResult(response.error(), ""),
response.message()});
} else {
request->complete({});
}
@@ -2003,7 +2034,7 @@ void ClientConnection::unsafeRemovePendingRequest(long
requestId) {
if (it != pendingRequests_.end()) {
auto request = std::move(it->second);
pendingRequests_.erase(it);
- request->fail(ResultDisconnected);
+ request->fail(Error{ResultDisconnected, ""});
}
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index c8cd86f..fd89fae 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -163,11 +163,11 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
* @param result all pending futures will complete with this result
* @param switchCluster whether the close is triggered by cluster switching
*/
- const std::future<void>& close(Result result = ResultConnectError, bool
switchCluster = false);
+ const std::future<void>& close(Error&& error = Error{ResultConnectError,
""}, bool switchCluster = false);
bool isClosed() const;
- Future<Result, ClientConnectionWeakPtr> getConnectFuture();
+ auto getConnectFuture() const { return connectPromise_.getFuture(); }
Future<Result, ClientConnectionWeakPtr> getCloseFuture();
@@ -191,8 +191,8 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
* Send a request with a specific Id over the connection. The future will
be
* triggered when the response for this request is received
*/
- Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd,
int requestId,
- const char* requestType);
+ Future<Error, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int
requestId,
+ const char* requestType);
const std::string& brokerAddress() const;
@@ -212,8 +212,8 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
CommandGetTopicsOfNamespace_Mode mode,
uint64_t
requestId);
- Future<Result, SchemaInfo> newGetSchema(const std::string& topicName,
const std::string& version,
- uint64_t requestId);
+ Future<Error, SchemaInfo> newGetSchema(const std::string& topicName, const
std::string& version,
+ uint64_t requestId);
void attachMockServer(const std::shared_ptr<MockServer>& mockServer) {
mockServer_ = mockServer;
@@ -334,7 +334,8 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
SharedBuffer incomingBuffer_;
- Promise<Result, ClientConnectionWeakPtr> connectPromise_;
+ Promise<Error, ClientConnectionWeakPtr> connectPromise_;
+
const std::chrono::milliseconds connectTimeout_;
const DeadlineTimerPtr connectTimer_;
diff --git a/lib/ClientConnectionAdaptor.h b/lib/ClientConnectionAdaptor.h
index 2c29937..780b35b 100644
--- a/lib/ClientConnectionAdaptor.h
+++ b/lib/ClientConnectionAdaptor.h
@@ -45,13 +45,13 @@ inline void checkServerError(Connection& connection,
ServerError error, const st
message.find("KeeperException") == std::string::npos &&
message.find("is being unloaded") == std::string::npos &&
message.find("the broker do not have test listener") ==
std::string::npos) {
- connection.close(ResultDisconnected);
+ connection.close(Error{ResultDisconnected, message});
}
break;
case proto::ServerError::TooManyRequests:
// TODO: Implement maxNumberOfRejectedRequestPerConnection like
// https://github.com/apache/pulsar/pull/274
- connection.close(ResultDisconnected);
+ connection.close(Error{ResultDisconnected, ""});
break;
default:
break;
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index b84c14c..1e4f022 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -28,6 +28,7 @@
#include <random>
#include <shared_mutex>
#include <sstream>
+#include <variant>
#include "BinaryProtoLookupService.h"
#include "ClientConfigurationImpl.h"
@@ -188,49 +189,52 @@ LookupServicePtr ClientImpl::getLookup(const std::string&
redirectedClusterURI)
}
void ClientImpl::createProducerAsync(const std::string& topic, const
ProducerConfiguration& conf,
- const CreateProducerCallback& callback,
bool autoDownloadSchema) {
+ CreateProducerV2Callback callback, bool
autoDownloadSchema) {
if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
throw std::invalid_argument("Batching and chunking of messages can't
be enabled together");
}
+
TopicNamePtr topicName;
{
std::shared_lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
- callback(ResultAlreadyClosed, Producer());
+ callback(Error{ResultAlreadyClosed, ""});
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
- callback(ResultInvalidTopicName, Producer());
+ // TODO: return an error in TopicName
+ callback(Error{ResultInvalidTopicName, ""});
return;
}
}
if (autoDownloadSchema) {
- auto self = shared_from_this();
- getSchema(topicName).addListener(
- [self, topicName, callback](Result res, const SchemaInfo&
topicSchema) {
- if (res != ResultOk) {
- callback(res, Producer());
- return;
- }
- ProducerConfiguration conf;
- conf.setSchema(topicSchema);
- self->getPartitionMetadataAsync(topicName).addListener(
- std::bind(&ClientImpl::handleCreateProducer, self,
std::placeholders::_1,
- std::placeholders::_2, topicName, conf,
callback));
- });
+ getSchema(topicName).addListener([self{shared_from_this()}, topicName,
callback{std::move(callback)}](
+ const Error& error, const
SchemaInfo& topicSchema) mutable {
+ if (error.result != ResultOk) {
+ callback(error);
+ return;
+ }
+ ProducerConfiguration conf;
+ conf.setSchema(topicSchema);
+ self->getPartitionMetadataAsync(topicName).addListener(
+ std::bind(&ClientImpl::handleCreateProducer, self,
std::placeholders::_1,
+ std::placeholders::_2, topicName, conf, callback));
+ });
} else {
getPartitionMetadataAsync(topicName).addListener(
- std::bind(&ClientImpl::handleCreateProducer, shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2, topicName, conf, callback));
+ [this, conf, topicName, callback{std::move(callback)}](
+ const Error& error, const LookupDataResultPtr&
partitionMetadata) {
+ handleCreateProducer(error, partitionMetadata, topicName,
conf, callback);
+ });
}
}
-void ClientImpl::handleCreateProducer(Result result, const
LookupDataResultPtr& partitionMetadata,
+void ClientImpl::handleCreateProducer(const Error& error, const
LookupDataResultPtr& partitionMetadata,
const TopicNamePtr& topicName, const
ProducerConfiguration& conf,
- const CreateProducerCallback& callback) {
- if (!result) {
+ CreateProducerV2Callback callback) {
+ if (!error.result) {
ProducerImplBasePtr producer;
auto interceptors =
std::make_shared<ProducerInterceptors>(conf.getInterceptors());
@@ -244,36 +248,39 @@ void ClientImpl::handleCreateProducer(Result result,
const LookupDataResultPtr&
}
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create producer: " << e.what());
- callback(ResultConnectError, {});
+ callback(Error{ResultConnectError, e.what()});
return;
}
producer->getProducerCreatedFuture().addListener(
- std::bind(&ClientImpl::handleProducerCreated, shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2, callback, producer));
+ [this, self{shared_from_this()}, callback{std::move(callback)},
producer](
+ const Error& error, const ProducerImplBaseWeakPtr&
producerBaseWeakPtr) {
+ handleProducerCreated(error, producerBaseWeakPtr, callback,
producer);
+ });
producer->start();
} else {
LOG_ERROR("Error Checking/Getting Partition Metadata while creating
producer on "
- << topicName->toString() << " -- " << result);
- callback(result, Producer());
+ << topicName->toString() << " -- " << error.result);
+ callback(error);
}
}
-void ClientImpl::handleProducerCreated(Result result, const
ProducerImplBaseWeakPtr& producerBaseWeakPtr,
- const CreateProducerCallback& callback,
+void ClientImpl::handleProducerCreated(const Error& error, const
ProducerImplBaseWeakPtr& producerBaseWeakPtr,
+ const CreateProducerV2Callback&
callback,
const ProducerImplBasePtr& producer) {
- if (result == ResultOk) {
+ if (error.result == ResultOk) {
auto address = producer.get();
auto existingProducer = producers_.putIfAbsent(address, producer);
if (existingProducer) {
auto producer = existingProducer.value().lock();
- LOG_ERROR("Unexpected existing producer at the same address: "
- << address << ", producer: " << (producer ?
producer->getProducerName() : "(null)"));
- callback(ResultUnknownError, {});
+ const auto name = producer ? producer->getProducerName() :
"(null)";
+ LOG_ERROR("Unexpected existing producer at the same address: " <<
address
+ <<
", producer: " << name);
+ callback(Error{ResultUnknownError, "Unexpected existing producer
for name " + name});
return;
}
- callback(result, Producer(producer));
+ callback(Producer(producer));
} else {
- callback(result, {});
+ callback(error);
}
}
@@ -293,10 +300,11 @@ void ClientImpl::createReaderAsync(const std::string&
topic, const MessageId& st
}
}
- MessageId msgId(startMessageId);
- getPartitionMetadataAsync(topicName).addListener(
- std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2, topicName, msgId, conf, callback));
+ getPartitionMetadataAsync(topicName).addListener([this,
self{shared_from_this()}, topicName,
+ startMessageId, conf,
+ callback](const auto&
error, const auto& metadata) {
+ handleReaderMetadataLookup(error.result, metadata, topicName,
startMessageId, conf, callback);
+ });
}
void ClientImpl::createTableViewAsync(const std::string& topic, const
TableViewConfiguration& conf,
@@ -507,9 +515,11 @@ void ClientImpl::subscribeAsync(const std::string& topic,
const std::string& sub
}
}
- getPartitionMetadataAsync(topicName).addListener(
- std::bind(&ClientImpl::handleSubscribe, shared_from_this(),
std::placeholders::_1,
- std::placeholders::_2, topicName, subscriptionName, conf,
callback));
+ getPartitionMetadataAsync(topicName).addListener([this,
self{shared_from_this()}, topicName,
+ subscriptionName, conf,
+ callback](const auto&
error, const auto& metadata) {
+ handleSubscribe(error.result, metadata, topicName, subscriptionName,
conf, callback);
+ });
}
void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr&
partitionMetadata,
@@ -604,8 +614,8 @@ GetConnectionFuture ClientImpl::getConnection(const
std::string& redirectedClust
useProxy_ = data.proxyThroughServiceUrl;
lookupCount_++;
pool_.getConnectionAsync(data.logicalAddress,
data.physicalAddress, key)
- .addListener([promise](Result result, const
ClientConnectionWeakPtr& weakCnx) {
- if (result == ResultOk) {
+ .addListener([promise](const auto& error, const
ClientConnectionWeakPtr& weakCnx) {
+ if (error.result == ResultOk) {
auto cnx = weakCnx.lock();
if (cnx) {
promise.setValue(cnx);
@@ -613,7 +623,7 @@ GetConnectionFuture ClientImpl::getConnection(const
std::string& redirectedClust
promise.setFailed(ResultConnectError);
}
} else {
- promise.setFailed(result);
+ promise.setFailed(error.result);
}
});
});
@@ -635,8 +645,8 @@ GetConnectionFuture ClientImpl::connect(const std::string&
redirectedClusterURI,
const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI,
logicalAddress);
Promise<Result, ClientConnectionPtr> promise;
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
- .addListener([promise](Result result, const ClientConnectionWeakPtr&
weakCnx) {
- if (result == ResultOk) {
+ .addListener([promise](const auto& error, const
ClientConnectionWeakPtr& weakCnx) {
+ if (error.result == ResultOk) {
auto cnx = weakCnx.lock();
if (cnx) {
promise.setValue(cnx);
@@ -644,7 +654,7 @@ GetConnectionFuture ClientImpl::connect(const std::string&
redirectedClusterURI,
promise.setFailed(ResultConnectError);
}
} else {
- promise.setFailed(result);
+ promise.setFailed(error.result);
}
});
return promise.getFuture();
@@ -685,9 +695,10 @@ void ClientImpl::getPartitionsForTopicAsync(const
std::string& topic, const GetP
return;
}
}
-
getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions,
-
shared_from_this(), std::placeholders::_1,
-
std::placeholders::_2, topicName, callback));
+ getPartitionMetadataAsync(topicName).addListener(
+ [this, self{shared_from_this()}, topicName, callback](const auto&
error, const auto& metadata) {
+ handleGetPartitions(error.result, metadata, topicName, callback);
+ });
}
void ClientImpl::closeAsync(const CloseCallback& callback) {
@@ -768,8 +779,8 @@ void ClientImpl::handleClose(Result result, const
SharedInt& numberOfOpenHandler
}
LOG_DEBUG("Shutting down producers and consumers for client");
- // handleClose() is called in ExecutorService's event loop, while
shutdown() tried to wait the event
- // loop exits. So here we use another thread to call shutdown().
+ // handleClose() is called in ExecutorService's event loop, while
shutdown() tried to wait the
+ // event loop exits. So here we use another thread to call shutdown().
auto self = shared_from_this();
std::thread shutdownTask{[this, self, callback] {
shutdown();
@@ -817,9 +828,9 @@ void ClientImpl::shutdown() {
}
LOG_DEBUG("ConnectionPool is closed");
- // 500ms as the timeout is long enough because ExecutorService::close
calls io_service::stop() internally
- // and waits until io_service::run() in another thread returns, which
should be as soon as possible after
- // stop() is called.
+ // 500ms as the timeout is long enough because ExecutorService::close
calls io_service::stop()
+ // internally and waits until io_service::run() in another thread returns,
which should be as soon as
+ // possible after stop() is called.
TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{500};
timeoutProcessor.tik();
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 7772b15..c046d8e 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -93,7 +93,7 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
* that exists for the topic.
*/
void createProducerAsync(const std::string& topic, const
ProducerConfiguration& conf,
- const CreateProducerCallback& callback, bool
autoDownloadSchema = false);
+ CreateProducerV2Callback callback, bool
autoDownloadSchema = false);
void subscribeAsync(const std::string& topic, const std::string&
subscriptionName,
const ConsumerConfiguration& conf, const
SubscribeCallback& callback);
@@ -161,7 +161,6 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
std::shared_lock lock(mutex_);
return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode);
}
-
auto getSchema(const TopicNamePtr& topicName, const std::string& version =
"") {
std::shared_lock lock(mutex_);
return lookupServicePtr_->getSchema(topicName, version);
@@ -172,9 +171,9 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
friend class PulsarFriend;
private:
- void handleCreateProducer(Result result, const LookupDataResultPtr&
partitionMetadata,
+ void handleCreateProducer(const Error& error, const LookupDataResultPtr&
partitionMetadata,
const TopicNamePtr& topicName, const
ProducerConfiguration& conf,
- const CreateProducerCallback& callback);
+ CreateProducerV2Callback callback);
void handleSubscribe(Result result, const LookupDataResultPtr&
partitionMetadata,
const TopicNamePtr& topicName, const std::string&
consumerName,
@@ -187,8 +186,8 @@ class ClientImpl : public
std::enable_shared_from_this<ClientImpl> {
void handleGetPartitions(Result result, const LookupDataResultPtr&
partitionMetadata,
const TopicNamePtr& topicName, const
GetPartitionsCallback& callback);
- void handleProducerCreated(Result result, const ProducerImplBaseWeakPtr&
producerWeakPtr,
- const CreateProducerCallback& callback, const
ProducerImplBasePtr& producer);
+ void handleProducerCreated(const Error& error, const
ProducerImplBaseWeakPtr& producerWeakPtr,
+ const CreateProducerV2Callback& callback, const
ProducerImplBasePtr& producer);
void handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr&
consumerWeakPtr,
const SubscribeCallback& callback, const
ConsumerImplBasePtr& consumer);
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index 6465ff7..35d815a 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -59,7 +59,7 @@ bool ConnectionPool::close() {
auto& cnx = kv.second;
if (cnx) {
// Close with a fatal error to not let client retry
- auto& future = cnx->close(ResultAlreadyClosed);
+ auto& future = cnx->close(Error{ResultAlreadyClosed, ""});
using namespace std::chrono_literals;
if (auto status = future.wait_for(5s); status !=
std::future_status::ready) {
LOG_WARN("Connection close timed out for " <<
cnx.get()->cnxString());
@@ -86,7 +86,7 @@ bool ConnectionPool::close() {
void ConnectionPool::closeAllConnectionsForNewCluster() {
for (auto&& kv : releaseConnections()) {
- kv.second->close(ResultDisconnected, true);
+ kv.second->close(Error{ResultDisconnected, ""}, true);
}
}
@@ -97,12 +97,12 @@ static const std::string getKey(const std::string&
logicalAddress, const std::st
return ss.str();
}
-Future<Result, ClientConnectionWeakPtr>
ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
-
const std::string& physicalAddress,
-
size_t keySuffix) {
+Future<Error, ClientConnectionWeakPtr>
ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
+
const std::string& physicalAddress,
+
size_t keySuffix) {
if (closed_) {
- Promise<Result, ClientConnectionWeakPtr> promise;
- promise.setFailed(ResultAlreadyClosed);
+ Promise<Error, ClientConnectionWeakPtr> promise;
+ promise.setFailed(Error{ResultAlreadyClosed, ""});
return promise.getFuture();
}
@@ -133,21 +133,21 @@ Future<Result, ClientConnectionWeakPtr>
ConnectionPool::getConnectionAsync(const
cnx.reset(new ClientConnection(logicalAddress, physicalAddress,
*serviceInfo_.load(),
executorProvider_->get(keySuffix),
clientConfiguration_,
clientVersion_, *this, keySuffix));
- } catch (Result result) {
- Promise<Result, ClientConnectionWeakPtr> promise;
- promise.setFailed(result);
+ } catch (Error error) {
+ Promise<Error, ClientConnectionWeakPtr> promise;
+ promise.setFailed(std::move(error));
return promise.getFuture();
} catch (const std::runtime_error& e) {
lock.unlock();
LOG_ERROR("Failed to create connection: " << e.what())
- Promise<Result, ClientConnectionWeakPtr> promise;
- promise.setFailed(ResultConnectError);
+ Promise<Error, ClientConnectionWeakPtr> promise;
+ promise.setFailed(Error{ResultConnectError, e.what()});
return promise.getFuture();
}
LOG_INFO("Created connection for " << key);
- Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
+ auto future = cnx->getConnectFuture();
pool_.insert(std::make_pair(key, cnx));
lock.unlock();
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index f828ac6..201c1d0 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -25,6 +25,7 @@
#include <pulsar/defines.h>
#include <atomic>
+#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
@@ -82,16 +83,16 @@ class PULSAR_PUBLIC ConnectionPool {
* @param keySuffix the key suffix to choose which connection on the same
broker
* @return a future that will produce the ClientCnx object
*/
- Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const
std::string& logicalAddress,
- const
std::string& physicalAddress,
- size_t
keySuffix);
+ Future<Error, ClientConnectionWeakPtr> getConnectionAsync(const
std::string& logicalAddress,
+ const
std::string& physicalAddress,
+ size_t
keySuffix);
- Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const
std::string& logicalAddress,
- const
std::string& physicalAddress) {
+ Future<Error, ClientConnectionWeakPtr> getConnectionAsync(const
std::string& logicalAddress,
+ const
std::string& physicalAddress) {
return getConnectionAsync(logicalAddress, physicalAddress,
generateRandomIndex());
}
- Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const
std::string& address) {
+ Future<Error, ClientConnectionWeakPtr> getConnectionAsync(const
std::string& address) {
return getConnectionAsync(address, address);
}
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 85f4994..1060cd5 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -225,13 +225,12 @@ void ConsumerImpl::onNegativeAcksSend(const
std::set<MessageId>& messageIds) {
interceptors_->onNegativeAcksSend(Consumer(shared_from_this()),
messageIds);
}
-Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr&
cnx) {
- // Do not use bool, only Result.
- Promise<Result, bool> promise;
+Future<Error, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr&
cnx) {
+ Promise<Error, bool> promise;
if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Consumer is already
closed");
- promise.setFailed(ResultAlreadyClosed);
+ promise.setFailed(Error{ResultAlreadyClosed, ""});
return promise.getFuture();
}
@@ -262,10 +261,10 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const
ClientConnectionPtr& c
auto self = get_shared_this_ptr();
setFirstRequestIdAfterConnect(requestId);
cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE")
- .addListener([this, self, cnx, promise](Result result, const
ResponseData& responseData) {
- Result handleResult = handleCreateConsumer(cnx, result);
+ .addListener([this, self, cnx, promise](const Error& error, const
ResponseData& responseData) {
+ Result handleResult = handleCreateConsumer(cnx, error.result);
if (handleResult != ResultOk) {
- promise.setFailed(handleResult);
+ promise.setFailed(Error{handleResult, ""});
return;
}
promise.setSuccess();
@@ -314,11 +313,11 @@ Result ConsumerImpl::handleCreateConsumer(const
ClientConnectionPtr& cnx, Result
auto name = getName();
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId),
requestId,
"CLOSE_CONSUMER")
- .addListener([name](Result result, const
ResponseData&) {
- if (result == ResultOk) {
+ .addListener([name](const Error& error, const
ResponseData&) {
+ if (error.result == ResultOk) {
LOG_INFO(name << "Closed consumer successfully
after subscribe completed");
} else {
- LOG_WARN(name << "Failed to close consumer: "
<< strResult(result));
+ LOG_WARN(name << "Failed to close consumer: "
<< strResult(error.result));
}
});
} else {
@@ -328,7 +327,7 @@ Result ConsumerImpl::handleCreateConsumer(const
ClientConnectionPtr& cnx, Result
LOG_WARN(getName()
<< "Client already closed when subscribe
completed, close the connection "
<< cnx->cnxString());
- cnx->close(ResultNotConnected);
+ cnx->close(Error{ResultNotConnected, ""});
}
return ResultAlreadyClosed;
}
@@ -423,7 +422,8 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback&
originalCallback) {
SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
auto self = get_shared_this_ptr();
cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE")
- .addListener([self, callback](Result result, const ResponseData&)
{ callback(result); });
+ .addListener(
+ [self, callback](const Error& error, const ResponseData&) {
callback(error.result); });
} else {
Result result = ResultNotConnected;
lock.unlock();
@@ -1420,7 +1420,7 @@ void ConsumerImpl::closeAsync(const ResultCallback&
originalCallback) {
auto requestId = newRequestId();
auto self = get_shared_this_ptr();
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId),
requestId, "CLOSE_CONSUMER")
- .addListener([self, callback](Result result, const ResponseData&) {
callback(result); });
+ .addListener([self, callback](const Error& error, const ResponseData&)
{ callback(error.result); });
}
const std::string& ConsumerImpl::getName() const { return consumerStr_; }
@@ -1828,12 +1828,13 @@ void ConsumerImpl::seekAsyncInternal(long requestId,
const SharedBuffer& seek, c
auto weakSelf = weak_from_this();
cnx->sendRequestWithId(seek, requestId, "SEEK")
- .addListener([this, weakSelf, previousLastSeekArg](Result result,
const ResponseData& responseData) {
+ .addListener([this, weakSelf, previousLastSeekArg](const Error& error,
+ const ResponseData&
responseData) {
auto self = weakSelf.lock();
if (!self) {
return;
}
- if (result == ResultOk) {
+ if (error.result == ResultOk) {
LockGuard lock(mutex_);
if (getCnx().expired() || reconnectionPending_) {
// Reconnection path: delay the seek callback until
connectionOpened. clearReceiveQueue()
@@ -1859,12 +1860,12 @@ void ConsumerImpl::seekAsyncInternal(long requestId,
const SharedBuffer& seek, c
seekStatus_ = SeekStatus::NOT_STARTED;
} // else: complete the seek future after connection is
established
} else {
- LOG_ERROR(getName() << "Failed to seek: " << result);
+ LOG_ERROR(getName() << "Failed to seek: " << error.result);
LockGuard lock{mutex_};
seekStatus_ = SeekStatus::NOT_STARTED;
lastSeekArg_ = previousLastSeekArg;
executor_->postWork([self,
callback{std::exchange(seekCallback_, std::nullopt).value()},
- result]() { callback(result); });
+ result{error.result}]() {
callback(result); });
}
});
}
@@ -1912,13 +1913,13 @@ void ConsumerImpl::processPossibleToDLQ(const
MessageId& messageId, const Proces
if (client) {
auto self = get_shared_this_ptr();
client->createProducerAsync(
- deadLetterPolicy_.getDeadLetterTopic(),
producerConfiguration,
- [self](Result res, const Producer& producer) {
- if (res == ResultOk) {
- self->deadLetterProducer_->setValue(producer);
+ deadLetterPolicy_.getDeadLetterTopic(),
producerConfiguration, [self](const auto& v) {
+ if (const auto* producer = std::get_if<Producer>(&v)) {
+ self->deadLetterProducer_->setValue(*producer);
} else {
LOG_ERROR("Dead letter producer create exception
with topic: "
- <<
self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res);
+ <<
self->deadLetterPolicy_.getDeadLetterTopic()
+ << " ex: " << std::get<Error>(v).result);
self->deadLetterProducer_.reset();
}
});
@@ -2003,9 +2004,9 @@ void ConsumerImpl::doImmediateAck(const
ClientConnectionPtr& cnx, const MessageI
cnx->sendRequestWithId(
Commands::newAck(consumerId_, msgId.ledgerId(),
msgId.entryId(), ackSet, ackType, requestId),
requestId, "ACK")
- .addListener([callback](Result result, const ResponseData&) {
+ .addListener([callback](const Error& error, const ResponseData&) {
if (callback) {
- callback(result);
+ callback(error.result);
}
});
} else {
@@ -2034,9 +2035,9 @@ void ConsumerImpl::doImmediateAck(const
ClientConnectionPtr& cnx, const std::set
auto requestId = newRequestId();
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_,
ackMsgIds, requestId), requestId,
"ACK")
- .addListener([callback](Result result, const ResponseData&) {
+ .addListener([callback](const Error& error, const
ResponseData&) {
if (callback) {
- callback(result);
+ callback(error.result);
}
});
} else {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 6f287aa..e263762 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -166,7 +166,7 @@ class ConsumerImpl : public ConsumerImplBase {
protected:
// overrided methods from HandlerBase
- Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx)
override;
+ Future<Error, bool> connectionOpened(const ClientConnectionPtr& cnx)
override;
void connectionFailed(Result result) override;
// impl methods from ConsumerImpl base
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index ffc0e3c..7e2cbb2 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -86,9 +86,8 @@ class ConsumerImplBase : public HandlerBase {
protected:
// overrided methods from HandlerBase
- Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx)
override {
- // Do not use bool, only Result.
- Promise<Result, bool> promise;
+ Future<Error, bool> connectionOpened(const ClientConnectionPtr& cnx)
override {
+ Promise<Error, bool> promise;
promise.setSuccess();
return promise.getFuture();
}
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 0be9713..08352eb 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -22,6 +22,8 @@
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
+#include <sstream>
+#include <stdexcept>
#include "CurlWrapper.h"
#include "ExecutorService.h"
@@ -79,7 +81,7 @@ auto HTTPLookupService::getBroker(const TopicName &topicName)
-> LookupResultFut
auto self = shared_from_this();
executorProvider_->get()->postWork([this, self, promise, completeUrl] {
std::string responseData;
- Result result = sendHTTPRequest(completeUrl, responseData);
+ Result result = sendHTTPRequest(completeUrl, responseData).result;
if (result != ResultOk) {
promise.setFailed(result);
@@ -93,7 +95,7 @@ auto HTTPLookupService::getBroker(const TopicName &topicName)
-> LookupResultFut
return promise.getFuture();
}
-Future<Result, LookupDataResultPtr>
HTTPLookupService::getPartitionMetadataAsync(
+Future<Error, LookupDataResultPtr>
HTTPLookupService::getPartitionMetadataAsync(
const TopicNamePtr &topicName) {
LookupPromise promise;
std::stringstream completeUrlStream;
@@ -148,9 +150,9 @@ Future<Result, NamespaceTopicsPtr>
HTTPLookupService::getTopicsOfNamespaceAsync(
return promise.getFuture();
}
-Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr
&topicName,
- const std::string
&version) {
- Promise<Result, SchemaInfo> promise;
+Future<Error, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr
&topicName,
+ const std::string
&version) {
+ Promise<Error, SchemaInfo> promise;
std::stringstream completeUrlStream;
const auto &url = serviceNameResolver_.resolveHost();
@@ -166,7 +168,6 @@ Future<Result, SchemaInfo>
HTTPLookupService::getSchema(const TopicNamePtr &topi
if (!version.empty()) {
completeUrlStream << "/" << fromBigEndianBytes(version);
}
-
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest,
shared_from_this(), promise,
completeUrlStream.str()));
return promise.getFuture();
@@ -175,7 +176,7 @@ Future<Result, SchemaInfo>
HTTPLookupService::getSchema(const TopicNamePtr &topi
void HTTPLookupService::handleNamespaceTopicsHTTPRequest(const
NamespaceTopicsPromise &promise,
const std::string
&completeUrl) {
std::string responseData;
- Result result = sendHTTPRequest(completeUrl, responseData);
+ Result result = sendHTTPRequest(completeUrl, responseData).result;
if (result != ResultOk) {
promise.setFailed(result);
@@ -184,25 +185,27 @@ void
HTTPLookupService::handleNamespaceTopicsHTTPRequest(const NamespaceTopicsPr
}
}
-Result HTTPLookupService::sendHTTPRequest(const std::string &completeUrl,
std::string &responseData) {
+Error HTTPLookupService::sendHTTPRequest(const std::string &completeUrl,
std::string &responseData) {
long responseCode = -1;
return sendHTTPRequest(completeUrl, responseData, responseCode);
}
-Result HTTPLookupService::sendHTTPRequest(const std::string &completeUrl,
std::string &responseData,
- long &responseCode) {
- // Authorization data
+Error HTTPLookupService::sendHTTPRequest(const std::string &completeUrl,
std::string &responseData,
+ long &responseCode) {
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
- LOG_ERROR("Failed to getAuthData: " << authResult);
- return authResult;
+ std::ostringstream message;
+ message << "Failed to getAuthData: " << authResult;
+ LOG_ERROR(message.str());
+ return Error{authResult, message.str()};
}
CurlWrapper curl;
if (!curl.init()) {
- LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
- return ResultLookupError;
+ const std::string message = "Unable to curl_easy_init for url " +
completeUrl;
+ LOG_ERROR(message);
+ return Error{ResultLookupError, message};
}
std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
@@ -226,41 +229,39 @@ Result HTTPLookupService::sendHTTPRequest(const
std::string &completeUrl, std::s
options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR;
options.maxLookupRedirects = maxLookupRedirects_;
auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(),
options, tlsContext.get());
- const auto &error = result.error;
- if (!error.empty()) {
- LOG_ERROR(completeUrl << " failed: " << error);
- return ResultConnectError;
- }
responseData = result.responseData;
responseCode = result.responseCode;
- auto res = result.code;
+
+ const auto res = result.code;
if (res == CURLE_OK) {
LOG_INFO("Response received for url " << completeUrl << " responseCode
" << responseCode);
- } else if (res == CURLE_TOO_MANY_REDIRECTS) {
- LOG_ERROR("Response received for url " << completeUrl << ": " <<
curl_easy_strerror(res)
- << ", curl error: " <<
result.serverError
- << ", redirect URL: " <<
result.redirectUrl);
+ return Error{};
+ }
+
+ std::ostringstream message;
+ if (res == CURLE_TOO_MANY_REDIRECTS) {
+ message << "Response received for url " << completeUrl << ": " <<
curl_easy_strerror(res)
+ << ", curl error: " << result.serverError << ", redirect URL:
" << result.redirectUrl;
} else {
- LOG_ERROR("Response failed for url " << completeUrl << ": " <<
curl_easy_strerror(res)
- << ", curl error: " <<
result.serverError);
+ message << "Response failed for url " << completeUrl << ": " <<
curl_easy_strerror(res)
+ << ", curl error: " << result.serverError;
}
+ LOG_ERROR(message.str());
switch (res) {
- case CURLE_OK:
- return ResultOk;
case CURLE_COULDNT_CONNECT:
- return ResultRetryable;
+ return Error{ResultRetryable, message.str()};
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
- return ResultConnectError;
+ return Error{ResultConnectError, message.str()};
case CURLE_READ_ERROR:
- return ResultReadError;
+ return Error{ResultReadError, message.str()};
case CURLE_OPERATION_TIMEDOUT:
- return ResultTimeout;
+ return Error{ResultTimeout, message.str()};
default:
- return ResultLookupError;
+ return Error{ResultLookupError, message.str()};
}
}
@@ -353,10 +354,10 @@ NamespaceTopicsPtr
HTTPLookupService::parseNamespaceTopicsData(const std::string
void HTTPLookupService::handleLookupHTTPRequest(const LookupPromise &promise,
const std::string &completeUrl,
RequestType requestType) {
std::string responseData;
- Result result = sendHTTPRequest(completeUrl, responseData);
+ Error error = sendHTTPRequest(completeUrl, responseData);
- if (result != ResultOk) {
- promise.setFailed(result);
+ if (error.result != ResultOk) {
+ promise.setFailed(std::move(error));
} else {
promise.setValue((requestType == PartitionMetaData) ?
parsePartitionData(responseData)
:
parseLookupData(responseData));
@@ -367,12 +368,12 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const
GetSchemaPromise &promi
const std::string
&completeUrl) {
std::string responseData;
long responseCode = -1;
- Result result = sendHTTPRequest(completeUrl, responseData, responseCode);
+ Error error = sendHTTPRequest(completeUrl, responseData, responseCode);
if (responseCode == 404) {
- promise.setFailed(ResultTopicNotFound);
- } else if (result != ResultOk) {
- promise.setFailed(result);
+ promise.setFailed(Error{ResultTopicNotFound, ""});
+ } else if (error.result != ResultOk) {
+ promise.setFailed(std::move(error));
} else {
ptree::ptree root;
std::stringstream stream(responseData);
@@ -381,20 +382,24 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const
GetSchemaPromise &promi
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json of Partition Metadata: " <<
e.what()
<<
"\nInput Json = " << responseData);
- promise.setFailed(ResultInvalidMessage);
+ promise.setFailed(Error{ResultInvalidMessage,
+ "Failed to parse json of Partition
Metadata: " + std::string(e.what()) +
+ "\nInput Json = " + responseData});
return;
}
const std::string defaultNotFoundString = "Not found";
auto schemaTypeStr = root.get<std::string>("type",
defaultNotFoundString);
if (schemaTypeStr == defaultNotFoundString) {
LOG_ERROR("malformed json! - type not present" << responseData);
- promise.setFailed(ResultInvalidMessage);
+ promise.setFailed(
+ Error{ResultInvalidMessage, "malformed json! - type not
present" + responseData});
return;
}
auto schemaData = root.get<std::string>("data", defaultNotFoundString);
if (schemaData == defaultNotFoundString) {
LOG_ERROR("malformed json! - data not present" << responseData);
- promise.setFailed(ResultInvalidMessage);
+ promise.setFailed(
+ Error{ResultInvalidMessage, "malformed json! - data not
present" + responseData});
return;
}
@@ -407,7 +412,9 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const
GetSchemaPromise &promi
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json of Partition Metadata: " <<
e.what()
<<
"\nInput Json = " << schemaData);
- promise.setFailed(ResultInvalidMessage);
+ promise.setFailed(Error{ResultInvalidMessage, "Failed to parse
json of Partition Metadata: " +
+
std::string(e.what()) +
+ "\nInput
Json = " + schemaData});
return;
}
const auto keyData = toJson(kvRoot.get_child("key"));
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index 61a0615..9a217fc 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -32,7 +32,7 @@ namespace pulsar {
class ServiceNameResolver;
using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>;
using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>;
-using GetSchemaPromise = Promise<Result, SchemaInfo>;
+using GetSchemaPromise = Promise<Error, SchemaInfo>;
class HTTPLookupService : public LookupService, public
std::enable_shared_from_this<HTTPLookupService> {
enum RequestType : uint8_t
@@ -41,7 +41,7 @@ class HTTPLookupService : public LookupService, public
std::enable_shared_from_t
PartitionMetaData
};
- typedef Promise<Result, LookupDataResultPtr> LookupPromise;
+ typedef Promise<Error, LookupDataResultPtr> LookupPromise;
ExecutorServiceProviderPtr executorProvider_;
ServiceNameResolver serviceNameResolver_;
@@ -64,18 +64,18 @@ class HTTPLookupService : public LookupService, public
std::enable_shared_from_t
const std::string& completeUrl);
void handleGetSchemaHTTPRequest(const GetSchemaPromise& promise, const
std::string& completeUrl);
- Result sendHTTPRequest(const std::string& completeUrl, std::string&
responseData);
+ Error sendHTTPRequest(const std::string& completeUrl, std::string&
responseData);
- Result sendHTTPRequest(const std::string& completeUrl, std::string&
responseData, long& responseCode);
+ Error sendHTTPRequest(const std::string& completeUrl, std::string&
responseData, long& responseCode);
public:
HTTPLookupService(const ServiceInfo& serviceInfo, const
ClientConfiguration& config);
LookupResultFuture getBroker(const TopicName& topicName) override;
- Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr&) override;
+ Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override;
- Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const
std::string& version) override;
+ Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const
std::string& version) override;
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode)
override;
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index bc8cd53..5bccc0b 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -123,16 +123,15 @@ void HandlerBase::grabCnx(const optional<std::string>&
assignedBrokerUrl) {
auto before = high_resolution_clock::now();
cnxFuture.addListener([this, self, before](Result result, const
ClientConnectionPtr& cnx) {
if (result == ResultOk) {
- connectionOpened(cnx).addListener([this, self, before](Result
result, bool) {
- // Do not use bool, only Result.
+ connectionOpened(cnx).addListener([this, self, before](const
Error& error, bool) {
reconnectionPending_ = false;
- if (result == ResultOk) {
+ if (error.result == ResultOk) {
connectionTimeMs_ =
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
// Prevent the creationTimer_ from cancelling the timer_
in future
cancelTimer(*creationTimer_);
LOG_INFO("Finished connecting to broker after " <<
connectionTimeMs_ << " ms")
- } else if (isResultRetryable(result)) {
+ } else if (isResultRetryable(error.result)) {
scheduleReconnection();
}
});
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 229404e..1612415 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -93,7 +93,7 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
* @return ResultError if there was a failure. ResultRetryable if
reconnection is needed.
* @return Do not use bool, only Result.
*/
- virtual Future<Result, bool> connectionOpened(const ClientConnectionPtr&
connection) = 0;
+ virtual Future<Error, bool> connectionOpened(const ClientConnectionPtr&
connection) = 0;
virtual void connectionFailed(Result result) = 0;
diff --git a/lib/LookupDataResult.h b/lib/LookupDataResult.h
index 81e50cc..8f6e740 100644
--- a/lib/LookupDataResult.h
+++ b/lib/LookupDataResult.h
@@ -29,7 +29,7 @@
namespace pulsar {
class LookupDataResult;
typedef std::shared_ptr<LookupDataResult> LookupDataResultPtr;
-typedef Promise<Result, LookupDataResultPtr> LookupDataResultPromise;
+typedef Promise<Error, LookupDataResultPtr> LookupDataResultPromise;
typedef std::shared_ptr<LookupDataResultPromise> LookupDataResultPromisePtr;
class LookupDataResult {
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 684984f..a18face 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -50,6 +50,7 @@ class LookupService {
<< ", physical address: " <<
lookupResult.physicalAddress;
}
};
+ // TODO: change it to Error
using LookupResultFuture = Future<Result, LookupResult>;
using LookupResultPromise = Promise<Result, LookupResult>;
@@ -67,7 +68,7 @@ class LookupService {
*
* Gets Partition metadata
*/
- virtual Future<Result, LookupDataResultPtr>
getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0;
+ virtual Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) = 0;
/**
* @param namespace - namespace-name
@@ -84,8 +85,8 @@ class LookupService {
* @param version the schema version byte array, if it's empty, use the
latest version
* @return SchemaInfo
*/
- virtual Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName,
- const std::string& version =
"") = 0;
+ virtual Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName,
+ const std::string& version =
"") = 0;
virtual ServiceNameResolver& getServiceNameResolver() = 0;
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index a569978..5e1b517 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -188,11 +188,11 @@ Future<Result, Consumer>
MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
return topicPromise->getFuture();
}
client->getPartitionMetadataAsync(topicName).addListener(
- [this, topicName, topicPromise](Result result, const
LookupDataResultPtr& lookupDataResult) {
- if (result != ResultOk) {
+ [this, topicName, topicPromise](const auto& error, const
LookupDataResultPtr& lookupDataResult) {
+ if (error.result != ResultOk) {
LOG_ERROR("Error Checking/Getting Partition Metadata while
MultiTopics Subscribing- "
- << consumerStr_ << " result: " << result)
- topicPromise->setFailed(result);
+ << consumerStr_ << " result: " << error)
+ topicPromise->setFailed(error.result);
return;
}
subscribeTopicPartitions(lookupDataResult->getPartitions(),
topicName, subscriptionName_,
@@ -1010,11 +1010,12 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
return;
}
client->getPartitionMetadataAsync(topicName).addListener(
- [this, weakSelf, topicName, currentNumPartitions](Result result,
+ [this, weakSelf, topicName, currentNumPartitions](const auto&
error,
const
LookupDataResultPtr& lookupDataResult) {
auto self = weakSelf.lock();
if (self) {
- this->handleGetPartitions(topicName, result,
lookupDataResult, currentNumPartitions);
+ this->handleGetPartitions(topicName, error.result,
lookupDataResult,
+ currentNumPartitions);
}
});
}
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 1aa5c87..586a0ed 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -143,7 +143,7 @@ void PartitionedProducerImpl::start() {
}
void PartitionedProducerImpl::handleSinglePartitionProducerCreated(
- Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, unsigned
int partitionIndex) {
+ const Error& error, const ProducerImplBaseWeakPtr& producerWeakPtr,
unsigned int partitionIndex) {
// to indicate, we are doing cleanup using closeAsync after producer create
// has failed and the invocation of closeAsync is not from client
const auto numPartitions = getNumPartitionsWithLock();
@@ -161,9 +161,10 @@ void
PartitionedProducerImpl::handleSinglePartitionProducerCreated(
return;
}
- if (result != ResultOk) {
- LOG_ERROR("Unable to create Producer for partition - " <<
partitionIndex << " Error - " << result);
- partitionedProducerCreatedPromise_.setFailed(result);
+ if (error.result != ResultOk) {
+ LOG_ERROR("Unable to create Producer for partition - " <<
partitionIndex << " Error - "
+ <<
error.result);
+ partitionedProducerCreatedPromise_.setFailed(error);
state_ = Failed;
if (++numProducersCreated_ == numPartitions) {
closeAsync(nullptr);
@@ -231,11 +232,11 @@ void PartitionedProducerImpl::sendAsync(const Message&
msg, SendCallback callbac
} else {
// Wrapping the callback into a lambda has overhead, so we check if
the producer is ready first
producer->getProducerCreatedFuture().addListener(
- [msg, callback](Result result, const ProducerImplBaseWeakPtr&
weakProducer) {
- if (result == ResultOk) {
+ [msg, callback](const Error& error, const ProducerImplBaseWeakPtr&
weakProducer) {
+ if (error.result == ResultOk) {
weakProducer.lock()->sendAsync(msg, callback);
} else if (callback) {
- callback(result, {});
+ callback(error.result, {});
}
});
}
@@ -251,7 +252,7 @@ void PartitionedProducerImpl::internalShutdown() {
if (client) {
client->cleanupProducer(this);
}
- partitionedProducerCreatedPromise_.setFailed(ResultAlreadyClosed);
+ partitionedProducerCreatedPromise_.setFailed(Error{ResultAlreadyClosed,
""});
state_ = Closed;
}
@@ -353,14 +354,14 @@ void
PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
// is set second time here, first time it was successful. So check
// if there's any adverse effect of setting it again. It should not
// be but must check. MUSTCHECK changeme
- partitionedProducerCreatedPromise_.setFailed(ResultUnknownError);
+ partitionedProducerCreatedPromise_.setFailed(Error{ResultUnknownError,
""});
callback(result);
return;
}
}
// override
-Future<Result, ProducerImplBaseWeakPtr>
PartitionedProducerImpl::getProducerCreatedFuture() {
+Future<Error, ProducerImplBaseWeakPtr>
PartitionedProducerImpl::getProducerCreatedFuture() {
return partitionedProducerCreatedPromise_.getFuture();
}
@@ -436,10 +437,10 @@ void PartitionedProducerImpl::getPartitionMetadata() {
return;
}
client->getPartitionMetadataAsync(topicName_)
- .addListener([weakSelf](Result result, const LookupDataResultPtr&
lookupDataResult) {
+ .addListener([weakSelf](const auto& error, const LookupDataResultPtr&
lookupDataResult) {
auto self = weakSelf.lock();
if (self) {
- self->handleGetPartitions(result, lookupDataResult);
+ self->handleGetPartitions(error.result, lookupDataResult);
}
});
}
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 94ba717..dc06af0 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -78,12 +78,12 @@ class PartitionedProducerImpl : public ProducerImplBase,
void internalShutdown();
bool isClosed() override;
const std::string& getTopic() const override;
- Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture()
override;
+ Future<Error, ProducerImplBaseWeakPtr> getProducerCreatedFuture() override;
void triggerFlush() override;
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedProducer() override;
- void handleSinglePartitionProducerCreated(Result result,
+ void handleSinglePartitionProducerCreated(const Error& error,
const ProducerImplBaseWeakPtr&
producerBaseWeakPtr,
const unsigned int
partitionIndex);
void createLazyPartitionProducer(const unsigned int partitionIndex);
@@ -121,7 +121,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
std::atomic<State> state_{Pending};
// only set this promise to value, when producers on all partitions are
created.
- Promise<Result, ProducerImplBaseWeakPtr>
partitionedProducerCreatedPromise_;
+ Promise<Error, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_;
std::unique_ptr<TopicMetadata> topicMetadata_;
diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h
index 465073f..7014a02 100644
--- a/lib/PendingRequest.h
+++ b/lib/PendingRequest.h
@@ -43,7 +43,7 @@ class PendingRequest : public
std::enable_shared_from_this<PendingRequest<T>> {
return;
}
timeoutCallback_();
- promise_.setFailed(ResultTimeout);
+ promise_.setFailed(Error{ResultTimeout, ""});
});
}
@@ -52,8 +52,8 @@ class PendingRequest : public
std::enable_shared_from_this<PendingRequest<T>> {
cancelTimer(timer_);
}
- void fail(Result result) {
- promise_.setFailed(result);
+ void fail(const Error& error) {
+ promise_.setFailed(error);
cancelTimer(timer_);
}
@@ -65,7 +65,7 @@ class PendingRequest : public
std::enable_shared_from_this<PendingRequest<T>> {
private:
ASIO::steady_timer timer_;
- Promise<Result, T> promise_;
+ Promise<Error, T> promise_;
std::function<void()> timeoutCallback_;
std::atomic_bool timeoutDisabled_{false};
};
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 7632581..873796d 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -135,13 +135,12 @@ void
ProducerImpl::beforeConnectionChange(ClientConnection& connection) {
connection.removeProducer(producerId_);
}
-Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr&
cnx) {
- // Do not use bool, only Result.
- Promise<Result, bool> promise;
+Future<Error, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr&
cnx) {
+ Promise<Error, bool> promise;
if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Producer is already
closed");
- promise.setFailed(ResultAlreadyClosed);
+ promise.setFailed(Error{ResultAlreadyClosed, ""});
return promise.getFuture();
}
@@ -162,12 +161,12 @@ Future<Result, bool> ProducerImpl::connectionOpened(const
ClientConnectionPtr& c
auto self = shared_from_this();
setFirstRequestIdAfterConnect(requestId);
cnx->sendRequestWithId(cmd, requestId, "PRODUCER")
- .addListener([this, self, cnx, promise](Result result, const
ResponseData& responseData) {
- Result handleResult = handleCreateProducer(cnx, result,
responseData);
- if (handleResult == ResultOk) {
+ .addListener([this, self, cnx, promise](const Error& error, const
ResponseData& responseData) {
+ auto handledError = handleCreateProducer(cnx, error, responseData);
+ if (handledError.result == ResultOk) {
promise.setSuccess();
} else {
- promise.setFailed(handleResult);
+ promise.setFailed(handledError);
}
});
@@ -182,22 +181,23 @@ void ProducerImpl::connectionFailed(Result result) {
// if producers are lazy, then they should always try to restart
// so don't change the state and allow reconnections
return;
- } else if (!isResultRetryable(result) &&
producerCreatedPromise_.setFailed(result)) {
+ } else if (!isResultRetryable(result) &&
producerCreatedPromise_.setFailed(Error{result, ""})) {
state_ = Failed;
}
}
-Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx,
Result result,
- const ResponseData& responseData) {
+Error ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, const
Error& error,
+ const ResponseData& responseData) {
Result handleResult = ResultOk;
Lock lock(mutex_);
- LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " <<
strResult(result));
+ LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " <<
error);
// make sure we're still in the Pending/Ready state, closeAsync could have
been invoked
// while waiting for this response if using lazy producers
const auto state = state_.load();
+ const auto result = error.result;
if (state != Ready && state != Pending) {
LOG_DEBUG("Producer created response received but producer already
closed");
failPendingMessages(ResultAlreadyClosed, false);
@@ -211,9 +211,9 @@ Result ProducerImpl::handleCreateProducer(const
ClientConnectionPtr& cnx, Result
}
if (!producerCreatedPromise_.isComplete()) {
lock.unlock();
- producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+ producerCreatedPromise_.setFailed(Error{ResultAlreadyClosed, ""});
}
- return ResultAlreadyClosed;
+ return Error{ResultAlreadyClosed, ""};
}
if (result == ResultOk) {
@@ -281,7 +281,7 @@ Result ProducerImpl::handleCreateProducer(const
ClientConnectionPtr& cnx, Result
client->cleanupProducer(this);
}
lock.unlock();
- producerCreatedPromise_.setFailed(result);
+ producerCreatedPromise_.setFailed(error);
handleResult = result;
} else if (producerCreatedPromise_.isComplete() ||
retryOnCreationError_) {
if (result == ResultProducerBlockedQuotaExceededException) {
@@ -292,24 +292,25 @@ Result ProducerImpl::handleCreateProducer(const
ClientConnectionPtr& cnx, Result
}
// Producer had already been initially created, we need to retry
connecting in any case
- LOG_WARN(getName() << "Failed to reconnect producer: " <<
strResult(result));
+ LOG_WARN(getName() << "Failed to reconnect producer: " << error);
handleResult = ResultRetryable;
} else {
// Producer was not yet created, retry to connect to broker if
it's possible
handleResult = convertToTimeoutIfNecessary(result,
creationTimestamp_);
+ Error convertedError{handleResult, error.message};
if (isResultRetryable(handleResult)) {
- LOG_WARN(getName() << "Temporary error in creating producer: "
<< strResult(handleResult));
+ LOG_WARN(getName() << "Temporary error in creating producer: "
<< convertedError);
} else {
- LOG_ERROR(getName() << "Failed to create producer: " <<
strResult(handleResult));
+ LOG_ERROR(getName() << "Failed to create producer: " << error);
failPendingMessages(handleResult, false);
state_ = Failed;
lock.unlock();
- producerCreatedPromise_.setFailed(handleResult);
+ producerCreatedPromise_.setFailed(convertedError);
}
}
}
- return handleResult;
+ return handleResult == error.result ? error : Error{handleResult,
error.message};
}
auto ProducerImpl::getPendingCallbacksWhenFailed() ->
decltype(pendingMessagesQueue_) {
@@ -825,10 +826,10 @@ void ProducerImpl::closeAsync(CloseCallback
originalCallback) {
int requestId = client->newRequestId();
auto self = shared_from_this();
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId),
requestId, "CLOSE_PRODUCER")
- .addListener([self, callback](Result result, const ResponseData&) {
callback(result); });
+ .addListener([self, callback](const Error& error, const ResponseData&)
{ callback(error.result); });
}
-Future<Result, ProducerImplBaseWeakPtr>
ProducerImpl::getProducerCreatedFuture() {
+Future<Error, ProducerImplBaseWeakPtr>
ProducerImpl::getProducerCreatedFuture() {
return producerCreatedPromise_.getFuture();
}
@@ -1022,7 +1023,7 @@ void ProducerImpl::internalShutdown() {
client->cleanupProducer(this);
}
cancelTimers();
- producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+ producerCreatedPromise_.setFailed(Error{ResultAlreadyClosed, ""});
state_ = Closed;
}
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 26207f8..e750253 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -87,7 +87,7 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
void internalShutdown();
bool isClosed() override;
const std::string& getTopic() const override;
- Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture()
override;
+ Future<Error, ProducerImplBaseWeakPtr> getProducerCreatedFuture() override;
void triggerFlush() override;
void flushAsync(FlushCallback callback) override;
bool isConnected() const override;
@@ -133,15 +133,15 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
// overrided methods from HandlerBase
void beforeConnectionChange(ClientConnection& connection) override;
- Future<Result, bool> connectionOpened(const ClientConnectionPtr&
connection) override;
+ Future<Error, bool> connectionOpened(const ClientConnectionPtr&
connection) override;
void connectionFailed(Result result) override;
const std::string& getName() const override { return producerStr_; }
private:
void printStats();
- Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
- const ResponseData& responseData);
+ Error handleCreateProducer(const ClientConnectionPtr& cnx, const Error&
error,
+ const ResponseData& responseData);
void resendMessages(const ClientConnectionPtr& cnx);
@@ -195,7 +195,7 @@ class ProducerImpl : public HandlerBase, public
ProducerImplBase {
using DurationType = TimeDuration;
void asyncWaitSendTimeout(DurationType expiryTime);
- Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
+ Promise<Error, ProducerImplBaseWeakPtr> producerCreatedPromise_;
struct PendingCallbacks;
decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed();
diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h
index 25a12c8..72607e2 100644
--- a/lib/ProducerImplBase.h
+++ b/lib/ProducerImplBase.h
@@ -44,7 +44,7 @@ class ProducerImplBase {
virtual bool isClosed() = 0;
virtual void shutdown() = 0;
virtual const std::string& getTopic() const = 0;
- virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture()
= 0;
+ virtual Future<Error, ProducerImplBaseWeakPtr> getProducerCreatedFuture()
= 0;
virtual void triggerFlush() = 0;
virtual void flushAsync(FlushCallback callback) = 0;
virtual bool isConnected() const = 0;
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 7f50cf1..3066d48 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -53,11 +53,12 @@ class RetryableLookupService : public LookupService {
}
LookupResultFuture getBroker(const TopicName& topicName) override {
- return lookupCache_->run("get-broker-" + topicName.toString(),
- [this, topicName] { return
lookupService_->getBroker(topicName); });
+ return toResultFuture(lookupCache_->run("get-broker-" +
topicName.toString(), [this, topicName] {
+ return toErrorFuture(lookupService_->getBroker(topicName));
+ }));
}
- Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override {
+ Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override {
return partitionLookupCache_->run(
"get-partition-metadata-" + topicName->toString(),
[this, topicName] { return
lookupService_->getPartitionMetadataAsync(topicName); });
@@ -65,13 +66,15 @@ class RetryableLookupService : public LookupService {
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode)
override {
- return namespaceLookupCache_->run(
+ return toResultFuture(namespaceLookupCache_->run(
"get-topics-of-namespace-" + nsName->toString() + "-" +
std::to_string(mode),
- [this, nsName, mode] { return
lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
+ [this, nsName, mode] {
+ return
toErrorFuture(lookupService_->getTopicsOfNamespaceAsync(nsName, mode));
+ }));
}
- Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const
std::string& version) override {
- return getSchemaCache_->run("get-schema" + topicName->toString(),
[this, topicName, version] {
+ Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const
std::string& version) override {
+ return getSchemaCache_->run(getSchemaCacheKey(topicName, version),
[this, topicName, version] {
return lookupService_->getSchema(topicName, version);
});
}
@@ -81,6 +84,36 @@ class RetryableLookupService : public LookupService {
}
private:
+ template <typename T>
+ static Future<Error, T> toErrorFuture(Future<Result, T> future) {
+ Promise<Error, T> promise;
+ future.addListener([promise](Result result, const T& value) {
+ if (result == ResultOk) {
+ promise.setValue(value);
+ } else {
+ promise.setFailed({result, ""});
+ }
+ });
+ return promise.getFuture();
+ }
+
+ template <typename T>
+ static Future<Result, T> toResultFuture(Future<Error, T> future) {
+ Promise<Result, T> promise;
+ future.addListener([promise](const Error& error, const T& value) {
+ if (error.result == ResultOk) {
+ promise.setValue(value);
+ } else {
+ promise.setFailed(error.result);
+ }
+ });
+ return promise.getFuture();
+ }
+
+ static std::string getSchemaCacheKey(const TopicNamePtr& topicName, const
std::string& version) {
+ return "get-schema-" + topicName->toString() + "-" + version;
+ }
+
const std::shared_ptr<LookupService> lookupService_;
RetryableOperationCachePtr<LookupResult> lookupCache_;
RetryableOperationCachePtr<LookupDataResultPtr> partitionLookupCache_;
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index f4b056e..2826390 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -41,7 +41,7 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
explicit PassKey() {}
};
- RetryableOperation(const std::string& name, std::function<Future<Result,
T>()>&& func,
+ RetryableOperation(const std::string& name, std::function<Future<Error,
T>()>&& func,
TimeDuration timeout, DeadlineTimerPtr timer)
: name_(name),
func_(std::move(func)),
@@ -58,7 +58,7 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
return std::make_shared<RetryableOperation<T>>(PassKey{},
std::forward<Args>(args)...);
}
- Future<Result, T> run() {
+ Future<Error, T> run() {
bool expected = false;
if (!started_.compare_exchange_strong(expected, true)) {
return promise_.getFuture();
@@ -67,16 +67,16 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
}
void cancel() {
- promise_.setFailed(ResultDisconnected);
+ promise_.setFailed({ResultDisconnected, ""});
cancelTimer(*timer_);
}
private:
const std::string name_;
- std::function<Future<Result, T>()> func_;
+ std::function<Future<Error, T>()> func_;
const TimeDuration timeout_;
Backoff backoff_;
- Promise<Result, T> promise_;
+ Promise<Error, T> promise_;
std::atomic_bool started_{false};
DeadlineTimerPtr timer_;
@@ -84,24 +84,24 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
#ifdef __GNUC__
__attribute__((visibility("hidden")))
#endif
- Future<Result, T>
+ Future<Error, T>
runImpl(TimeDuration remainingTime) {
std::weak_ptr<RetryableOperation<T>>
weakSelf{this->shared_from_this()};
- func_().addListener([this, weakSelf, remainingTime](Result result,
const T& value) {
+ func_().addListener([this, weakSelf, remainingTime](const Error&
error, const T& value) {
auto self = weakSelf.lock();
if (!self) {
return;
}
- if (result == ResultOk) {
+ if (error.result == ResultOk) {
promise_.setValue(value);
return;
}
- if (!isResultRetryable(result)) {
- promise_.setFailed(result);
+ if (!isResultRetryable(error.result)) {
+ promise_.setFailed(error);
return;
}
if (toMillis(remainingTime) <= 0) {
- promise_.setFailed(ResultTimeout);
+ promise_.setFailed({ResultTimeout, ""});
return;
}
@@ -119,7 +119,7 @@ class RetryableOperation : public
std::enable_shared_from_this<RetryableOperatio
if (ec) {
if (ec == ASIO::error::operation_aborted) {
LOG_DEBUG("Timer for " << name_ << " is cancelled");
- promise_.setFailed(ResultTimeout);
+ promise_.setFailed({ResultTimeout, ""});
} else {
LOG_WARN("Timer for " << name_ << " failed: " <<
ec.message());
}
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index fa4c8fc..d3d1870 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -56,11 +56,11 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
return std::make_shared<Self>(PassKey{}, std::forward<Args>(args)...);
}
- Future<Result, T> run(const std::string& key, std::function<Future<Result,
T>()>&& func) {
+ Future<Error, T> run(const std::string& key, std::function<Future<Error,
T>()>&& func) {
std::unique_lock<std::mutex> lock{mutex_};
if (closed_) {
- Promise<Result, T> promise;
- promise.setFailed(ResultAlreadyClosed);
+ Promise<Error, T> promise;
+ promise.setFailed({ResultAlreadyClosed, ""});
return promise.getFuture();
}
auto it = operations_.find(key);
@@ -70,8 +70,8 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
timer = executorProvider_->get()->createDeadlineTimer();
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to retry lookup for " << key << ": " <<
e.what());
- Promise<Result, T> promise;
- promise.setFailed(ResultConnectError);
+ Promise<Error, T> promise;
+ promise.setFailed({ResultConnectError, e.what()});
return promise.getFuture();
}
@@ -81,7 +81,7 @@ class RetryableOperationCache : public
std::enable_shared_from_this<RetryableOpe
lock.unlock();
auto weakSelf = this->weak_from_this();
- future.addListener([this, weakSelf, key, operation](Result, const
T&) {
+ future.addListener([this, weakSelf, key, operation](const Error&,
const T&) {
auto self = weakSelf.lock();
if (!self) {
return;
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index 4749683..d38e62c 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -26,7 +26,9 @@
#include <fstream>
#include <streambuf>
#include <string>
+#include <variant>
+#include "VariantHelper.h"
#include "lib/Future.h"
#include "lib/LogUtils.h"
#include "lib/Utils.h"
@@ -184,6 +186,13 @@ TEST(AuthPluginToken, testNoAuth) {
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultAuthorizationError, result);
+ std::visit(overloaded{[](Error&& error) {
+ ASSERT_EQ(ResultAuthorizationError,
error.result);
+ ASSERT_EQ("Client is not authorized to Get
Partition Metadata", error.message);
+ },
+ [](auto&&) { FAIL(); }},
+ client.createProducerV2(topicName, {}));
+
Consumer consumer;
result = client.subscribe(topicName, subName, consumer);
ASSERT_EQ(ResultAuthorizationError, result);
@@ -200,6 +209,14 @@ TEST(AuthPluginToken, testNoAuthWithHttp) {
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultConnectError, result);
+ std::visit(overloaded{[](Error&& error) {
+ ASSERT_EQ(ResultConnectError, error.result);
+ ASSERT_TRUE(error.message.find("The requested
URL returned error: 401") !=
+ std::string::npos);
+ },
+ [](auto&&) { FAIL(); }},
+ client.createProducerV2(topicName, {}));
+
Consumer consumer;
result = client.subscribe(topicName, subName, consumer);
ASSERT_EQ(ResultConnectError, result);
@@ -212,5 +229,16 @@ TEST(AuthPluginToken, testTokenSupplierException) {
Client client(serviceUrl, config);
Producer producer;
ASSERT_EQ(ResultAuthenticationError, client.createProducer("topic",
producer));
+
+ std::visit(
+ overloaded{[](Error&& error) {
+ ASSERT_EQ(ResultAuthenticationError, error.result);
+ ASSERT_TRUE(error.message.find("failed to generate
token") != std::string::npos);
+ },
+ [](auto&&) { FAIL(); }},
+ client.createProducerV2("topic", {}));
+
ASSERT_EQ(ResultOk, client.close());
+
+ client.close();
}
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index 63ac9d1..e83db4e 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -303,11 +303,11 @@ TEST(ClientTest,
testTimedOutPendingRequestsAreErasedFromConnectionMaps) {
"persistent://public/default/testTimedOutPendingRequests-" + suffix,
"", requestIdGenerator++);
ResponseData responseData;
- ASSERT_EQ(ResultTimeout, pingFuture.get(responseData));
+ ASSERT_EQ(ResultTimeout, pingFuture.get(responseData).result);
ASSERT_EQ(0u, PulsarFriend::getPendingRequests(*connection));
LookupDataResultPtr lookupData;
- ASSERT_EQ(ResultTimeout, lookupFuture.get(lookupData));
+ ASSERT_EQ(ResultTimeout, lookupFuture.get(lookupData).result);
ASSERT_EQ(0u, PulsarFriend::getPendingLookupRequests(*connection));
ASSERT_EQ(0u, PulsarFriend::getNumOfPendingLookupRequests(*connection));
@@ -320,11 +320,11 @@ TEST(ClientTest,
testTimedOutPendingRequestsAreErasedFromConnectionMaps) {
ASSERT_EQ(0u,
PulsarFriend::getPendingGetTopicsOfNamespaceRequests(*connection));
SchemaInfo schemaInfo;
- ASSERT_EQ(ResultTimeout, getSchemaFuture.get(schemaInfo));
+ ASSERT_EQ(ResultTimeout, getSchemaFuture.get(schemaInfo).result);
ASSERT_EQ(0u, PulsarFriend::getPendingGetSchemaRequests(*connection));
mockServer->close();
- connection->close(ResultDisconnected).wait();
+ connection->close(Error{ResultDisconnected, ""}).wait();
executorProvider->close();
}
diff --git a/tests/ConnectionTest.cc b/tests/ConnectionTest.cc
index e0d063e..dd4641b 100644
--- a/tests/ConnectionTest.cc
+++ b/tests/ConnectionTest.cc
@@ -24,7 +24,7 @@ using namespace pulsar;
class MockClientConnection {
public:
- MOCK_METHOD(void, close, (Result));
+ MOCK_METHOD(void, close, (Error));
void checkServerError(ServerError error, const std::string& message) {
::pulsar::adaptor::checkServerError(*this, error, message);
@@ -41,7 +41,7 @@ static const std::vector<std::string> retryableErrorMessages{
TEST(ConnectionTest, testCheckServerError) {
MockClientConnection conn;
- EXPECT_CALL(conn, close(ResultDisconnected)).Times(0);
+ EXPECT_CALL(conn, close(::testing::_)).Times(0);
for (auto&& msg : retryableErrorMessages) {
conn.checkServerError(pulsar::proto::ServiceNotReady, msg);
}
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 53cb76e..e98d972 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -91,7 +91,7 @@ TEST(LookupServiceTest, basicLookup) {
TopicNamePtr topicName = TopicName::get("topic");
- Future<Result, LookupDataResultPtr> partitionFuture =
lookupService.getPartitionMetadataAsync(topicName);
+ Future<Error, LookupDataResultPtr> partitionFuture =
lookupService.getPartitionMetadataAsync(topicName);
LookupDataResultPtr lookupData;
partitionFuture.get(lookupData);
ASSERT_TRUE(lookupData != NULL);
@@ -129,9 +129,9 @@ static void testMultiAddresses(LookupService&
lookupService) {
results.clear();
for (int i = 0; i < numRequests; i++) {
LookupDataResultPtr data;
- const auto result =
lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data);
- LOG_INFO("getPartitionMetadataAsync [" << i << "] " << result);
- results.emplace_back(result);
+ const auto error =
lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data);
+ LOG_INFO("getPartitionMetadataAsync [" << i << "] " << error.result);
+ results.emplace_back(error.result);
}
verifySuccessCount();
@@ -186,7 +186,7 @@ TEST(LookupServiceTest, testRetry) {
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
LookupDataResultPtr lookupDataResultPtr;
- ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr));
+ ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr).result);
LOG_INFO("getPartitionMetadataAsync returns " <<
lookupDataResultPtr->getPartitions() << " partitions");
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
@@ -234,7 +234,7 @@ TEST(LookupServiceTest, testTimeout) {
beforeMethod();
auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
LookupDataResultPtr lookupDataResultPtr;
- ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr));
+ ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr).result);
afterMethod("getPartitionMetadataAsync");
beforeMethod();
@@ -307,7 +307,7 @@ TEST_P(LookupServiceTest, testGetSchema) {
SchemaInfo schemaInfo;
auto future =
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
- ASSERT_EQ(ResultOk, future.get(schemaInfo));
+ ASSERT_EQ(ResultOk, future.get(schemaInfo).result);
ASSERT_EQ(jsonSchema, schemaInfo.getSchema());
ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType());
ASSERT_EQ(properties, schemaInfo.getProperties());
@@ -322,7 +322,7 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) {
SchemaInfo schemaInfo;
auto future =
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
- ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo));
+ ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo).result);
}
TEST_P(LookupServiceTest, testGetKeyValueSchema) {
@@ -344,7 +344,7 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) {
SchemaInfo schemaInfo;
auto future =
PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
- ASSERT_EQ(ResultOk, future.get(schemaInfo));
+ ASSERT_EQ(ResultOk, future.get(schemaInfo).result);
ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema());
ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType());
ASSERT_FALSE(schemaInfo.getProperties().empty());
@@ -510,13 +510,13 @@ class MockLookupService : public BinaryProtoLookupService
{
public:
using BinaryProtoLookupService::BinaryProtoLookupService;
- Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override {
+ Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const
TopicNamePtr& topicName) override {
bool expected = true;
if (firstTime_.compare_exchange_strong(expected, false)) {
// Trigger the retry
LOG_INFO("Fail the lookup for " << topicName->toString() << "
intentionally");
- Promise<Result, LookupDataResultPtr> promise;
- promise.setFailed(ResultRetryable);
+ Promise<Error, LookupDataResultPtr> promise;
+ promise.setFailed(Error{ResultRetryable, ""});
return promise.getFuture();
}
return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
@@ -564,7 +564,7 @@ TEST(LookupServiceTest, testRetryAfterDestroyed) {
lookupService->close();
Result result = ResultOk;
lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
- .addListener([&result](Result innerResult, const LookupDataResultPtr&)
{ result = innerResult; });
+ .addListener([&result](const auto& error, const LookupDataResultPtr&)
{ result = error.result; });
EXPECT_EQ(ResultAlreadyClosed, result);
pool.close();
executorProvider->close();
diff --git a/tests/MockClientImpl.h b/tests/MockClientImpl.h
index 074808f..7fa902b 100644
--- a/tests/MockClientImpl.h
+++ b/tests/MockClientImpl.h
@@ -24,6 +24,7 @@
#include <chrono>
#include <future>
#include <memory>
+#include <variant>
#include "lib/ClientImpl.h"
@@ -46,9 +47,13 @@ class MockClientImpl : public ClientImpl {
using namespace std::chrono;
auto start = high_resolution_clock::now();
auto promise = createPromise();
- createProducerAsync(topic, {}, [&start, promise](Result result,
Producer) {
+ createProducerAsync(topic, {}, [&start, promise](const auto& v) {
auto timeMs =
duration_cast<milliseconds>(high_resolution_clock::now() - start).count();
- promise->set_value({result, timeMs});
+ if (const auto* error = std::get_if<Error>(&v)) {
+ promise->set_value({error->result, timeMs});
+ } else {
+ promise->set_value({ResultOk, timeMs});
+ }
});
return wait(promise);
}
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
index 57407fb..07e0c50 100644
--- a/tests/MultiTopicsConsumerTest.cc
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -167,7 +167,7 @@ TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) {
return PulsarFriend::getPendingConsumerStatsRequests(*connection) ==
expectedRequests;
}));
- connection->close(ResultDisconnected);
+ connection->close(Error{ResultDisconnected, ""});
ASSERT_EQ(ResultDisconnected, future.get());
mockServer->close();
diff --git a/tests/RetryableOperationCacheTest.cc
b/tests/RetryableOperationCacheTest.cc
index 2daaf3f..a1bb285 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -27,13 +27,13 @@
namespace pulsar {
-using IntFuture = Future<Result, int>;
+using IntFuture = Future<Error, int>;
static int wait(IntFuture future) {
int value;
- auto result = future.get(value);
- if (result != ResultOk) {
- throw std::runtime_error(strResult(result));
+ auto error = future.get(value);
+ if (error.result != ResultOk) {
+ throw std::runtime_error(strResult(error.result));
}
return value;
}
@@ -50,9 +50,9 @@ class CountdownFunc {
: result_(rhs.result_), totalRetryCount_(rhs.totalRetryCount_),
current_(rhs.current_.load()) {}
IntFuture operator()() {
- Promise<Result, int> promise;
+ Promise<Error, int> promise;
if (++current_ < totalRetryCount_) {
- promise.setFailed(ResultRetryable);
+ promise.setFailed({ResultRetryable, ""});
} else {
promise.setValue(result_);
}
@@ -141,7 +141,7 @@ TEST_F(RetryableOperationCacheTest, testClose) {
for (auto&& future : futures_) {
int value;
// All cancelled futures complete with ResultDisconnected and the
default int value
- ASSERT_EQ(ResultDisconnected, future.get(value));
+ ASSERT_EQ(ResultDisconnected, future.get(value).result);
ASSERT_EQ(value, 0);
}
ASSERT_EQ(getSize(*cache), 0);
diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc
index 1fad081..8b17514 100644
--- a/tests/SchemaTest.cc
+++ b/tests/SchemaTest.cc
@@ -172,10 +172,18 @@ TEST(SchemaTest, testAutoDownloadSchema) {
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
- Promise<Result, Producer> promise;
- clientImplPtr->createProducerAsync(topic, producerConfiguration,
WaitForCallbackValue<Producer>(promise),
- true);
- ASSERT_EQ(ResultOk, promise.getFuture().get(producer));
+ Promise<Error, Producer> promise;
+ clientImplPtr->createProducerAsync(
+ topic, producerConfiguration,
+ [promise](const auto& v) {
+ if (const auto* error = std::get_if<Error>(&v)) {
+ promise.setFailed(*error);
+ } else {
+ promise.setValue(std::get<Producer>(v));
+ }
+ },
+ true);
+ ASSERT_EQ(ResultOk, promise.getFuture().get(producer).result);
Message msg = MessageBuilder().setContent("content").build();
ASSERT_EQ(ResultOk, producer.send(msg));
diff --git a/tests/VariantHelper.h b/tests/VariantHelper.h
new file mode 100644
index 0000000..4959e0a
--- /dev/null
+++ b/tests/VariantHelper.h
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+// See https://en.cppreference.com/cpp/utility/variant/visit2
+template <typename... Ts>
+struct overloaded : Ts... {
+ using Ts::operator()...;
+};
+// explicit deduction guide (not needed as of C++20)
+template <typename... Ts>
+overloaded(Ts...) -> overloaded<Ts...>;