This is an automated email from the ASF dual-hosted git repository.

BewareMyPower pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ca2eac  feat: introduce a v2 createProducer API to carry error message when fail (#579)
2ca2eac is described below

commit 2ca2eacbb781280316249dc166212de6610d3b87
Author: Yunze Xu <[email protected]>
AuthorDate: Wed May 27 12:10:37 2026 +0800

    feat: introduce a v2 createProducer API to carry error message when fail (#579)
---
 include/pulsar/Client.h              |  20 +++-
 include/pulsar/Result.h              |  16 +++
 lib/BinaryProtoLookupService.cc      |  72 +++++++------
 lib/BinaryProtoLookupService.h       |  14 +--
 lib/Client.cc                        |  37 ++++++-
 lib/ClientConnection.cc              | 193 ++++++++++++++++++++---------------
 lib/ClientConnection.h               |  15 +--
 lib/ClientConnectionAdaptor.h        |   4 +-
 lib/ClientImpl.cc                    | 121 ++++++++++++----------
 lib/ClientImpl.h                     |  11 +-
 lib/ConnectionPool.cc                |  26 ++---
 lib/ConnectionPool.h                 |  13 +--
 lib/ConsumerImpl.cc                  |  53 +++++-----
 lib/ConsumerImpl.h                   |   2 +-
 lib/ConsumerImplBase.h               |   5 +-
 lib/HTTPLookupService.cc             |  97 ++++++++++--------
 lib/HTTPLookupService.h              |  12 +--
 lib/HandlerBase.cc                   |   7 +-
 lib/HandlerBase.h                    |   2 +-
 lib/LookupDataResult.h               |   2 +-
 lib/LookupService.h                  |   7 +-
 lib/MultiTopicsConsumerImpl.cc       |  13 +--
 lib/PartitionedProducerImpl.cc       |  25 ++---
 lib/PartitionedProducerImpl.h        |   6 +-
 lib/PendingRequest.h                 |   8 +-
 lib/ProducerImpl.cc                  |  47 ++++-----
 lib/ProducerImpl.h                   |  10 +-
 lib/ProducerImplBase.h               |   2 +-
 lib/RetryableLookupService.h         |  47 +++++++--
 lib/RetryableOperation.h             |  24 ++---
 lib/RetryableOperationCache.h        |  12 +--
 tests/AuthTokenTest.cc               |  28 +++++
 tests/ClientTest.cc                  |   8 +-
 tests/ConnectionTest.cc              |   4 +-
 tests/LookupServiceTest.cc           |  26 ++---
 tests/MockClientImpl.h               |   9 +-
 tests/MultiTopicsConsumerTest.cc     |   2 +-
 tests/RetryableOperationCacheTest.cc |  14 +--
 tests/SchemaTest.cc                  |  16 ++-
 tests/VariantHelper.h                |  28 +++++
 40 files changed, 633 insertions(+), 425 deletions(-)

diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index e9813e3..4114075 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -36,6 +36,7 @@
 
 #include <memory>
 #include <string>
+#include <variant>
 
 namespace pulsar {
 typedef std::function<void(Result, Producer)> CreateProducerCallback;
@@ -45,6 +46,8 @@ typedef std::function<void(Result, TableView)> 
TableViewCallback;
 typedef std::function<void(Result, const std::vector<std::string>&)> 
GetPartitionsCallback;
 typedef std::function<void(Result)> CloseCallback;
 
+using CreateProducerV2Callback = std::function<void(std::variant<Error, 
Producer>)>;
+
 class ClientImpl;
 class PulsarFriend;
 class PulsarWrapper;
@@ -108,7 +111,9 @@ class PULSAR_PUBLIC Client {
      * @return ResultOk if the producer has been successfully created
      * @return ResultError if there was an error
      */
-    Result createProducer(const std::string& topic, const 
ProducerConfiguration& conf, Producer& producer);
+    [[deprecated("use createProducerV2")]] Result createProducer(const 
std::string& topic,
+                                                                 const 
ProducerConfiguration& conf,
+                                                                 Producer& 
producer);
 
     /**
      * Asynchronously create a producer with the default ProducerConfiguration 
for publishing on a specific
@@ -118,7 +123,8 @@ class PULSAR_PUBLIC Client {
      * @param callback the callback that is triggered when the producer is 
created successfully or not
      * @param callback Callback function that is invoked when the operation is 
completed
      */
-    void createProducerAsync(const std::string& topic, const 
CreateProducerCallback& callback);
+    [[deprecated("use createProducerAsyncV2")]] void createProducerAsync(
+        const std::string& topic, const CreateProducerCallback& callback);
 
     /**
      * Asynchronously create a producer with the customized 
ProducerConfiguration for publishing on a specific
@@ -127,8 +133,14 @@ class PULSAR_PUBLIC Client {
      * @param topic the name of the topic where to produce
      * @param conf the customized ProducerConfiguration
      */
-    void createProducerAsync(const std::string& topic, const 
ProducerConfiguration& conf,
-                             const CreateProducerCallback& callback);
+    [[deprecated("use createProducerAsyncV2")]] void createProducerAsync(
+        const std::string& topic, const ProducerConfiguration& conf, const 
CreateProducerCallback& callback);
+
+    void createProducerAsyncV2(const std::string& topic, const 
ProducerConfiguration& conf,
+                               CreateProducerV2Callback callback);
+
+    std::variant<Error, Producer> createProducerV2(const std::string& topic,
+                                                   const 
ProducerConfiguration& conf);
 
     /**
      * Subscribe to a given topic and subscription combination with the 
default ConsumerConfiguration
diff --git a/include/pulsar/Result.h b/include/pulsar/Result.h
index a6c30d4..b94c08c 100644
--- a/include/pulsar/Result.h
+++ b/include/pulsar/Result.h
@@ -23,6 +23,8 @@
 
 #include <cstdint>
 #include <iosfwd>
+#include <ostream>
+#include <string>
 
 namespace pulsar {
 
@@ -101,6 +103,20 @@ enum Result : int8_t
 PULSAR_PUBLIC const char* strResult(Result result);
 
 PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result);
+
+struct PULSAR_PUBLIC Error {
+    Result result;
+    std::string message;
+};
+
+inline std::ostream& operator<<(std::ostream& os, const Error& error) {
+    os << error.result;
+    if (!error.message.empty()) {
+        os << " " << error.message;
+    }
+    return os;
+}
+
 }  // namespace pulsar
 
 #endif /* ERROR_HPP_ */
diff --git a/lib/BinaryProtoLookupService.cc b/lib/BinaryProtoLookupService.cc
index a110e96..7712e76 100644
--- a/lib/BinaryProtoLookupService.cc
+++ b/lib/BinaryProtoLookupService.cc
@@ -47,10 +47,10 @@ auto BinaryProtoLookupService::findBroker(const 
std::string& address, bool autho
 
     // NOTE: we can use move capture for topic since C++14
     cnxPool_.getConnectionAsync(address).addListener([this, promise, topic, 
address, authoritative,
-                                                      redirectCount](Result 
result,
+                                                      redirectCount](const 
auto& error,
                                                                      const 
ClientConnectionWeakPtr& weakCnx) {
-        if (result != ResultOk) {
-            promise->setFailed(result);
+        if (error.result != ResultOk) {
+            promise->setFailed(error.result);
             return;
         }
         auto cnx = weakCnx.lock();
@@ -62,10 +62,10 @@ auto BinaryProtoLookupService::findBroker(const 
std::string& address, bool autho
         auto lookupPromise = std::make_shared<LookupDataResultPromise>();
         cnx->newTopicLookup(topic, authoritative, listenerName_, 
newRequestId(), lookupPromise);
         lookupPromise->getFuture().addListener([this, cnx, promise, topic, 
address, redirectCount](
-                                                   Result result, const 
LookupDataResultPtr& data) {
-            if (result != ResultOk || !data) {
-                LOG_ERROR("Lookup failed for " << topic << ", result " << 
result);
-                promise->setFailed(result);
+                                                   const Error& error, const 
LookupDataResultPtr& data) {
+            if (error.result != ResultOk || !data) {
+                LOG_ERROR("Lookup failed for " << topic << ", result " << 
error);
+                promise->setFailed(error.result);
                 return;
             }
 
@@ -96,15 +96,11 @@ auto BinaryProtoLookupService::findBroker(const 
std::string& address, bool autho
     return promise->getFuture();
 }
 
-/*
- * @param    topicName topic to get number of partitions.
- *
- */
-Future<Result, LookupDataResultPtr> 
BinaryProtoLookupService::getPartitionMetadataAsync(
+Future<Error, LookupDataResultPtr> 
BinaryProtoLookupService::getPartitionMetadataAsync(
     const TopicNamePtr& topicName) {
     LookupDataResultPromisePtr promise = 
std::make_shared<LookupDataResultPromise>();
     if (!topicName) {
-        promise->setFailed(ResultInvalidTopicName);
+        promise->setFailed(Error{ResultInvalidTopicName, ""});
         return promise->getFuture();
     }
     std::string lookupName = topicName->toString();
@@ -115,16 +111,17 @@ Future<Result, LookupDataResultPtr> 
BinaryProtoLookupService::getPartitionMetada
     return promise->getFuture();
 }
 
-void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const 
std::string& topicName, Result result,
+void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const 
std::string& topicName,
+                                                                  const Error& 
error,
                                                                   const 
ClientConnectionWeakPtr& clientCnx,
                                                                   const 
LookupDataResultPromisePtr& promise) {
-    if (result != ResultOk) {
-        promise->setFailed(result);
+    if (error.result != ResultOk) {
+        promise->setFailed(error);
         return;
     }
     auto conn = clientCnx.lock();
     if (!conn) {
-        promise->setFailed(ResultConnectError);
+        promise->setFailed(Error{ResultConnectError, ""});
         return;
     }
     LookupDataResultPromisePtr lookupPromise = 
std::make_shared<LookupDataResultPromise>();
@@ -135,7 +132,7 @@ void 
BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
                                                      std::placeholders::_2, 
clientCnx, promise));
 }
 
-void BinaryProtoLookupService::handlePartitionMetadataLookup(const 
std::string& topicName, Result result,
+void BinaryProtoLookupService::handlePartitionMetadataLookup(const 
std::string& topicName, const Error& error,
                                                              const 
LookupDataResultPtr& data,
                                                              const 
ClientConnectionWeakPtr& clientCnx,
                                                              const 
LookupDataResultPromisePtr& promise) {
@@ -144,8 +141,8 @@ void 
BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string&
                                                           << 
data->getBrokerUrl());
         promise->setValue(data);
     } else {
-        LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", 
result " << result);
-        promise->setFailed(result);
+        LOG_DEBUG("PartitionMetadataLookup failed for " << topicName << ", 
result " << error);
+        promise->setFailed(error);
     }
 }
 
@@ -168,26 +165,28 @@ Future<Result, NamespaceTopicsPtr> 
BinaryProtoLookupService::getTopicsOfNamespac
     return promise->getFuture();
 }
 
-Future<Result, SchemaInfo> BinaryProtoLookupService::getSchema(const 
TopicNamePtr& topicName,
-                                                               const 
std::string& version) {
-    GetSchemaPromisePtr promise = std::make_shared<Promise<Result, 
SchemaInfo>>();
-
+Future<Error, SchemaInfo> BinaryProtoLookupService::getSchema(const 
TopicNamePtr& topicName,
+                                                              const 
std::string& version) {
+    GetSchemaPromisePtr promise = std::make_shared<Promise<Error, 
SchemaInfo>>();
     if (!topicName) {
-        promise->setFailed(ResultInvalidTopicName);
+        promise->setFailed(Error{ResultInvalidTopicName, ""});
         return promise->getFuture();
     }
-    cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost())
+
+    const auto topic = topicName->toString();
+    const auto address = serviceNameResolver_.resolveHost();
+    cnxPool_.getConnectionAsync(address, address)
         
.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this, 
topicName->toString(),
                                version, std::placeholders::_1, 
std::placeholders::_2, promise));
-
     return promise->getFuture();
 }
 
 void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& 
topicName, const std::string& version,
-                                                    Result result, const 
ClientConnectionWeakPtr& clientCnx,
+                                                    const Error& error,
+                                                    const 
ClientConnectionWeakPtr& clientCnx,
                                                     const GetSchemaPromisePtr& 
promise) {
-    if (result != ResultOk) {
-        promise->setFailed(result);
+    if (error.result != ResultOk) {
+        promise->setFailed(error);
         return;
     }
 
@@ -195,11 +194,10 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const 
std::string& topicName
     uint64_t requestId = newRequestId();
     LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: 
" << topicName
                                                   << " version: " << version);
-
     conn->newGetSchema(topicName, version, requestId)
-        .addListener([promise](Result result, const SchemaInfo& schemaInfo) {
-            if (result != ResultOk) {
-                promise->setFailed(result);
+        .addListener([promise](const auto& error, const SchemaInfo& 
schemaInfo) {
+            if (error.result != ResultOk) {
+                promise->setFailed(error);
                 return;
             }
             promise->setValue(schemaInfo);
@@ -208,11 +206,11 @@ void BinaryProtoLookupService::sendGetSchemaRequest(const 
std::string& topicName
 
 void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const 
std::string& nsName,
                                                                
CommandGetTopicsOfNamespace_Mode mode,
-                                                               Result result,
+                                                               const Error& 
error,
                                                                const 
ClientConnectionWeakPtr& clientCnx,
                                                                const 
NamespaceTopicsPromisePtr& promise) {
-    if (result != ResultOk) {
-        promise->setFailed(result);
+    if (error.result != ResultOk) {
+        promise->setFailed(error.result);
         return;
     }
 
diff --git a/lib/BinaryProtoLookupService.h b/lib/BinaryProtoLookupService.h
index 35dcb16..b5adc42 100644
--- a/lib/BinaryProtoLookupService.h
+++ b/lib/BinaryProtoLookupService.h
@@ -35,7 +35,7 @@ class ConnectionPool;
 class LookupDataResult;
 class ServiceNameResolver;
 using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, 
NamespaceTopicsPtr>>;
-using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, SchemaInfo>>;
+using GetSchemaPromisePtr = std::shared_ptr<Promise<Error, SchemaInfo>>;
 
 class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
    public:
@@ -48,12 +48,12 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
 
     LookupResultFuture getBroker(const TopicName& topicName) override;
 
-    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName) override;
+    Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName) override;
 
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
         const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) 
override;
 
-    Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override;
+    Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override;
 
     ServiceNameResolver& getServiceNameResolver() override { return 
serviceNameResolver_; }
 
@@ -71,20 +71,20 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public 
LookupService {
     std::string listenerName_;
     const int32_t maxLookupRedirects_;
 
-    void sendPartitionMetadataLookupRequest(const std::string& topicName, 
Result result,
+    void sendPartitionMetadataLookupRequest(const std::string& topicName, 
const Error& error,
                                             const ClientConnectionWeakPtr& 
clientCnx,
                                             const LookupDataResultPromisePtr& 
promise);
 
-    void handlePartitionMetadataLookup(const std::string& topicName, Result 
result,
+    void handlePartitionMetadataLookup(const std::string& topicName, const 
Error& error,
                                        const LookupDataResultPtr& data,
                                        const ClientConnectionWeakPtr& 
clientCnx,
                                        const LookupDataResultPromisePtr& 
promise);
 
     void sendGetTopicsOfNamespaceRequest(const std::string& nsName, 
CommandGetTopicsOfNamespace_Mode mode,
-                                         Result result, const 
ClientConnectionWeakPtr& clientCnx,
+                                         const Error& error, const 
ClientConnectionWeakPtr& clientCnx,
                                          const NamespaceTopicsPromisePtr& 
promise);
 
-    void sendGetSchemaRequest(const std::string& topicName, const std::string& 
version, Result result,
+    void sendGetSchemaRequest(const std::string& topicName, const std::string& 
version, const Error& error,
                               const ClientConnectionWeakPtr& clientCnx, const 
GetSchemaPromisePtr& promise);
 
     void getTopicsOfNamespaceListener(Result result, const NamespaceTopicsPtr& 
topicsPtr,
diff --git a/lib/Client.cc b/lib/Client.cc
index 48e92dd..32ab87c 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -19,6 +19,7 @@
 #include <pulsar/Client.h>
 #include <pulsar/ServiceInfoProvider.h>
 
+#include <future>
 #include <iostream>
 #include <memory>
 #include <utility>
@@ -65,7 +66,31 @@ void Client::createProducerAsync(const std::string& topic, 
const CreateProducerC
 
 void Client::createProducerAsync(const std::string& topic, const 
ProducerConfiguration& conf,
                                  const CreateProducerCallback& callback) {
-    impl_->createProducerAsync(topic, conf, callback);
+    impl_->createProducerAsync(topic, conf, [callback](const auto& v) {
+        if (const auto* error = std::get_if<Error>(&v)) {
+            callback(error->result, Producer());
+        } else {
+            callback(ResultOk, std::get<Producer>(v));
+        }
+    });
+}
+
+void Client::createProducerAsyncV2(const std::string& topic, const 
ProducerConfiguration& conf,
+                                   CreateProducerV2Callback callback) {
+    impl_->createProducerAsync(topic, conf, std::move(callback));
+}
+
+std::variant<Error, Producer> Client::createProducerV2(const std::string& 
topic,
+                                                       const 
ProducerConfiguration& conf) {
+    std::promise<std::variant<Error, Producer>> promise;
+    createProducerAsyncV2(topic, conf, [&promise](const auto& v) mutable {
+        if (const auto* error = std::get_if<Error>(&v)) {
+            promise.set_value(*error);
+        } else {
+            promise.set_value(std::get<Producer>(v));
+        }
+    });
+    return promise.get_future().get();
 }
 
 Result Client::subscribe(const std::string& topic, const std::string& 
subscriptionName, Consumer& consumer) {
@@ -169,9 +194,9 @@ void Client::createTableViewAsync(const std::string& topic, 
const TableViewConfi
 }
 
 Result Client::getPartitionsForTopic(const std::string& topic, 
std::vector<std::string>& partitions) {
-    Promise<Result, std::vector<std::string> > promise;
-    getPartitionsForTopicAsync(topic, 
WaitForCallbackValue<std::vector<std::string> >(promise));
-    Future<Result, std::vector<std::string> > future = promise.getFuture();
+    Promise<Result, std::vector<std::string>> promise;
+    getPartitionsForTopicAsync(topic, 
WaitForCallbackValue<std::vector<std::string>>(promise));
+    Future<Result, std::vector<std::string>> future = promise.getFuture();
 
     return future.get(partitions);
 }
@@ -199,7 +224,9 @@ uint64_t Client::getNumberOfConsumers() { return 
impl_->getNumberOfConsumers();
 void Client::getSchemaInfoAsync(const std::string& topic, int64_t version,
                                 std::function<void(Result, const SchemaInfo&)> 
callback) {
     impl_->getSchema(TopicName::get(topic), (version >= 0) ? 
toBigEndianBytes(version) : "")
-        .addListener(std::move(callback));
+        .addListener([callback{std::move(callback)}](const Error& error, const 
SchemaInfo& schemaInfo) {
+            callback(error.result, schemaInfo);
+        });
 }
 
 ServiceInfo Client::getServiceInfo() const { return impl_->getServiceInfo(); }
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index a135ade..9559703 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -209,8 +209,7 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
     LOG_INFO(cnxString() << "Create ClientConnection, timeout="
                          << clientConfiguration.getConnectionTimeout());
     if (!authentication_) {
-        LOG_ERROR("Invalid authentication plugin");
-        throw ResultAuthenticationError;
+        throw Error{ResultAuthenticationError, "Invalid authentication 
plugin"};
         return;
     }
 
@@ -248,7 +247,7 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
                     ctx.load_verify_file(trustCertFilePath);
                 } else {
                     LOG_ERROR(trustCertFilePath << ": No such trustCertFile");
-                    throw ResultAuthenticationError;
+                    throw Error{ResultAuthenticationError, trustCertFilePath + 
": No such trustCertFile"};
                 }
             } else {
                 ctx.set_default_verify_paths();
@@ -265,11 +264,11 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
             tlsPrivateKey = authData->getTlsPrivateKey();
             if (!file_exists(tlsCertificates)) {
                 LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
-                throw ResultAuthenticationError;
+                throw Error{ResultAuthenticationError, tlsCertificates + ": No 
such tlsCertificates"};
             }
             if (!file_exists(tlsCertificates)) {
                 LOG_ERROR(tlsCertificates << ": No such tlsCertificates");
-                throw ResultAuthenticationError;
+                throw Error{ResultAuthenticationError, tlsCertificates + ": No 
such tlsCertificates"};
             }
             ctx.use_private_key_file(tlsPrivateKey, ASIO::ssl::context::pem);
             ctx.use_certificate_chain_file(tlsCertificates);
@@ -304,7 +303,7 @@ ClientConnection::~ClientConnection() {
 void ClientConnection::handlePulsarConnected(const proto::CommandConnected& 
cmdConnected) {
     if (!cmdConnected.has_server_version()) {
         LOG_ERROR(cnxString() << "Server version is not set");
-        close();
+        close(Error{ResultConnectError, cnxString() + "Server version is not 
set"});
         return;
     }
 
@@ -414,7 +413,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& 
err, const tcp::endp
             std::atomic_store(&cnxStringPtr_, 
std::make_shared<std::string>(cnxStringStream.str()));
         } catch (const ASIO_SYSTEM_ERROR& e) {
             LOG_ERROR("Failed to get endpoint: " << e.what());
-            close(ResultRetryable);
+            close(Error{ResultRetryable, ""});
             return;
         }
         if (logicalAddress_ == physicalAddress_) {
@@ -470,7 +469,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& 
err, const tcp::endp
                 Url service_url;
                 if (!Url::parse(physicalAddress_, service_url)) {
                     LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " 
<< err << " " << err.message());
-                    close();
+                    close(Error{ResultConnectError, "Invalid Url, unable to 
parse: " + err.message()});
                     return;
                 }
             }
@@ -492,9 +491,9 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& 
err, const tcp::endp
             cancelTimer(*connectTimer_);
         }
         if (err == ASIO::error::operation_aborted) {
-            close();
+            close(Error{ResultConnectError, "Connection attempt was 
canceled"});
         } else {
-            close(ResultRetryable);
+            close(Error{ResultRetryable, ""});
         }
     }
 }
@@ -503,10 +502,10 @@ void ClientConnection::handleHandshake(const ASIO_ERROR& 
err) {
     if (err) {
         if (err.value() == ASIO::ssl::error::stream_truncated) {
             LOG_WARN(cnxString() << "Handshake failed: " << err.message());
-            close(ResultRetryable);
+            close(Error{ResultRetryable, ""});
         } else {
             LOG_ERROR(cnxString() << "Handshake failed: " << err.message());
-            close();
+            close(Error{ResultConnectError, "Handshake failed: " + 
err.message()});
         }
         return;
     }
@@ -515,16 +514,17 @@ void ClientConnection::handleHandshake(const ASIO_ERROR& 
err) {
     Result result = ResultOk;
     SharedBuffer buffer;
     try {
+        // TODO: pass error instead of result
         buffer = Commands::newConnect(authentication_, logicalAddress_, 
connectingThroughProxy,
                                       clientVersion_, result);
     } catch (const std::exception& e) {
         LOG_ERROR(cnxString() << "Failed to create Connect command: " << 
e.what());
-        close(ResultAuthenticationError);
+        close(Error{ResultAuthenticationError, "Failed to create Connect 
command: " + std::string(e.what())});
         return;
     }
     if (result != ResultOk) {
         LOG_ERROR(cnxString() << "Failed to establish connection: " << result);
-        close(result);
+        close(Error{result, ""});
         return;
     }
     // Send CONNECT command to broker
@@ -541,7 +541,7 @@ void ClientConnection::handleSentPulsarConnect(const 
ASIO_ERROR& err, const Shar
     }
     if (err) {
         LOG_ERROR(cnxString() << "Failed to establish connection: " << 
err.message());
-        close();
+        close(Error{ResultConnectError, "Failed to establish connection: " + 
err.message()});
         return;
     }
 
@@ -555,7 +555,7 @@ void ClientConnection::handleSentAuthResponse(const 
ASIO_ERROR& err, const Share
     }
     if (err) {
         LOG_WARN(cnxString() << "Failed to send auth response: " << 
err.message());
-        close();
+        close(Error{ResultRetryable, "Failed to send auth response: " + 
err.message()});
         return;
     }
 }
@@ -576,14 +576,15 @@ void ClientConnection::tcpConnectAsync() {
     std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
     if (!Url::parse(hostUrl, service_url)) {
         LOG_ERROR(cnxString() << "Invalid Url, unable to parse: " << err << " 
" << err.message());
-        close();
+        close(Error{ResultConnectError, "Invalid Url, unable to parse: " + 
err.message()});
         return;
     }
 
     if (service_url.protocol() != "pulsar" && service_url.protocol() != 
"pulsar+ssl") {
         LOG_ERROR(cnxString() << "Invalid Url protocol '" << 
service_url.protocol()
                               << "'. Valid values are 'pulsar' and 
'pulsar+ssl'");
-        close();
+        close(Error{ResultConnectError, "Invalid Url protocol '" + 
service_url.protocol() +
+                                            "'. Valid values are 'pulsar' and 
'pulsar+ssl'"});
         return;
     }
 
@@ -598,7 +599,7 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const 
tcp::resolver::result
     if (err) {
         std::string hostUrl = isSniProxy_ ? cnxString() : proxyServiceUrl_;
         LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << 
err.message());
-        close();
+        close(Error{ResultConnectError, hostUrl + " Resolve error: " + 
err.message()});
         return;
     }
 
@@ -625,7 +626,8 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const 
tcp::resolver::result
             LOG_ERROR(cnxString() << "Connection to " << results << " was not 
established in "
                                   << connectTimeout_.count() << " ms");
             lock.unlock();
-            close();
+            close(Error{ResultConnectError, "Connection was not established in 
" +
+                                                
std::to_string(connectTimeout_.count()) + " ms"});
         }  // else: the connection is closed or already established
     });
 
@@ -660,7 +662,7 @@ void ClientConnection::handleRead(const ASIO_ERROR& err, 
size_t bytesTransferred
         } else {
             LOG_ERROR(cnxString() << "Read operation failed: " << 
err.message());
         }
-        close(ResultDisconnected);
+        close(Error{ResultDisconnected, ""});
     } else if (bytesTransferred < minReadSize) {
         // Read the remaining part, use a slice of buffer to write on the next
         // region
@@ -711,7 +713,7 @@ void ClientConnection::processIncomingBuffer() {
         proto::BaseCommand incomingCmd;
         if (!incomingCmd.ParseFromArray(incomingBuffer_.data(), cmdSize)) {
             LOG_ERROR(cnxString() << "Error parsing protocol buffer command");
-            close(ResultDisconnected);
+            close(Error{ResultDisconnected, ""});
             return;
         }
 
@@ -735,7 +737,7 @@ void ClientConnection::processIncomingBuffer() {
                               << ", message ledger id " << 
incomingCmd.message().message_id().ledgerid()
                               << ", entry id " << 
incomingCmd.message().message_id().entryid()
                               << "] Error parsing broker entry metadata");
-                    close(ResultDisconnected);
+                    close(Error{ResultDisconnected, ""});
                     return;
                 }
                 incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + 
brokerEntryMetadataSize);
@@ -753,7 +755,7 @@ void ClientConnection::processIncomingBuffer() {
                           << ", message ledger id " << 
incomingCmd.message().message_id().ledgerid()  //
                           << ", entry id " << 
incomingCmd.message().message_id().entryid()
                           << "] Error parsing message metadata");
-                close(ResultDisconnected);
+                close(Error{ResultDisconnected, ""});
                 return;
             }
 
@@ -886,7 +888,8 @@ void ClientConnection::handleIncomingCommand(BaseCommand& 
incomingCmd) {
             // Handle Pulsar Connected
             if (incomingCmd.type() != BaseCommand::CONNECTED) {
                 // Wrong cmd
-                close();
+                close(Error{ResultDisconnected, cnxString() + "Expected 
CONNECTED command but received " +
+                                                    
Commands::messageType(incomingCmd.type())});
             } else {
                 handlePulsarConnected(incomingCmd.connected());
             }
@@ -987,7 +990,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& 
incomingCmd) {
 
                 default:
                     LOG_WARN(cnxString() << "Received invalid message from 
server");
-                    close(ResultDisconnected);
+                    close(Error{ResultDisconnected, cnxString() + "Received 
invalid message from server"});
                     break;
             }
         }
@@ -1032,11 +1035,11 @@ void ClientConnection::newLookup(const SharedBuffer& 
cmd, uint64_t requestId, co
     Lock lock(mutex_);
     if (isClosed()) {
         lock.unlock();
-        promise->setFailed(ResultNotConnected);
+        promise->setFailed(Error{ResultNotConnected, ""});
         return;
     } else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) {
         lock.unlock();
-        promise->setFailed(ResultTooManyLookupRequestException);
+        promise->setFailed(Error{ResultTooManyLookupRequestException, ""});
         return;
     }
 
@@ -1048,13 +1051,14 @@ void ClientConnection::newLookup(const SharedBuffer& 
cmd, uint64_t requestId, co
                 self->numOfPendingLookupRequest_--;
             }
         });
-    request->getFuture().addListener([promise](Result result, const 
LookupDataResultPtr& lookupDataResult) {
-        if (result == ResultOk) {
-            promise->setValue(lookupDataResult);
-        } else {
-            promise->setFailed(result);
-        }
-    });
+    request->getFuture().addListener(
+        [promise](const auto& error, const LookupDataResultPtr& 
lookupDataResult) {
+            if (error.result == ResultOk) {
+                promise->setValue(lookupDataResult);
+            } else {
+                promise->setFailed(error);
+            }
+        });
 
     numOfPendingLookupRequest_++;
     lock.unlock();
@@ -1112,7 +1116,7 @@ void ClientConnection::handleSend(const ASIO_ERROR& err, 
const SharedBuffer&) {
     }
     if (err) {
         LOG_WARN(cnxString() << "Could not send message on connection: " << 
err << " " << err.message());
-        close(ResultDisconnected);
+        close(Error{ResultDisconnected, ""});
     } else {
         sendPendingCommands();
     }
@@ -1124,7 +1128,7 @@ void ClientConnection::handleSendPair(const ASIO_ERROR& 
err) {
     }
     if (err) {
         LOG_WARN(cnxString() << "Could not send pair message on connection: " 
<< err << " " << err.message());
-        close(ResultDisconnected);
+        close(Error{ResultDisconnected, ""});
     } else {
         sendPendingCommands();
     }
@@ -1164,16 +1168,16 @@ void ClientConnection::sendPendingCommands() {
     }
 }
 
-Future<Result, ResponseData> ClientConnection::sendRequestWithId(const 
SharedBuffer& cmd, int requestId,
-                                                                 const char* 
requestType) {
+Future<Error, ResponseData> ClientConnection::sendRequestWithId(const 
SharedBuffer& cmd, int requestId,
+                                                                const char* 
requestType) {
     Lock lock(mutex_);
 
     if (isClosed()) {
         lock.unlock();
         LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << 
requestId
                               << ") to a closed connection");
-        Promise<Result, ResponseData> promise;
-        promise.setFailed(ResultNotConnected);
+        Promise<Error, ResponseData> promise;
+        promise.setFailed(Error{ResultNotConnected, ""});
         return promise.getFuture();
     }
 
@@ -1206,7 +1210,7 @@ void ClientConnection::handleKeepAliveTimeout(const 
ASIO_ERROR& ec) {
 
     if (havePendingPingRequest_) {
         LOG_WARN(cnxString() << "Forcing connection to close after keep-alive 
timeout");
-        close(ResultDisconnected);
+        close(Error{ResultDisconnected, ""});
     } else {
         // Send keep alive probe to peer
         LOG_DEBUG(cnxString() << "Sending ping message");
@@ -1234,10 +1238,10 @@ void ClientConnection::handleConsumerStatsTimeout(const 
ASIO_ERROR& ec,
     startConsumerStatsTimer(consumerStatsRequests);
 }
 
-const std::future<void>& ClientConnection::close(Result result, bool 
switchCluster) {
+const std::future<void>& ClientConnection::close(Error&& error, bool 
switchCluster) {
     Lock lock(mutex_);
     if (closeFuture_) {
-        connectPromise_.setFailed(result);
+        connectPromise_.setFailed(std::move(error));
         return *closeFuture_;
     }
     auto promise = std::make_shared<std::promise<void>>();
@@ -1279,6 +1283,7 @@ const std::future<void>& ClientConnection::close(Result 
result, bool switchClust
     cancelTimer(*connectTimer_);
     lock.unlock();
     int refCount = weak_from_this().use_count();
+    auto result = error.result;
     if (result != ResultAlreadyClosed /* closed by the pool */ && 
!isResultRetryable(result)) {
         LOG_ERROR(cnxString() << "Connection closed with " << result << " 
(refCnt: " << refCount << ")");
     } else {
@@ -1327,37 +1332,33 @@ const std::future<void>& ClientConnection::close(Result 
result, bool switchClust
     }
     self.reset();
 
-    connectPromise_.setFailed(result);
+    connectPromise_.setFailed(error);
 
     // Fail all pending requests after releasing the lock.
     for (auto& kv : pendingRequests) {
-        kv.second->fail(result);
+        kv.second->fail(error);
     }
     for (auto& kv : pendingLookupRequests) {
-        kv.second->fail(result);
+        kv.second->fail(error);
     }
     for (auto& kv : pendingConsumerStatsMap) {
         LOG_ERROR(cnxString() << " Closing Client Connection, please try again 
later");
         kv.second.setFailed(result);
     }
     for (auto& kv : pendingGetLastMessageIdRequests) {
-        kv.second->fail(result);
+        kv.second->fail(error);
     }
     for (auto& kv : pendingGetNamespaceTopicsRequests) {
-        kv.second->fail(result);
+        kv.second->fail(error);
     }
     for (auto& kv : pendingGetSchemaRequests) {
-        kv.second->fail(result);
+        kv.second->fail(error);
     }
     return *closeFuture_;
 }
 
 bool ClientConnection::isClosed() const { return state_ == Disconnected; }
 
-Future<Result, ClientConnectionWeakPtr> ClientConnection::getConnectFuture() {
-    return connectPromise_.getFuture();
-}
-
 void ClientConnection::registerProducer(int producerId, const ProducerImplPtr& 
producer) {
     Lock lock(mutex_);
     producers_.insert(std::make_pair(producerId, producer));
@@ -1405,12 +1406,21 @@ Future<Result, GetLastMessageIdResponse> 
ClientConnection::newGetLastMessageId(u
         });
     lock.unlock();
 
+    // TODO: return Error instead
+    Promise<Result, GetLastMessageIdResponse> promise;
+    request->getFuture().addListener([promise](const auto& error, const auto& 
response) {
+        if (error.result == ResultOk) {
+            promise.setValue(response);
+        } else {
+            promise.setFailed(error.result);
+        }
+    });
     if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != 
nullptr &&
         mockServer_->sendRequest("GET_LAST_MESSAGE_ID", requestId)) {
-        return request->getFuture();
+        return promise.getFuture();
     }
     sendCommand(Commands::newGetLastMessageId(consumerId, requestId));
-    return request->getFuture();
+    return promise.getFuture();
 }
 
 Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
@@ -1429,23 +1439,34 @@ Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(
             LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to 
broker, req_id: " << requestId);
         });
     lock.unlock();
+
+    // TODO: return Error instead
+    Promise<Result, NamespaceTopicsPtr> promise;
+    request->getFuture().addListener([promise](const auto& error, const auto& 
response) {
+        if (error.result == ResultOk) {
+            promise.setValue(response);
+        } else {
+            promise.setFailed(error.result);
+        }
+    });
+
     if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != 
nullptr &&
         mockServer_->sendRequest("GET_TOPICS_OF_NAMESPACE", requestId)) {
-        return request->getFuture();
+        return promise.getFuture();
     }
     sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId));
-    return request->getFuture();
+    return promise.getFuture();
 }
 
-Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& 
topicName,
-                                                          const std::string& 
version, uint64_t requestId) {
+Future<Error, SchemaInfo> ClientConnection::newGetSchema(const std::string& 
topicName,
+                                                         const std::string& 
version, uint64_t requestId) {
     Lock lock(mutex_);
 
     if (isClosed()) {
         lock.unlock();
         LOG_ERROR(cnxString() << "Client is not connected to the broker");
-        Promise<Result, SchemaInfo> promise;
-        promise.setFailed(ResultNotConnected);
+        Promise<Error, SchemaInfo> promise;
+        promise.setFailed(Error{ResultNotConnected, ""});
         return promise.getFuture();
     }
 
@@ -1453,14 +1474,15 @@ Future<Result, SchemaInfo> 
ClientConnection::newGetSchema(const std::string& top
         insertRequest(pendingGetSchemaRequests_, requestId, [cnxString = 
cnxString(), requestId]() {
             LOG_WARN(cnxString << "GetSchema request timeout to broker, 
req_id: " << requestId);
         });
+    auto future = request->getFuture();
     lock.unlock();
 
     if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != 
nullptr &&
         mockServer_->sendRequest("GET_SCHEMA", requestId)) {
-        return request->getFuture();
+        return future;
     }
     sendCommand(Commands::newGetSchema(topicName, version, requestId));
-    return request->getFuture();
+    return future;
 }
 
 void ClientConnection::checkServerError(ServerError error, const std::string& 
message) {
@@ -1486,7 +1508,7 @@ void ClientConnection::handleSendReceipt(const 
proto::CommandSendReceipt& sendRe
             if (!producer->ackReceived(sequenceId, messageId)) {
                 // If the producer fails to process the ack, we need to close 
the connection
                 // to give it a chance to recover from there
-                close(ResultDisconnected);
+                close(Error{ResultDisconnected, ""});
             }
         }
     } else {
@@ -1510,12 +1532,12 @@ void ClientConnection::handleSendError(const 
proto::CommandSendError& error) {
                 if (!producer->removeCorruptMessage(sequenceId)) {
                     // If the producer fails to remove corrupt msg, we need to 
close the
                     // connection to give it a chance to recover from there
-                    close(ResultDisconnected);
+                    close(Error{ResultDisconnected, ""});
                 }
             }
         }
     } else {
-        close(ResultDisconnected);
+        close(Error{ResultDisconnected, ""});
     }
 }
 
@@ -1554,13 +1576,16 @@ void 
ClientConnection::handlePartitionedMetadataResponse(
                                       << partitionMetadataResponse.request_id()
                                       << " error: " << 
partitionMetadataResponse.error()
                                       << " msg: " << 
partitionMetadataResponse.message());
-                checkServerError(partitionMetadataResponse.error(), 
partitionMetadataResponse.message());
-                request->fail(
-                    getResult(partitionMetadataResponse.error(), 
partitionMetadataResponse.message()));
+                auto msg = partitionMetadataResponse.message();
+                checkServerError(partitionMetadataResponse.error(), msg);
+                auto result = getResult(partitionMetadataResponse.error(), 
msg);
+                request->fail(Error{result, std::move(msg)});
             } else {
                 LOG_ERROR(cnxString() << "Failed partition-metadata lookup 
req_id: "
                                       << 
partitionMetadataResponse.request_id() << " with empty response: ");
-                request->fail(ResultConnectError);
+                request->fail(
+                    Error{ResultConnectError, "Empty response from broker for 
request " +
+                                                  
std::to_string(partitionMetadataResponse.request_id())});
             }
         } else {
             LookupDataResultPtr lookupResultPtr = 
std::make_shared<LookupDataResult>();
@@ -1627,12 +1652,16 @@ void ClientConnection::handleLookupTopicRespose(
                 LOG_ERROR(cnxString() << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
                                       << " error: " << 
lookupTopicResponse.error()
                                       << " msg: " << 
lookupTopicResponse.message());
-                checkServerError(lookupTopicResponse.error(), 
lookupTopicResponse.message());
-                request->fail(getResult(lookupTopicResponse.error(), 
lookupTopicResponse.message()));
+                auto msg = lookupTopicResponse.message();
+                checkServerError(lookupTopicResponse.error(), msg);
+                auto result = getResult(lookupTopicResponse.error(), 
lookupTopicResponse.message());
+                request->fail(Error{result, std::move(msg)});
             } else {
                 LOG_ERROR(cnxString() << "Failed lookup req_id: " << 
lookupTopicResponse.request_id()
                                       << " with empty response: ");
-                request->fail(ResultConnectError);
+                request->fail(
+                    Error{ResultConnectError, "Empty response from broker for 
request " +
+                                                  
std::to_string(lookupTopicResponse.request_id())});
             }
         } else {
             LOG_DEBUG(cnxString() << "Received lookup response from server. 
req_id: "
@@ -1702,6 +1731,7 @@ void ClientConnection::handleError(const 
proto::CommandError& error) {
     LOG_WARN(cnxString() << "Received error response from server: " << result
                          << (error.has_message() ? (" (" + error.message() + 
")") : "")
                          << " -- req_id: " << error.request_id());
+    Error requestError{result, error.message()};
 
     Lock lock(mutex_);
 
@@ -1711,7 +1741,7 @@ void ClientConnection::handleError(const 
proto::CommandError& error) {
         pendingRequests_.erase(it);
         lock.unlock();
 
-        request->fail(result);
+        request->fail(requestError);
     } else {
         auto it = pendingGetLastMessageIdRequests_.find(error.request_id());
         if (it != pendingGetLastMessageIdRequests_.end()) {
@@ -1719,7 +1749,7 @@ void ClientConnection::handleError(const 
proto::CommandError& error) {
             pendingGetLastMessageIdRequests_.erase(it);
             lock.unlock();
 
-            request->fail(result);
+            request->fail(requestError);
         } else {
             auto it = 
pendingGetNamespaceTopicsRequests_.find(error.request_id());
             if (it != pendingGetNamespaceTopicsRequests_.end()) {
@@ -1727,7 +1757,7 @@ void ClientConnection::handleError(const 
proto::CommandError& error) {
                 pendingGetNamespaceTopicsRequests_.erase(it);
                 lock.unlock();
 
-                request->fail(result);
+                request->fail(requestError);
             } else {
                 lock.unlock();
             }
@@ -1857,10 +1887,11 @@ void ClientConnection::handleAuthChallenge() {
     LOG_DEBUG(cnxString() << "Received auth challenge from broker");
 
     Result result;
+    // TODO: use Error instead
     SharedBuffer buffer = Commands::newAuthResponse(authentication_, result);
     if (result != ResultOk) {
         LOG_ERROR(cnxString() << "Failed to send auth response: " << result);
-        close(result);
+        close(Error{result, "Failed to auth response"});
         return;
     }
     auto self = shared_from_this();
@@ -1955,7 +1986,7 @@ void ClientConnection::handleGetSchemaResponse(const 
proto::CommandGetSchemaResp
                                              : "")
                                      << " -- req_id: " << 
response.request_id());
             }
-            request->fail(result);
+            request->fail(Error{result, response.error_message()});
             return;
         }
 
@@ -1992,7 +2023,7 @@ void ClientConnection::handleAckResponse(const 
proto::CommandAckResponse& respon
     lock.unlock();
 
     if (response.has_error()) {
-        request->fail(getResult(response.error(), ""));
+        request->fail(Error{getResult(response.error(), ""), 
response.message()});
     } else {
         request->complete({});
     }
@@ -2003,7 +2034,7 @@ void ClientConnection::unsafeRemovePendingRequest(long 
requestId) {
     if (it != pendingRequests_.end()) {
         auto request = std::move(it->second);
         pendingRequests_.erase(it);
-        request->fail(ResultDisconnected);
+        request->fail(Error{ResultDisconnected, ""});
     }
 }
 
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index c8cd86f..fd89fae 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -163,11 +163,11 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
-     * @param result all pending futures will complete with this result
+     * @param error all pending futures will complete with this error
      * @param switchCluster whether the close is triggered by cluster switching
      */
-    const std::future<void>& close(Result result = ResultConnectError, bool 
switchCluster = false);
+    const std::future<void>& close(Error&& error = Error{ResultConnectError, 
""}, bool switchCluster = false);
 
     bool isClosed() const;
 
-    Future<Result, ClientConnectionWeakPtr> getConnectFuture();
+    auto getConnectFuture() const { return connectPromise_.getFuture(); }
 
     Future<Result, ClientConnectionWeakPtr> getCloseFuture();
 
@@ -191,8 +191,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      * Send a request with a specific Id over the connection. The future will 
be
      * triggered when the response for this request is received
      */
-    Future<Result, ResponseData> sendRequestWithId(const SharedBuffer& cmd, 
int requestId,
-                                                   const char* requestType);
+    Future<Error, ResponseData> sendRequestWithId(const SharedBuffer& cmd, int 
requestId,
+                                                  const char* requestType);
 
     const std::string& brokerAddress() const;
 
@@ -212,8 +212,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
                                                                
CommandGetTopicsOfNamespace_Mode mode,
                                                                uint64_t 
requestId);
 
-    Future<Result, SchemaInfo> newGetSchema(const std::string& topicName, 
const std::string& version,
-                                            uint64_t requestId);
+    Future<Error, SchemaInfo> newGetSchema(const std::string& topicName, const 
std::string& version,
+                                           uint64_t requestId);
 
     void attachMockServer(const std::shared_ptr<MockServer>& mockServer) {
         mockServer_ = mockServer;
@@ -334,7 +334,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     SharedBuffer incomingBuffer_;
 
-    Promise<Result, ClientConnectionWeakPtr> connectPromise_;
+    Promise<Error, ClientConnectionWeakPtr> connectPromise_;
+
     const std::chrono::milliseconds connectTimeout_;
     const DeadlineTimerPtr connectTimer_;
 
diff --git a/lib/ClientConnectionAdaptor.h b/lib/ClientConnectionAdaptor.h
index 2c29937..780b35b 100644
--- a/lib/ClientConnectionAdaptor.h
+++ b/lib/ClientConnectionAdaptor.h
@@ -45,13 +45,13 @@ inline void checkServerError(Connection& connection, 
ServerError error, const st
                 message.find("KeeperException") == std::string::npos &&
                 message.find("is being unloaded") == std::string::npos &&
                 message.find("the broker do not have test listener") == 
std::string::npos) {
-                connection.close(ResultDisconnected);
+                connection.close(Error{ResultDisconnected, message});
             }
             break;
         case proto::ServerError::TooManyRequests:
             // TODO: Implement maxNumberOfRejectedRequestPerConnection like
             // https://github.com/apache/pulsar/pull/274
-            connection.close(ResultDisconnected);
+            connection.close(Error{ResultDisconnected, ""});
             break;
         default:
             break;
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index b84c14c..1e4f022 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -28,6 +28,7 @@
 #include <random>
 #include <shared_mutex>
 #include <sstream>
+#include <variant>
 
 #include "BinaryProtoLookupService.h"
 #include "ClientConfigurationImpl.h"
@@ -188,49 +189,52 @@ LookupServicePtr ClientImpl::getLookup(const std::string& 
redirectedClusterURI)
 }
 
 void ClientImpl::createProducerAsync(const std::string& topic, const 
ProducerConfiguration& conf,
-                                     const CreateProducerCallback& callback, 
bool autoDownloadSchema) {
+                                     CreateProducerV2Callback callback, bool 
autoDownloadSchema) {
     if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
         throw std::invalid_argument("Batching and chunking of messages can't 
be enabled together");
     }
+
     TopicNamePtr topicName;
     {
         std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
-            callback(ResultAlreadyClosed, Producer());
+            callback(Error{ResultAlreadyClosed, ""});
             return;
         } else if (!(topicName = TopicName::get(topic))) {
             lock.unlock();
-            callback(ResultInvalidTopicName, Producer());
+            // TODO: return an error in TopicName
+            callback(Error{ResultInvalidTopicName, ""});
             return;
         }
     }
 
     if (autoDownloadSchema) {
-        auto self = shared_from_this();
-        getSchema(topicName).addListener(
-            [self, topicName, callback](Result res, const SchemaInfo& 
topicSchema) {
-                if (res != ResultOk) {
-                    callback(res, Producer());
-                    return;
-                }
-                ProducerConfiguration conf;
-                conf.setSchema(topicSchema);
-                self->getPartitionMetadataAsync(topicName).addListener(
-                    std::bind(&ClientImpl::handleCreateProducer, self, 
std::placeholders::_1,
-                              std::placeholders::_2, topicName, conf, 
callback));
-            });
+        getSchema(topicName).addListener([self{shared_from_this()}, topicName, 
callback{std::move(callback)}](
+                                             const Error& error, const 
SchemaInfo& topicSchema) mutable {
+            if (error.result != ResultOk) {
+                callback(error);
+                return;
+            }
+            ProducerConfiguration conf;
+            conf.setSchema(topicSchema);
+            self->getPartitionMetadataAsync(topicName).addListener(
+                std::bind(&ClientImpl::handleCreateProducer, self, 
std::placeholders::_1,
+                          std::placeholders::_2, topicName, conf, callback));
+        });
     } else {
         getPartitionMetadataAsync(topicName).addListener(
-            std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), 
std::placeholders::_1,
-                      std::placeholders::_2, topicName, conf, callback));
+            [this, self{shared_from_this()}, conf, topicName, 
callback{std::move(callback)}](
+                const Error& error, const LookupDataResultPtr& 
partitionMetadata) {
+                handleCreateProducer(error, partitionMetadata, topicName, 
conf, callback);
+            });
     }
 }
 
-void ClientImpl::handleCreateProducer(Result result, const 
LookupDataResultPtr& partitionMetadata,
+void ClientImpl::handleCreateProducer(const Error& error, const 
LookupDataResultPtr& partitionMetadata,
                                       const TopicNamePtr& topicName, const 
ProducerConfiguration& conf,
-                                      const CreateProducerCallback& callback) {
-    if (!result) {
+                                      CreateProducerV2Callback callback) {
+    if (!error.result) {
         ProducerImplBasePtr producer;
 
         auto interceptors = 
std::make_shared<ProducerInterceptors>(conf.getInterceptors());
@@ -244,36 +248,39 @@ void ClientImpl::handleCreateProducer(Result result, 
const LookupDataResultPtr&
             }
         } catch (const std::runtime_error& e) {
             LOG_ERROR("Failed to create producer: " << e.what());
-            callback(ResultConnectError, {});
+            callback(Error{ResultConnectError, e.what()});
             return;
         }
         producer->getProducerCreatedFuture().addListener(
-            std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), 
std::placeholders::_1,
-                      std::placeholders::_2, callback, producer));
+            [this, self{shared_from_this()}, callback{std::move(callback)}, 
producer](
+                const Error& error, const ProducerImplBaseWeakPtr& 
producerBaseWeakPtr) {
+                handleProducerCreated(error, producerBaseWeakPtr, callback, 
producer);
+            });
         producer->start();
     } else {
         LOG_ERROR("Error Checking/Getting Partition Metadata while creating 
producer on "
-                  << topicName->toString() << " -- " << result);
-        callback(result, Producer());
+                  << topicName->toString() << " -- " << error.result);
+        callback(error);
     }
 }
 
-void ClientImpl::handleProducerCreated(Result result, const 
ProducerImplBaseWeakPtr& producerBaseWeakPtr,
-                                       const CreateProducerCallback& callback,
+void ClientImpl::handleProducerCreated(const Error& error, const 
ProducerImplBaseWeakPtr& producerBaseWeakPtr,
+                                       const CreateProducerV2Callback& 
callback,
                                        const ProducerImplBasePtr& producer) {
-    if (result == ResultOk) {
+    if (error.result == ResultOk) {
         auto address = producer.get();
         auto existingProducer = producers_.putIfAbsent(address, producer);
         if (existingProducer) {
             auto producer = existingProducer.value().lock();
-            LOG_ERROR("Unexpected existing producer at the same address: "
-                      << address << ", producer: " << (producer ? 
producer->getProducerName() : "(null)"));
-            callback(ResultUnknownError, {});
+            const auto name = producer ? producer->getProducerName() : 
"(null)";
+            LOG_ERROR("Unexpected existing producer at the same address: " << 
address
+                                                                           << 
", producer: " << name);
+            callback(Error{ResultUnknownError, "Unexpected existing producer 
for name " + name});
             return;
         }
-        callback(result, Producer(producer));
+        callback(Producer(producer));
     } else {
-        callback(result, {});
+        callback(error);
     }
 }
 
@@ -293,10 +300,11 @@ void ClientImpl::createReaderAsync(const std::string& 
topic, const MessageId& st
         }
     }
 
-    MessageId msgId(startMessageId);
-    getPartitionMetadataAsync(topicName).addListener(
-        std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), 
std::placeholders::_1,
-                  std::placeholders::_2, topicName, msgId, conf, callback));
+    getPartitionMetadataAsync(topicName).addListener([this, 
self{shared_from_this()}, topicName,
+                                                      startMessageId, conf,
+                                                      callback](const auto& 
error, const auto& metadata) {
+        handleReaderMetadataLookup(error.result, metadata, topicName, 
startMessageId, conf, callback);
+    });
 }
 
 void ClientImpl::createTableViewAsync(const std::string& topic, const 
TableViewConfiguration& conf,
@@ -507,9 +515,11 @@ void ClientImpl::subscribeAsync(const std::string& topic, 
const std::string& sub
         }
     }
 
-    getPartitionMetadataAsync(topicName).addListener(
-        std::bind(&ClientImpl::handleSubscribe, shared_from_this(), 
std::placeholders::_1,
-                  std::placeholders::_2, topicName, subscriptionName, conf, 
callback));
+    getPartitionMetadataAsync(topicName).addListener([this, 
self{shared_from_this()}, topicName,
+                                                      subscriptionName, conf,
+                                                      callback](const auto& 
error, const auto& metadata) {
+        handleSubscribe(error.result, metadata, topicName, subscriptionName, 
conf, callback);
+    });
 }
 
 void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& 
partitionMetadata,
@@ -604,8 +614,8 @@ GetConnectionFuture ClientImpl::getConnection(const 
std::string& redirectedClust
             useProxy_ = data.proxyThroughServiceUrl;
             lookupCount_++;
             pool_.getConnectionAsync(data.logicalAddress, 
data.physicalAddress, key)
-                .addListener([promise](Result result, const 
ClientConnectionWeakPtr& weakCnx) {
-                    if (result == ResultOk) {
+                .addListener([promise](const auto& error, const 
ClientConnectionWeakPtr& weakCnx) {
+                    if (error.result == ResultOk) {
                         auto cnx = weakCnx.lock();
                         if (cnx) {
                             promise.setValue(cnx);
@@ -613,7 +623,7 @@ GetConnectionFuture ClientImpl::getConnection(const 
std::string& redirectedClust
                             promise.setFailed(ResultConnectError);
                         }
                     } else {
-                        promise.setFailed(result);
+                        promise.setFailed(error.result);
                     }
                 });
         });
@@ -635,8 +645,8 @@ GetConnectionFuture ClientImpl::connect(const std::string& 
redirectedClusterURI,
     const auto& physicalAddress = getPhysicalAddress(redirectedClusterURI, 
logicalAddress);
     Promise<Result, ClientConnectionPtr> promise;
     pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
-        .addListener([promise](Result result, const ClientConnectionWeakPtr& 
weakCnx) {
-            if (result == ResultOk) {
+        .addListener([promise](const auto& error, const 
ClientConnectionWeakPtr& weakCnx) {
+            if (error.result == ResultOk) {
                 auto cnx = weakCnx.lock();
                 if (cnx) {
                     promise.setValue(cnx);
@@ -644,7 +654,7 @@ GetConnectionFuture ClientImpl::connect(const std::string& 
redirectedClusterURI,
                     promise.setFailed(ResultConnectError);
                 }
             } else {
-                promise.setFailed(result);
+                promise.setFailed(error.result);
             }
         });
     return promise.getFuture();
@@ -685,9 +695,10 @@ void ClientImpl::getPartitionsForTopicAsync(const 
std::string& topic, const GetP
             return;
         }
     }
-    
getPartitionMetadataAsync(topicName).addListener(std::bind(&ClientImpl::handleGetPartitions,
-                                                               
shared_from_this(), std::placeholders::_1,
-                                                               
std::placeholders::_2, topicName, callback));
+    getPartitionMetadataAsync(topicName).addListener(
+        [this, self{shared_from_this()}, topicName, callback](const auto& 
error, const auto& metadata) {
+            handleGetPartitions(error.result, metadata, topicName, callback);
+        });
 }
 
 void ClientImpl::closeAsync(const CloseCallback& callback) {
@@ -768,8 +779,8 @@ void ClientImpl::handleClose(Result result, const 
SharedInt& numberOfOpenHandler
         }
 
         LOG_DEBUG("Shutting down producers and consumers for client");
-        // handleClose() is called in ExecutorService's event loop, while 
shutdown() tried to wait the event
-        // loop exits. So here we use another thread to call shutdown().
+        // handleClose() is called in ExecutorService's event loop, while 
shutdown() tried to wait the
+        // event loop exits. So here we use another thread to call shutdown().
         auto self = shared_from_this();
         std::thread shutdownTask{[this, self, callback] {
             shutdown();
@@ -817,9 +828,9 @@ void ClientImpl::shutdown() {
     }
     LOG_DEBUG("ConnectionPool is closed");
 
-    // 500ms as the timeout is long enough because ExecutorService::close 
calls io_service::stop() internally
-    // and waits until io_service::run() in another thread returns, which 
should be as soon as possible after
-    // stop() is called.
+    // 500ms as the timeout is long enough because ExecutorService::close 
calls io_service::stop()
+    // internally and waits until io_service::run() in another thread returns, 
which should be as soon as
+    // possible after stop() is called.
     TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{500};
 
     timeoutProcessor.tik();
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 7772b15..c046d8e 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -93,7 +93,7 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
      * that exists for the topic.
      */
     void createProducerAsync(const std::string& topic, const 
ProducerConfiguration& conf,
-                             const CreateProducerCallback& callback, bool 
autoDownloadSchema = false);
+                             CreateProducerV2Callback callback, bool 
autoDownloadSchema = false);
 
     void subscribeAsync(const std::string& topic, const std::string& 
subscriptionName,
                         const ConsumerConfiguration& conf, const 
SubscribeCallback& callback);
@@ -161,7 +161,6 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
         std::shared_lock lock(mutex_);
         return lookupServicePtr_->getTopicsOfNamespaceAsync(nsName, mode);
     }
-
     auto getSchema(const TopicNamePtr& topicName, const std::string& version = 
"") {
         std::shared_lock lock(mutex_);
         return lookupServicePtr_->getSchema(topicName, version);
@@ -172,9 +171,9 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     friend class PulsarFriend;
 
    private:
-    void handleCreateProducer(Result result, const LookupDataResultPtr& 
partitionMetadata,
+    void handleCreateProducer(const Error& error, const LookupDataResultPtr& 
partitionMetadata,
                               const TopicNamePtr& topicName, const 
ProducerConfiguration& conf,
-                              const CreateProducerCallback& callback);
+                              CreateProducerV2Callback callback);
 
     void handleSubscribe(Result result, const LookupDataResultPtr& 
partitionMetadata,
                          const TopicNamePtr& topicName, const std::string& 
consumerName,
@@ -187,8 +186,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     void handleGetPartitions(Result result, const LookupDataResultPtr& 
partitionMetadata,
                              const TopicNamePtr& topicName, const 
GetPartitionsCallback& callback);
 
-    void handleProducerCreated(Result result, const ProducerImplBaseWeakPtr& 
producerWeakPtr,
-                               const CreateProducerCallback& callback, const 
ProducerImplBasePtr& producer);
+    void handleProducerCreated(const Error& error, const 
ProducerImplBaseWeakPtr& producerWeakPtr,
+                               const CreateProducerV2Callback& callback, const 
ProducerImplBasePtr& producer);
     void handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& 
consumerWeakPtr,
                                const SubscribeCallback& callback, const 
ConsumerImplBasePtr& consumer);
 
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index 6465ff7..35d815a 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -59,7 +59,7 @@ bool ConnectionPool::close() {
         auto& cnx = kv.second;
         if (cnx) {
             // Close with a fatal error to not let client retry
-            auto& future = cnx->close(ResultAlreadyClosed);
+            auto& future = cnx->close(Error{ResultAlreadyClosed, ""});
             using namespace std::chrono_literals;
             if (auto status = future.wait_for(5s); status != 
std::future_status::ready) {
                 LOG_WARN("Connection close timed out for " << 
cnx.get()->cnxString());
@@ -86,7 +86,7 @@ bool ConnectionPool::close() {
 
 void ConnectionPool::closeAllConnectionsForNewCluster() {
     for (auto&& kv : releaseConnections()) {
-        kv.second->close(ResultDisconnected, true);
+        kv.second->close(Error{ResultDisconnected, ""}, true);
     }
 }
 
@@ -97,12 +97,12 @@ static const std::string getKey(const std::string& 
logicalAddress, const std::st
     return ss.str();
 }
 
-Future<Result, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
-                                                                           
const std::string& physicalAddress,
-                                                                           
size_t keySuffix) {
+Future<Error, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
+                                                                          
const std::string& physicalAddress,
+                                                                          
size_t keySuffix) {
     if (closed_) {
-        Promise<Result, ClientConnectionWeakPtr> promise;
-        promise.setFailed(ResultAlreadyClosed);
+        Promise<Error, ClientConnectionWeakPtr> promise;
+        promise.setFailed(Error{ResultAlreadyClosed, ""});
         return promise.getFuture();
     }
 
@@ -133,21 +133,21 @@ Future<Result, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(const
         cnx.reset(new ClientConnection(logicalAddress, physicalAddress, 
*serviceInfo_.load(),
                                        executorProvider_->get(keySuffix), 
clientConfiguration_,
                                        clientVersion_, *this, keySuffix));
-    } catch (Result result) {
-        Promise<Result, ClientConnectionWeakPtr> promise;
-        promise.setFailed(result);
+    } catch (Error error) {
+        Promise<Error, ClientConnectionWeakPtr> promise;
+        promise.setFailed(std::move(error));
         return promise.getFuture();
     } catch (const std::runtime_error& e) {
         lock.unlock();
         LOG_ERROR("Failed to create connection: " << e.what())
-        Promise<Result, ClientConnectionWeakPtr> promise;
-        promise.setFailed(ResultConnectError);
+        Promise<Error, ClientConnectionWeakPtr> promise;
+        promise.setFailed(Error{ResultConnectError, e.what()});
         return promise.getFuture();
     }
 
     LOG_INFO("Created connection for " << key);
 
-    Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
+    auto future = cnx->getConnectFuture();
     pool_.insert(std::make_pair(key, cnx));
 
     lock.unlock();
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index f828ac6..201c1d0 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -25,6 +25,7 @@
 #include <pulsar/defines.h>
 
 #include <atomic>
+#include <cstddef>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -82,16 +83,16 @@ class PULSAR_PUBLIC ConnectionPool {
      * @param keySuffix the key suffix to choose which connection on the same 
broker
      * @return a future that will produce the ClientCnx object
      */
-    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& logicalAddress,
-                                                               const 
std::string& physicalAddress,
-                                                               size_t 
keySuffix);
+    Future<Error, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& logicalAddress,
+                                                              const 
std::string& physicalAddress,
+                                                              size_t 
keySuffix);
 
-    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& logicalAddress,
-                                                               const 
std::string& physicalAddress) {
+    Future<Error, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& logicalAddress,
+                                                              const 
std::string& physicalAddress) {
         return getConnectionAsync(logicalAddress, physicalAddress, 
generateRandomIndex());
     }
 
-    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& address) {
+    Future<Error, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& address) {
         return getConnectionAsync(address, address);
     }
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 85f4994..1060cd5 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -225,13 +225,12 @@ void ConsumerImpl::onNegativeAcksSend(const 
std::set<MessageId>& messageIds) {
     interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), 
messageIds);
 }
 
-Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& 
cnx) {
-    // Do not use bool, only Result.
-    Promise<Result, bool> promise;
+Future<Error, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& 
cnx) {
+    Promise<Error, bool> promise;
 
     if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Consumer is already 
closed");
-        promise.setFailed(ResultAlreadyClosed);
+        promise.setFailed(Error{ResultAlreadyClosed, ""});
         return promise.getFuture();
     }
 
@@ -262,10 +261,10 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
     auto self = get_shared_this_ptr();
     setFirstRequestIdAfterConnect(requestId);
     cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE")
-        .addListener([this, self, cnx, promise](Result result, const 
ResponseData& responseData) {
-            Result handleResult = handleCreateConsumer(cnx, result);
+        .addListener([this, self, cnx, promise](const Error& error, const 
ResponseData& responseData) {
+            Result handleResult = handleCreateConsumer(cnx, error.result);
             if (handleResult != ResultOk) {
-                promise.setFailed(handleResult);
+                promise.setFailed(Error{handleResult, ""});
                 return;
             }
             promise.setSuccess();
@@ -314,11 +313,11 @@ Result ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result
                     auto name = getName();
                     
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), 
requestId,
                                            "CLOSE_CONSUMER")
-                        .addListener([name](Result result, const 
ResponseData&) {
-                            if (result == ResultOk) {
+                        .addListener([name](const Error& error, const 
ResponseData&) {
+                            if (error.result == ResultOk) {
                                 LOG_INFO(name << "Closed consumer successfully 
after subscribe completed");
                             } else {
-                                LOG_WARN(name << "Failed to close consumer: " 
<< strResult(result));
+                                LOG_WARN(name << "Failed to close consumer: " 
<< strResult(error.result));
                             }
                         });
                 } else {
@@ -328,7 +327,7 @@ Result ConsumerImpl::handleCreateConsumer(const 
ClientConnectionPtr& cnx, Result
                     LOG_WARN(getName()
                              << "Client already closed when subscribe 
completed, close the connection "
                              << cnx->cnxString());
-                    cnx->close(ResultNotConnected);
+                    cnx->close(Error{ResultNotConnected, ""});
                 }
                 return ResultAlreadyClosed;
             }
@@ -423,7 +422,8 @@ void ConsumerImpl::unsubscribeAsync(const ResultCallback& 
originalCallback) {
         SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
         auto self = get_shared_this_ptr();
         cnx->sendRequestWithId(cmd, requestId, "UNSUBSCRIBE")
-            .addListener([self, callback](Result result, const ResponseData&) 
{ callback(result); });
+            .addListener(
+                [self, callback](const Error& error, const ResponseData&) { 
callback(error.result); });
     } else {
         Result result = ResultNotConnected;
         lock.unlock();
@@ -1420,7 +1420,7 @@ void ConsumerImpl::closeAsync(const ResultCallback& 
originalCallback) {
     auto requestId = newRequestId();
     auto self = get_shared_this_ptr();
     cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), 
requestId, "CLOSE_CONSUMER")
-        .addListener([self, callback](Result result, const ResponseData&) { 
callback(result); });
+        .addListener([self, callback](const Error& error, const ResponseData&) 
{ callback(error.result); });
 }
 
 const std::string& ConsumerImpl::getName() const { return consumerStr_; }
@@ -1828,12 +1828,13 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
const SharedBuffer& seek, c
     auto weakSelf = weak_from_this();
 
     cnx->sendRequestWithId(seek, requestId, "SEEK")
-        .addListener([this, weakSelf, previousLastSeekArg](Result result, 
const ResponseData& responseData) {
+        .addListener([this, weakSelf, previousLastSeekArg](const Error& error,
+                                                           const ResponseData& 
responseData) {
             auto self = weakSelf.lock();
             if (!self) {
                 return;
             }
-            if (result == ResultOk) {
+            if (error.result == ResultOk) {
                 LockGuard lock(mutex_);
                 if (getCnx().expired() || reconnectionPending_) {
                     // Reconnection path: delay the seek callback until 
connectionOpened. clearReceiveQueue()
@@ -1859,12 +1860,12 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
const SharedBuffer& seek, c
                     seekStatus_ = SeekStatus::NOT_STARTED;
                 }  // else: complete the seek future after connection is 
established
             } else {
-                LOG_ERROR(getName() << "Failed to seek: " << result);
+                LOG_ERROR(getName() << "Failed to seek: " << error.result);
                 LockGuard lock{mutex_};
                 seekStatus_ = SeekStatus::NOT_STARTED;
                 lastSeekArg_ = previousLastSeekArg;
                 executor_->postWork([self, 
callback{std::exchange(seekCallback_, std::nullopt).value()},
-                                     result]() { callback(result); });
+                                     result{error.result}]() { 
callback(result); });
             }
         });
 }
@@ -1912,13 +1913,13 @@ void ConsumerImpl::processPossibleToDLQ(const 
MessageId& messageId, const Proces
             if (client) {
                 auto self = get_shared_this_ptr();
                 client->createProducerAsync(
-                    deadLetterPolicy_.getDeadLetterTopic(), 
producerConfiguration,
-                    [self](Result res, const Producer& producer) {
-                        if (res == ResultOk) {
-                            self->deadLetterProducer_->setValue(producer);
+                    deadLetterPolicy_.getDeadLetterTopic(), 
producerConfiguration, [self](const auto& v) {
+                        if (const auto* producer = std::get_if<Producer>(&v)) {
+                            self->deadLetterProducer_->setValue(*producer);
                         } else {
                             LOG_ERROR("Dead letter producer create exception 
with topic: "
-                                      << 
self->deadLetterPolicy_.getDeadLetterTopic() << " ex: " << res);
+                                      << 
self->deadLetterPolicy_.getDeadLetterTopic()
+                                      << " ex: " << std::get<Error>(v).result);
                             self->deadLetterProducer_.reset();
                         }
                     });
@@ -2003,9 +2004,9 @@ void ConsumerImpl::doImmediateAck(const 
ClientConnectionPtr& cnx, const MessageI
         cnx->sendRequestWithId(
                Commands::newAck(consumerId_, msgId.ledgerId(), 
msgId.entryId(), ackSet, ackType, requestId),
                requestId, "ACK")
-            .addListener([callback](Result result, const ResponseData&) {
+            .addListener([callback](const Error& error, const ResponseData&) {
                 if (callback) {
-                    callback(result);
+                    callback(error.result);
                 }
             });
     } else {
@@ -2034,9 +2035,9 @@ void ConsumerImpl::doImmediateAck(const 
ClientConnectionPtr& cnx, const std::set
             auto requestId = newRequestId();
             cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds, requestId), requestId,
                                    "ACK")
-                .addListener([callback](Result result, const ResponseData&) {
+                .addListener([callback](const Error& error, const 
ResponseData&) {
                     if (callback) {
-                        callback(result);
+                        callback(error.result);
                     }
                 });
         } else {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 6f287aa..e263762 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -166,7 +166,7 @@ class ConsumerImpl : public ConsumerImplBase {
 
    protected:
     // overrided methods from HandlerBase
-    Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) 
override;
+    Future<Error, bool> connectionOpened(const ClientConnectionPtr& cnx) 
override;
     void connectionFailed(Result result) override;
 
     // impl methods from ConsumerImpl base
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index ffc0e3c..7e2cbb2 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -86,9 +86,8 @@ class ConsumerImplBase : public HandlerBase {
 
    protected:
     // overrided methods from HandlerBase
-    Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) 
override {
-        // Do not use bool, only Result.
-        Promise<Result, bool> promise;
+    Future<Error, bool> connectionOpened(const ClientConnectionPtr& cnx) 
override {
+        Promise<Error, bool> promise;
         promise.setSuccess();
         return promise.getFuture();
     }
diff --git a/lib/HTTPLookupService.cc b/lib/HTTPLookupService.cc
index 0be9713..08352eb 100644
--- a/lib/HTTPLookupService.cc
+++ b/lib/HTTPLookupService.cc
@@ -22,6 +22,8 @@
 
 #include <boost/property_tree/json_parser.hpp>
 #include <boost/property_tree/ptree.hpp>
+#include <sstream>
+#include <stdexcept>
 
 #include "CurlWrapper.h"
 #include "ExecutorService.h"
@@ -79,7 +81,7 @@ auto HTTPLookupService::getBroker(const TopicName &topicName) 
-> LookupResultFut
     auto self = shared_from_this();
     executorProvider_->get()->postWork([this, self, promise, completeUrl] {
         std::string responseData;
-        Result result = sendHTTPRequest(completeUrl, responseData);
+        Result result = sendHTTPRequest(completeUrl, responseData).result;
 
         if (result != ResultOk) {
             promise.setFailed(result);
@@ -93,7 +95,7 @@ auto HTTPLookupService::getBroker(const TopicName &topicName) 
-> LookupResultFut
     return promise.getFuture();
 }
 
-Future<Result, LookupDataResultPtr> 
HTTPLookupService::getPartitionMetadataAsync(
+Future<Error, LookupDataResultPtr> 
HTTPLookupService::getPartitionMetadataAsync(
     const TopicNamePtr &topicName) {
     LookupPromise promise;
     std::stringstream completeUrlStream;
@@ -148,9 +150,9 @@ Future<Result, NamespaceTopicsPtr> 
HTTPLookupService::getTopicsOfNamespaceAsync(
     return promise.getFuture();
 }
 
-Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr 
&topicName,
-                                                        const std::string 
&version) {
-    Promise<Result, SchemaInfo> promise;
+Future<Error, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr 
&topicName,
+                                                       const std::string 
&version) {
+    Promise<Error, SchemaInfo> promise;
     std::stringstream completeUrlStream;
 
     const auto &url = serviceNameResolver_.resolveHost();
@@ -166,7 +168,6 @@ Future<Result, SchemaInfo> 
HTTPLookupService::getSchema(const TopicNamePtr &topi
     if (!version.empty()) {
         completeUrlStream << "/" << fromBigEndianBytes(version);
     }
-
     
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest,
                                                  shared_from_this(), promise, 
completeUrlStream.str()));
     return promise.getFuture();
@@ -175,7 +176,7 @@ Future<Result, SchemaInfo> 
HTTPLookupService::getSchema(const TopicNamePtr &topi
 void HTTPLookupService::handleNamespaceTopicsHTTPRequest(const 
NamespaceTopicsPromise &promise,
                                                          const std::string 
&completeUrl) {
     std::string responseData;
-    Result result = sendHTTPRequest(completeUrl, responseData);
+    Result result = sendHTTPRequest(completeUrl, responseData).result;
 
     if (result != ResultOk) {
         promise.setFailed(result);
@@ -184,25 +185,27 @@ void 
HTTPLookupService::handleNamespaceTopicsHTTPRequest(const NamespaceTopicsPr
     }
 }
 
-Result HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, 
std::string &responseData) {
+Error HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, 
std::string &responseData) {
     long responseCode = -1;
     return sendHTTPRequest(completeUrl, responseData, responseCode);
 }
 
-Result HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, 
std::string &responseData,
-                                          long &responseCode) {
-    // Authorization data
+Error HTTPLookupService::sendHTTPRequest(const std::string &completeUrl, 
std::string &responseData,
+                                         long &responseCode) {
     AuthenticationDataPtr authDataContent;
     Result authResult = authenticationPtr_->getAuthData(authDataContent);
     if (authResult != ResultOk) {
-        LOG_ERROR("Failed to getAuthData: " << authResult);
-        return authResult;
+        std::ostringstream message;
+        message << "Failed to getAuthData: " << authResult;
+        LOG_ERROR(message.str());
+        return Error{authResult, message.str()};
     }
 
     CurlWrapper curl;
     if (!curl.init()) {
-        LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
-        return ResultLookupError;
+        const std::string message = "Unable to curl_easy_init for url " + 
completeUrl;
+        LOG_ERROR(message);
+        return Error{ResultLookupError, message};
     }
 
     std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
@@ -226,41 +229,39 @@ Result HTTPLookupService::sendHTTPRequest(const 
std::string &completeUrl, std::s
     options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR;
     options.maxLookupRedirects = maxLookupRedirects_;
     auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), 
options, tlsContext.get());
-    const auto &error = result.error;
-    if (!error.empty()) {
-        LOG_ERROR(completeUrl << " failed: " << error);
-        return ResultConnectError;
-    }
 
     responseData = result.responseData;
     responseCode = result.responseCode;
-    auto res = result.code;
+
+    const auto res = result.code;
     if (res == CURLE_OK) {
         LOG_INFO("Response received for url " << completeUrl << " responseCode 
" << responseCode);
-    } else if (res == CURLE_TOO_MANY_REDIRECTS) {
-        LOG_ERROR("Response received for url " << completeUrl << ": " << 
curl_easy_strerror(res)
-                                               << ", curl error: " << 
result.serverError
-                                               << ", redirect URL: " << 
result.redirectUrl);
+        return Error{};
+    }
+
+    std::ostringstream message;
+    if (res == CURLE_TOO_MANY_REDIRECTS) {
+        message << "Response received for url " << completeUrl << ": " << 
curl_easy_strerror(res)
+                << ", curl error: " << result.serverError << ", redirect URL: 
" << result.redirectUrl;
     } else {
-        LOG_ERROR("Response failed for url " << completeUrl << ": " << 
curl_easy_strerror(res)
-                                             << ", curl error: " << 
result.serverError);
+        message << "Response failed for url " << completeUrl << ": " << 
curl_easy_strerror(res)
+                << ", curl error: " << result.serverError;
     }
+    LOG_ERROR(message.str());
 
     switch (res) {
-        case CURLE_OK:
-            return ResultOk;
         case CURLE_COULDNT_CONNECT:
-            return ResultRetryable;
+            return Error{ResultRetryable, message.str()};
         case CURLE_COULDNT_RESOLVE_PROXY:
         case CURLE_COULDNT_RESOLVE_HOST:
         case CURLE_HTTP_RETURNED_ERROR:
-            return ResultConnectError;
+            return Error{ResultConnectError, message.str()};
         case CURLE_READ_ERROR:
-            return ResultReadError;
+            return Error{ResultReadError, message.str()};
         case CURLE_OPERATION_TIMEDOUT:
-            return ResultTimeout;
+            return Error{ResultTimeout, message.str()};
         default:
-            return ResultLookupError;
+            return Error{ResultLookupError, message.str()};
     }
 }
 
@@ -353,10 +354,10 @@ NamespaceTopicsPtr 
HTTPLookupService::parseNamespaceTopicsData(const std::string
 void HTTPLookupService::handleLookupHTTPRequest(const LookupPromise &promise, 
const std::string &completeUrl,
                                                 RequestType requestType) {
     std::string responseData;
-    Result result = sendHTTPRequest(completeUrl, responseData);
+    Error error = sendHTTPRequest(completeUrl, responseData);
 
-    if (result != ResultOk) {
-        promise.setFailed(result);
+    if (error.result != ResultOk) {
+        promise.setFailed(std::move(error));
     } else {
         promise.setValue((requestType == PartitionMetaData) ? 
parsePartitionData(responseData)
                                                             : 
parseLookupData(responseData));
@@ -367,12 +368,12 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const 
GetSchemaPromise &promi
                                                    const std::string 
&completeUrl) {
     std::string responseData;
     long responseCode = -1;
-    Result result = sendHTTPRequest(completeUrl, responseData, responseCode);
+    Error error = sendHTTPRequest(completeUrl, responseData, responseCode);
 
     if (responseCode == 404) {
-        promise.setFailed(ResultTopicNotFound);
-    } else if (result != ResultOk) {
-        promise.setFailed(result);
+        promise.setFailed(Error{ResultTopicNotFound, ""});
+    } else if (error.result != ResultOk) {
+        promise.setFailed(std::move(error));
     } else {
         ptree::ptree root;
         std::stringstream stream(responseData);
@@ -381,20 +382,24 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const 
GetSchemaPromise &promi
         } catch (ptree::json_parser_error &e) {
             LOG_ERROR("Failed to parse json of Partition Metadata: " << 
e.what()
                                                                      << 
"\nInput Json = " << responseData);
-            promise.setFailed(ResultInvalidMessage);
+            promise.setFailed(Error{ResultInvalidMessage,
+                                    "Failed to parse json of Partition 
Metadata: " + std::string(e.what()) +
+                                        "\nInput Json = " + responseData});
             return;
         }
         const std::string defaultNotFoundString = "Not found";
         auto schemaTypeStr = root.get<std::string>("type", 
defaultNotFoundString);
         if (schemaTypeStr == defaultNotFoundString) {
             LOG_ERROR("malformed json! - type not present" << responseData);
-            promise.setFailed(ResultInvalidMessage);
+            promise.setFailed(
+                Error{ResultInvalidMessage, "malformed json! - type not 
present" + responseData});
             return;
         }
         auto schemaData = root.get<std::string>("data", defaultNotFoundString);
         if (schemaData == defaultNotFoundString) {
             LOG_ERROR("malformed json! - data not present" << responseData);
-            promise.setFailed(ResultInvalidMessage);
+            promise.setFailed(
+                Error{ResultInvalidMessage, "malformed json! - data not 
present" + responseData});
             return;
         }
 
@@ -407,7 +412,9 @@ void HTTPLookupService::handleGetSchemaHTTPRequest(const 
GetSchemaPromise &promi
             } catch (ptree::json_parser_error &e) {
                 LOG_ERROR("Failed to parse json of Partition Metadata: " << 
e.what()
                                                                          << 
"\nInput Json = " << schemaData);
-                promise.setFailed(ResultInvalidMessage);
+                promise.setFailed(Error{ResultInvalidMessage, "Failed to parse 
json of Partition Metadata: " +
+                                                                  
std::string(e.what()) +
+                                                                  "\nInput 
Json = " + schemaData});
                 return;
             }
             const auto keyData = toJson(kvRoot.get_child("key"));
diff --git a/lib/HTTPLookupService.h b/lib/HTTPLookupService.h
index 61a0615..9a217fc 100644
--- a/lib/HTTPLookupService.h
+++ b/lib/HTTPLookupService.h
@@ -32,7 +32,7 @@ namespace pulsar {
 class ServiceNameResolver;
 using NamespaceTopicsPromise = Promise<Result, NamespaceTopicsPtr>;
 using NamespaceTopicsPromisePtr = std::shared_ptr<NamespaceTopicsPromise>;
-using GetSchemaPromise = Promise<Result, SchemaInfo>;
+using GetSchemaPromise = Promise<Error, SchemaInfo>;
 
 class HTTPLookupService : public LookupService, public 
std::enable_shared_from_this<HTTPLookupService> {
     enum RequestType : uint8_t
@@ -41,7 +41,7 @@ class HTTPLookupService : public LookupService, public 
std::enable_shared_from_t
         PartitionMetaData
     };
 
-    typedef Promise<Result, LookupDataResultPtr> LookupPromise;
+    typedef Promise<Error, LookupDataResultPtr> LookupPromise;
 
     ExecutorServiceProviderPtr executorProvider_;
     ServiceNameResolver serviceNameResolver_;
@@ -64,18 +64,18 @@ class HTTPLookupService : public LookupService, public 
std::enable_shared_from_t
                                           const std::string& completeUrl);
     void handleGetSchemaHTTPRequest(const GetSchemaPromise& promise, const 
std::string& completeUrl);
 
-    Result sendHTTPRequest(const std::string& completeUrl, std::string& 
responseData);
+    Error sendHTTPRequest(const std::string& completeUrl, std::string& 
responseData);
 
-    Result sendHTTPRequest(const std::string& completeUrl, std::string& 
responseData, long& responseCode);
+    Error sendHTTPRequest(const std::string& completeUrl, std::string& 
responseData, long& responseCode);
 
    public:
     HTTPLookupService(const ServiceInfo& serviceInfo, const 
ClientConfiguration& config);
 
     LookupResultFuture getBroker(const TopicName& topicName) override;
 
-    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr&) override;
+    Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName) override;
 
-    Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override;
+    Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override;
 
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
         const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) 
override;
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index bc8cd53..5bccc0b 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -123,16 +123,15 @@ void HandlerBase::grabCnx(const optional<std::string>& 
assignedBrokerUrl) {
     auto before = high_resolution_clock::now();
     cnxFuture.addListener([this, self, before](Result result, const 
ClientConnectionPtr& cnx) {
         if (result == ResultOk) {
-            connectionOpened(cnx).addListener([this, self, before](Result 
result, bool) {
-                // Do not use bool, only Result.
+            connectionOpened(cnx).addListener([this, self, before](const 
Error& error, bool) {
                 reconnectionPending_ = false;
-                if (result == ResultOk) {
+                if (error.result == ResultOk) {
                     connectionTimeMs_ =
                         
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
                     // Prevent the creationTimer_ from cancelling the timer_ 
in future
                     cancelTimer(*creationTimer_);
                     LOG_INFO("Finished connecting to broker after " << 
connectionTimeMs_ << " ms")
-                } else if (isResultRetryable(result)) {
+                } else if (isResultRetryable(error.result)) {
                     scheduleReconnection();
                 }
             });
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 229404e..1612415 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -93,7 +93,7 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
      * @return ResultError if there was a failure. ResultRetryable if 
reconnection is needed.
      * @return Do not use bool, only Result.
      */
-    virtual Future<Result, bool> connectionOpened(const ClientConnectionPtr& 
connection) = 0;
+    virtual Future<Error, bool> connectionOpened(const ClientConnectionPtr& 
connection) = 0;
 
     virtual void connectionFailed(Result result) = 0;
 
diff --git a/lib/LookupDataResult.h b/lib/LookupDataResult.h
index 81e50cc..8f6e740 100644
--- a/lib/LookupDataResult.h
+++ b/lib/LookupDataResult.h
@@ -29,7 +29,7 @@
 namespace pulsar {
 class LookupDataResult;
 typedef std::shared_ptr<LookupDataResult> LookupDataResultPtr;
-typedef Promise<Result, LookupDataResultPtr> LookupDataResultPromise;
+typedef Promise<Error, LookupDataResultPtr> LookupDataResultPromise;
 typedef std::shared_ptr<LookupDataResultPromise> LookupDataResultPromisePtr;
 
 class LookupDataResult {
diff --git a/lib/LookupService.h b/lib/LookupService.h
index 684984f..a18face 100644
--- a/lib/LookupService.h
+++ b/lib/LookupService.h
@@ -50,6 +50,7 @@ class LookupService {
                       << ", physical address: " << 
lookupResult.physicalAddress;
         }
     };
+    // TODO: change it to Error
     using LookupResultFuture = Future<Result, LookupResult>;
     using LookupResultPromise = Promise<Result, LookupResult>;
 
@@ -67,7 +68,7 @@ class LookupService {
      *
      * Gets Partition metadata
      */
-    virtual Future<Result, LookupDataResultPtr> 
getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0;
+    virtual Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName) = 0;
 
     /**
      * @param   namespace - namespace-name
@@ -84,8 +85,8 @@ class LookupService {
      * @param version the schema version byte array, if it's empty, use the 
latest version
      * @return SchemaInfo
      */
-    virtual Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName,
-                                                 const std::string& version = 
"") = 0;
+    virtual Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName,
+                                                const std::string& version = 
"") = 0;
 
     virtual ServiceNameResolver& getServiceNameResolver() = 0;
 
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index a569978..5e1b517 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -188,11 +188,11 @@ Future<Result, Consumer> 
MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
             return topicPromise->getFuture();
         }
         client->getPartitionMetadataAsync(topicName).addListener(
-            [this, topicName, topicPromise](Result result, const 
LookupDataResultPtr& lookupDataResult) {
-                if (result != ResultOk) {
+            [this, topicName, topicPromise](const auto& error, const 
LookupDataResultPtr& lookupDataResult) {
+                if (error.result != ResultOk) {
                     LOG_ERROR("Error Checking/Getting Partition Metadata while 
MultiTopics Subscribing- "
-                              << consumerStr_ << " result: " << result)
-                    topicPromise->setFailed(result);
+                              << consumerStr_ << " result: " << error)
+                    topicPromise->setFailed(error.result);
                     return;
                 }
                 subscribeTopicPartitions(lookupDataResult->getPartitions(), 
topicName, subscriptionName_,
@@ -1010,11 +1010,12 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
             return;
         }
         client->getPartitionMetadataAsync(topicName).addListener(
-            [this, weakSelf, topicName, currentNumPartitions](Result result,
+            [this, weakSelf, topicName, currentNumPartitions](const auto& 
error,
                                                               const 
LookupDataResultPtr& lookupDataResult) {
                 auto self = weakSelf.lock();
                 if (self) {
-                    this->handleGetPartitions(topicName, result, 
lookupDataResult, currentNumPartitions);
+                    this->handleGetPartitions(topicName, error.result, 
lookupDataResult,
+                                              currentNumPartitions);
                 }
             });
     }
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 1aa5c87..586a0ed 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -143,7 +143,7 @@ void PartitionedProducerImpl::start() {
 }
 
 void PartitionedProducerImpl::handleSinglePartitionProducerCreated(
-    Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, unsigned 
int partitionIndex) {
+    const Error& error, const ProducerImplBaseWeakPtr& producerWeakPtr, 
unsigned int partitionIndex) {
     // to indicate, we are doing cleanup using closeAsync after producer create
     // has failed and the invocation of closeAsync is not from client
     const auto numPartitions = getNumPartitionsWithLock();
@@ -161,9 +161,10 @@ void 
PartitionedProducerImpl::handleSinglePartitionProducerCreated(
         return;
     }
 
-    if (result != ResultOk) {
-        LOG_ERROR("Unable to create Producer for partition - " << 
partitionIndex << " Error - " << result);
-        partitionedProducerCreatedPromise_.setFailed(result);
+    if (error.result != ResultOk) {
+        LOG_ERROR("Unable to create Producer for partition - " << 
partitionIndex << " Error - "
+                                                               << 
error.result);
+        partitionedProducerCreatedPromise_.setFailed(error);
         state_ = Failed;
         if (++numProducersCreated_ == numPartitions) {
             closeAsync(nullptr);
@@ -231,11 +232,11 @@ void PartitionedProducerImpl::sendAsync(const Message& 
msg, SendCallback callbac
     } else {
         // Wrapping the callback into a lambda has overhead, so we check if 
the producer is ready first
         producer->getProducerCreatedFuture().addListener(
-            [msg, callback](Result result, const ProducerImplBaseWeakPtr& 
weakProducer) {
-                if (result == ResultOk) {
+            [msg, callback](const Error& error, const ProducerImplBaseWeakPtr& 
weakProducer) {
+                if (error.result == ResultOk) {
                     weakProducer.lock()->sendAsync(msg, callback);
                 } else if (callback) {
-                    callback(result, {});
+                    callback(error.result, {});
                 }
             });
     }
@@ -251,7 +252,7 @@ void PartitionedProducerImpl::internalShutdown() {
     if (client) {
         client->cleanupProducer(this);
     }
-    partitionedProducerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    partitionedProducerCreatedPromise_.setFailed(Error{ResultAlreadyClosed, 
""});
     state_ = Closed;
 }
 
@@ -353,14 +354,14 @@ void 
PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
         // is set second time here, first time it was successful. So check
         // if there's any adverse effect of setting it again. It should not
         // be but must check. MUSTCHECK changeme
-        partitionedProducerCreatedPromise_.setFailed(ResultUnknownError);
+        partitionedProducerCreatedPromise_.setFailed(Error{ResultUnknownError, 
""});
         callback(result);
         return;
     }
 }
 
 // override
-Future<Result, ProducerImplBaseWeakPtr> 
PartitionedProducerImpl::getProducerCreatedFuture() {
+Future<Error, ProducerImplBaseWeakPtr> 
PartitionedProducerImpl::getProducerCreatedFuture() {
     return partitionedProducerCreatedPromise_.getFuture();
 }
 
@@ -436,10 +437,10 @@ void PartitionedProducerImpl::getPartitionMetadata() {
         return;
     }
     client->getPartitionMetadataAsync(topicName_)
-        .addListener([weakSelf](Result result, const LookupDataResultPtr& 
lookupDataResult) {
+        .addListener([weakSelf](const auto& error, const LookupDataResultPtr& 
lookupDataResult) {
             auto self = weakSelf.lock();
             if (self) {
-                self->handleGetPartitions(result, lookupDataResult);
+                self->handleGetPartitions(error.result, lookupDataResult);
             }
         });
 }
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 94ba717..dc06af0 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -78,12 +78,12 @@ class PartitionedProducerImpl : public ProducerImplBase,
     void internalShutdown();
     bool isClosed() override;
     const std::string& getTopic() const override;
-    Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() 
override;
+    Future<Error, ProducerImplBaseWeakPtr> getProducerCreatedFuture() override;
     void triggerFlush() override;
     void flushAsync(FlushCallback callback) override;
     bool isConnected() const override;
     uint64_t getNumberOfConnectedProducer() override;
-    void handleSinglePartitionProducerCreated(Result result,
+    void handleSinglePartitionProducerCreated(const Error& error,
                                               const ProducerImplBaseWeakPtr& 
producerBaseWeakPtr,
                                               const unsigned int 
partitionIndex);
     void createLazyPartitionProducer(const unsigned int partitionIndex);
@@ -121,7 +121,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     std::atomic<State> state_{Pending};
 
     // only set this promise to value, when producers on all partitions are 
created.
-    Promise<Result, ProducerImplBaseWeakPtr> 
partitionedProducerCreatedPromise_;
+    Promise<Error, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_;
 
     std::unique_ptr<TopicMetadata> topicMetadata_;
 
diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h
index 465073f..7014a02 100644
--- a/lib/PendingRequest.h
+++ b/lib/PendingRequest.h
@@ -43,7 +43,7 @@ class PendingRequest : public 
std::enable_shared_from_this<PendingRequest<T>> {
                 return;
             }
             timeoutCallback_();
-            promise_.setFailed(ResultTimeout);
+            promise_.setFailed(Error{ResultTimeout, ""});
         });
     }
 
@@ -52,8 +52,8 @@ class PendingRequest : public 
std::enable_shared_from_this<PendingRequest<T>> {
         cancelTimer(timer_);
     }
 
-    void fail(Result result) {
-        promise_.setFailed(result);
+    void fail(const Error& error) {
+        promise_.setFailed(error);
         cancelTimer(timer_);
     }
 
@@ -65,7 +65,7 @@ class PendingRequest : public 
std::enable_shared_from_this<PendingRequest<T>> {
 
    private:
     ASIO::steady_timer timer_;
-    Promise<Result, T> promise_;
+    Promise<Error, T> promise_;
     std::function<void()> timeoutCallback_;
     std::atomic_bool timeoutDisabled_{false};
 };
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 7632581..873796d 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -135,13 +135,12 @@ void 
ProducerImpl::beforeConnectionChange(ClientConnection& connection) {
     connection.removeProducer(producerId_);
 }
 
-Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& 
cnx) {
-    // Do not use bool, only Result.
-    Promise<Result, bool> promise;
+Future<Error, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& 
cnx) {
+    Promise<Error, bool> promise;
 
     if (state_ == Closed) {
         LOG_DEBUG(getName() << "connectionOpened : Producer is already 
closed");
-        promise.setFailed(ResultAlreadyClosed);
+        promise.setFailed(Error{ResultAlreadyClosed, ""});
         return promise.getFuture();
     }
 
@@ -162,12 +161,12 @@ Future<Result, bool> ProducerImpl::connectionOpened(const 
ClientConnectionPtr& c
     auto self = shared_from_this();
     setFirstRequestIdAfterConnect(requestId);
     cnx->sendRequestWithId(cmd, requestId, "PRODUCER")
-        .addListener([this, self, cnx, promise](Result result, const 
ResponseData& responseData) {
-            Result handleResult = handleCreateProducer(cnx, result, 
responseData);
-            if (handleResult == ResultOk) {
+        .addListener([this, self, cnx, promise](const Error& error, const 
ResponseData& responseData) {
+            auto handledError = handleCreateProducer(cnx, error, responseData);
+            if (handledError.result == ResultOk) {
                 promise.setSuccess();
             } else {
-                promise.setFailed(handleResult);
+                promise.setFailed(handledError);
             }
         });
 
@@ -182,22 +181,23 @@ void ProducerImpl::connectionFailed(Result result) {
         // if producers are lazy, then they should always try to restart
         // so don't change the state and allow reconnections
         return;
-    } else if (!isResultRetryable(result) && 
producerCreatedPromise_.setFailed(result)) {
+    } else if (!isResultRetryable(result) && 
producerCreatedPromise_.setFailed(Error{result, ""})) {
         state_ = Failed;
     }
 }
 
-Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, 
Result result,
-                                          const ResponseData& responseData) {
+Error ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, const 
Error& error,
+                                         const ResponseData& responseData) {
     Result handleResult = ResultOk;
 
     Lock lock(mutex_);
 
-    LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << 
strResult(result));
+    LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << 
error);
 
     // make sure we're still in the Pending/Ready state, closeAsync could have 
been invoked
     // while waiting for this response if using lazy producers
     const auto state = state_.load();
+    const auto result = error.result;
     if (state != Ready && state != Pending) {
         LOG_DEBUG("Producer created response received but producer already 
closed");
         failPendingMessages(ResultAlreadyClosed, false);
@@ -211,9 +211,9 @@ Result ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result
         }
         if (!producerCreatedPromise_.isComplete()) {
             lock.unlock();
-            producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+            producerCreatedPromise_.setFailed(Error{ResultAlreadyClosed, ""});
         }
-        return ResultAlreadyClosed;
+        return Error{ResultAlreadyClosed, ""};
     }
 
     if (result == ResultOk) {
@@ -281,7 +281,7 @@ Result ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result
                 client->cleanupProducer(this);
             }
             lock.unlock();
-            producerCreatedPromise_.setFailed(result);
+            producerCreatedPromise_.setFailed(error);
             handleResult = result;
         } else if (producerCreatedPromise_.isComplete() || 
retryOnCreationError_) {
             if (result == ResultProducerBlockedQuotaExceededException) {
@@ -292,24 +292,25 @@ Result ProducerImpl::handleCreateProducer(const 
ClientConnectionPtr& cnx, Result
             }
 
             // Producer had already been initially created, we need to retry 
connecting in any case
-            LOG_WARN(getName() << "Failed to reconnect producer: " << 
strResult(result));
+            LOG_WARN(getName() << "Failed to reconnect producer: " << error);
             handleResult = ResultRetryable;
         } else {
             // Producer was not yet created, retry to connect to broker if 
it's possible
             handleResult = convertToTimeoutIfNecessary(result, 
creationTimestamp_);
+            Error convertedError{handleResult, error.message};
             if (isResultRetryable(handleResult)) {
-                LOG_WARN(getName() << "Temporary error in creating producer: " 
<< strResult(handleResult));
+                LOG_WARN(getName() << "Temporary error in creating producer: " 
<< convertedError);
             } else {
-                LOG_ERROR(getName() << "Failed to create producer: " << 
strResult(handleResult));
+                LOG_ERROR(getName() << "Failed to create producer: " << error);
                 failPendingMessages(handleResult, false);
                 state_ = Failed;
                 lock.unlock();
-                producerCreatedPromise_.setFailed(handleResult);
+                producerCreatedPromise_.setFailed(convertedError);
             }
         }
     }
 
-    return handleResult;
+    return handleResult == error.result ? error : Error{handleResult, 
error.message};
 }
 
 auto ProducerImpl::getPendingCallbacksWhenFailed() -> 
decltype(pendingMessagesQueue_) {
@@ -825,10 +826,10 @@ void ProducerImpl::closeAsync(CloseCallback 
originalCallback) {
     int requestId = client->newRequestId();
     auto self = shared_from_this();
     cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), 
requestId, "CLOSE_PRODUCER")
-        .addListener([self, callback](Result result, const ResponseData&) { 
callback(result); });
+        .addListener([self, callback](const Error& error, const ResponseData&) 
{ callback(error.result); });
 }
 
-Future<Result, ProducerImplBaseWeakPtr> 
ProducerImpl::getProducerCreatedFuture() {
+Future<Error, ProducerImplBaseWeakPtr> 
ProducerImpl::getProducerCreatedFuture() {
     return producerCreatedPromise_.getFuture();
 }
 
@@ -1022,7 +1023,7 @@ void ProducerImpl::internalShutdown() {
         client->cleanupProducer(this);
     }
     cancelTimers();
-    producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+    producerCreatedPromise_.setFailed(Error{ResultAlreadyClosed, ""});
     state_ = Closed;
 }
 
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 26207f8..e750253 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -87,7 +87,7 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
     void internalShutdown();
     bool isClosed() override;
     const std::string& getTopic() const override;
-    Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() 
override;
+    Future<Error, ProducerImplBaseWeakPtr> getProducerCreatedFuture() override;
     void triggerFlush() override;
     void flushAsync(FlushCallback callback) override;
     bool isConnected() const override;
@@ -133,15 +133,15 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
 
     // overrided methods from HandlerBase
     void beforeConnectionChange(ClientConnection& connection) override;
-    Future<Result, bool> connectionOpened(const ClientConnectionPtr& 
connection) override;
+    Future<Error, bool> connectionOpened(const ClientConnectionPtr& 
connection) override;
     void connectionFailed(Result result) override;
     const std::string& getName() const override { return producerStr_; }
 
    private:
     void printStats();
 
-    Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
-                                const ResponseData& responseData);
+    Error handleCreateProducer(const ClientConnectionPtr& cnx, const Error& 
error,
+                               const ResponseData& responseData);
 
     void resendMessages(const ClientConnectionPtr& cnx);
 
@@ -195,7 +195,7 @@ class ProducerImpl : public HandlerBase, public 
ProducerImplBase {
     using DurationType = TimeDuration;
     void asyncWaitSendTimeout(DurationType expiryTime);
 
-    Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
+    Promise<Error, ProducerImplBaseWeakPtr> producerCreatedPromise_;
 
     struct PendingCallbacks;
     decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed();
diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h
index 25a12c8..72607e2 100644
--- a/lib/ProducerImplBase.h
+++ b/lib/ProducerImplBase.h
@@ -44,7 +44,7 @@ class ProducerImplBase {
     virtual bool isClosed() = 0;
     virtual void shutdown() = 0;
     virtual const std::string& getTopic() const = 0;
-    virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() 
= 0;
+    virtual Future<Error, ProducerImplBaseWeakPtr> getProducerCreatedFuture() 
= 0;
     virtual void triggerFlush() = 0;
     virtual void flushAsync(FlushCallback callback) = 0;
     virtual bool isConnected() const = 0;
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 7f50cf1..3066d48 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -53,11 +53,12 @@ class RetryableLookupService : public LookupService {
     }
 
     LookupResultFuture getBroker(const TopicName& topicName) override {
-        return lookupCache_->run("get-broker-" + topicName.toString(),
-                                 [this, topicName] { return 
lookupService_->getBroker(topicName); });
+        return toResultFuture(lookupCache_->run("get-broker-" + 
topicName.toString(), [this, topicName] {
+            return toErrorFuture(lookupService_->getBroker(topicName));
+        }));
     }
 
-    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName) override {
+    Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName) override {
         return partitionLookupCache_->run(
             "get-partition-metadata-" + topicName->toString(),
             [this, topicName] { return 
lookupService_->getPartitionMetadataAsync(topicName); });
@@ -65,13 +66,15 @@ class RetryableLookupService : public LookupService {
 
     Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
         const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) 
override {
-        return namespaceLookupCache_->run(
+        return toResultFuture(namespaceLookupCache_->run(
             "get-topics-of-namespace-" + nsName->toString() + "-" + 
std::to_string(mode),
-            [this, nsName, mode] { return 
lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
+            [this, nsName, mode] {
+                return 
toErrorFuture(lookupService_->getTopicsOfNamespaceAsync(nsName, mode));
+            }));
     }
 
-    Future<Result, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override {
-        return getSchemaCache_->run("get-schema" + topicName->toString(), 
[this, topicName, version] {
+    Future<Error, SchemaInfo> getSchema(const TopicNamePtr& topicName, const 
std::string& version) override {
+        return getSchemaCache_->run(getSchemaCacheKey(topicName, version), 
[this, topicName, version] {
             return lookupService_->getSchema(topicName, version);
         });
     }
@@ -81,6 +84,36 @@ class RetryableLookupService : public LookupService {
     }
 
    private:
+    template <typename T>
+    static Future<Error, T> toErrorFuture(Future<Result, T> future) {
+        Promise<Error, T> promise;
+        future.addListener([promise](Result result, const T& value) {
+            if (result == ResultOk) {
+                promise.setValue(value);
+            } else {
+                promise.setFailed({result, ""});
+            }
+        });
+        return promise.getFuture();
+    }
+
+    template <typename T>
+    static Future<Result, T> toResultFuture(Future<Error, T> future) {
+        Promise<Result, T> promise;
+        future.addListener([promise](const Error& error, const T& value) {
+            if (error.result == ResultOk) {
+                promise.setValue(value);
+            } else {
+                promise.setFailed(error.result);
+            }
+        });
+        return promise.getFuture();
+    }
+
+    static std::string getSchemaCacheKey(const TopicNamePtr& topicName, const 
std::string& version) {
+        return "get-schema-" + topicName->toString() + "-" + version;
+    }
+
     const std::shared_ptr<LookupService> lookupService_;
     RetryableOperationCachePtr<LookupResult> lookupCache_;
     RetryableOperationCachePtr<LookupDataResultPtr> partitionLookupCache_;
diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h
index f4b056e..2826390 100644
--- a/lib/RetryableOperation.h
+++ b/lib/RetryableOperation.h
@@ -41,7 +41,7 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
         explicit PassKey() {}
     };
 
-    RetryableOperation(const std::string& name, std::function<Future<Result, 
T>()>&& func,
+    RetryableOperation(const std::string& name, std::function<Future<Error, 
T>()>&& func,
                        TimeDuration timeout, DeadlineTimerPtr timer)
         : name_(name),
           func_(std::move(func)),
@@ -58,7 +58,7 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
         return std::make_shared<RetryableOperation<T>>(PassKey{}, 
std::forward<Args>(args)...);
     }
 
-    Future<Result, T> run() {
+    Future<Error, T> run() {
         bool expected = false;
         if (!started_.compare_exchange_strong(expected, true)) {
             return promise_.getFuture();
@@ -67,16 +67,16 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
     }
 
     void cancel() {
-        promise_.setFailed(ResultDisconnected);
+        promise_.setFailed({ResultDisconnected, ""});
         cancelTimer(*timer_);
     }
 
    private:
     const std::string name_;
-    std::function<Future<Result, T>()> func_;
+    std::function<Future<Error, T>()> func_;
     const TimeDuration timeout_;
     Backoff backoff_;
-    Promise<Result, T> promise_;
+    Promise<Error, T> promise_;
     std::atomic_bool started_{false};
     DeadlineTimerPtr timer_;
 
@@ -84,24 +84,24 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
 #ifdef __GNUC__
     __attribute__((visibility("hidden")))
 #endif
-    Future<Result, T>
+    Future<Error, T>
     runImpl(TimeDuration remainingTime) {
         std::weak_ptr<RetryableOperation<T>> 
weakSelf{this->shared_from_this()};
-        func_().addListener([this, weakSelf, remainingTime](Result result, 
const T& value) {
+        func_().addListener([this, weakSelf, remainingTime](const Error& 
error, const T& value) {
             auto self = weakSelf.lock();
             if (!self) {
                 return;
             }
-            if (result == ResultOk) {
+            if (error.result == ResultOk) {
                 promise_.setValue(value);
                 return;
             }
-            if (!isResultRetryable(result)) {
-                promise_.setFailed(result);
+            if (!isResultRetryable(error.result)) {
+                promise_.setFailed(error);
                 return;
             }
             if (toMillis(remainingTime) <= 0) {
-                promise_.setFailed(ResultTimeout);
+                promise_.setFailed({ResultTimeout, ""});
                 return;
             }
 
@@ -119,7 +119,7 @@ class RetryableOperation : public 
std::enable_shared_from_this<RetryableOperatio
                 if (ec) {
                     if (ec == ASIO::error::operation_aborted) {
                         LOG_DEBUG("Timer for " << name_ << " is cancelled");
-                        promise_.setFailed(ResultTimeout);
+                        promise_.setFailed({ResultTimeout, ""});
                     } else {
                         LOG_WARN("Timer for " << name_ << " failed: " << 
ec.message());
                     }
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index fa4c8fc..d3d1870 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -56,11 +56,11 @@ class RetryableOperationCache : public 
std::enable_shared_from_this<RetryableOpe
         return std::make_shared<Self>(PassKey{}, std::forward<Args>(args)...);
     }
 
-    Future<Result, T> run(const std::string& key, std::function<Future<Result, 
T>()>&& func) {
+    Future<Error, T> run(const std::string& key, std::function<Future<Error, 
T>()>&& func) {
         std::unique_lock<std::mutex> lock{mutex_};
         if (closed_) {
-            Promise<Result, T> promise;
-            promise.setFailed(ResultAlreadyClosed);
+            Promise<Error, T> promise;
+            promise.setFailed({ResultAlreadyClosed, ""});
             return promise.getFuture();
         }
         auto it = operations_.find(key);
@@ -70,8 +70,8 @@ class RetryableOperationCache : public 
std::enable_shared_from_this<RetryableOpe
                 timer = executorProvider_->get()->createDeadlineTimer();
             } catch (const std::runtime_error& e) {
                 LOG_ERROR("Failed to retry lookup for " << key << ": " << 
e.what());
-                Promise<Result, T> promise;
-                promise.setFailed(ResultConnectError);
+                Promise<Error, T> promise;
+                promise.setFailed({ResultConnectError, e.what()});
                 return promise.getFuture();
             }
 
@@ -81,7 +81,7 @@ class RetryableOperationCache : public std::enable_shared_from_this<RetryableOpe
             lock.unlock();
 
             auto weakSelf = this->weak_from_this();
-            future.addListener([this, weakSelf, key, operation](Result, const T&) {
+            future.addListener([this, weakSelf, key, operation](const Error&, const T&) {
                 auto self = weakSelf.lock();
                 if (!self) {
                     return;
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index 4749683..d38e62c 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -26,7 +26,9 @@
 #include <fstream>
 #include <streambuf>
 #include <string>
+#include <variant>
 
+#include "VariantHelper.h"
 #include "lib/Future.h"
 #include "lib/LogUtils.h"
 #include "lib/Utils.h"
@@ -184,6 +186,13 @@ TEST(AuthPluginToken, testNoAuth) {
     Result result = client.createProducer(topicName, producer);
     ASSERT_EQ(ResultAuthorizationError, result);
 
+    std::visit(overloaded{[](Error&& error) {
+                              ASSERT_EQ(ResultAuthorizationError, error.result);
+                              ASSERT_EQ("Client is not authorized to Get Partition Metadata", error.message);
+                          },
+                          [](auto&&) { FAIL(); }},
+               client.createProducerV2(topicName, {}));
+
     Consumer consumer;
     result = client.subscribe(topicName, subName, consumer);
     ASSERT_EQ(ResultAuthorizationError, result);
@@ -200,6 +209,14 @@ TEST(AuthPluginToken, testNoAuthWithHttp) {
     Result result = client.createProducer(topicName, producer);
     ASSERT_EQ(ResultConnectError, result);
 
+    std::visit(overloaded{[](Error&& error) {
+                              ASSERT_EQ(ResultConnectError, error.result);
+                              ASSERT_TRUE(error.message.find("The requested URL returned error: 401") !=
+                                          std::string::npos);
+                          },
+                          [](auto&&) { FAIL(); }},
+               client.createProducerV2(topicName, {}));
+
     Consumer consumer;
     result = client.subscribe(topicName, subName, consumer);
     ASSERT_EQ(ResultConnectError, result);
@@ -212,5 +229,16 @@ TEST(AuthPluginToken, testTokenSupplierException) {
     Client client(serviceUrl, config);
     Producer producer;
     ASSERT_EQ(ResultAuthenticationError, client.createProducer("topic", producer));
+
+    std::visit(
+        overloaded{[](Error&& error) {
+                       ASSERT_EQ(ResultAuthenticationError, error.result);
+                       ASSERT_TRUE(error.message.find("failed to generate token") != std::string::npos);
+                   },
+                   [](auto&&) { FAIL(); }},
+        client.createProducerV2("topic", {}));
+
     ASSERT_EQ(ResultOk, client.close());
+
+    client.close();
 }
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index 63ac9d1..e83db4e 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -303,11 +303,11 @@ TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) {
         "persistent://public/default/testTimedOutPendingRequests-" + suffix, 
"", requestIdGenerator++);
 
     ResponseData responseData;
-    ASSERT_EQ(ResultTimeout, pingFuture.get(responseData));
+    ASSERT_EQ(ResultTimeout, pingFuture.get(responseData).result);
     ASSERT_EQ(0u, PulsarFriend::getPendingRequests(*connection));
 
     LookupDataResultPtr lookupData;
-    ASSERT_EQ(ResultTimeout, lookupFuture.get(lookupData));
+    ASSERT_EQ(ResultTimeout, lookupFuture.get(lookupData).result);
     ASSERT_EQ(0u, PulsarFriend::getPendingLookupRequests(*connection));
     ASSERT_EQ(0u, PulsarFriend::getNumOfPendingLookupRequests(*connection));
 
@@ -320,11 +320,11 @@ TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) {
     ASSERT_EQ(0u, PulsarFriend::getPendingGetTopicsOfNamespaceRequests(*connection));
 
     SchemaInfo schemaInfo;
-    ASSERT_EQ(ResultTimeout, getSchemaFuture.get(schemaInfo));
+    ASSERT_EQ(ResultTimeout, getSchemaFuture.get(schemaInfo).result);
     ASSERT_EQ(0u, PulsarFriend::getPendingGetSchemaRequests(*connection));
 
     mockServer->close();
-    connection->close(ResultDisconnected).wait();
+    connection->close(Error{ResultDisconnected, ""}).wait();
     executorProvider->close();
 }
 
diff --git a/tests/ConnectionTest.cc b/tests/ConnectionTest.cc
index e0d063e..dd4641b 100644
--- a/tests/ConnectionTest.cc
+++ b/tests/ConnectionTest.cc
@@ -24,7 +24,7 @@ using namespace pulsar;
 
 class MockClientConnection {
    public:
-    MOCK_METHOD(void, close, (Result));
+    MOCK_METHOD(void, close, (Error));
 
     void checkServerError(ServerError error, const std::string& message) {
         ::pulsar::adaptor::checkServerError(*this, error, message);
@@ -41,7 +41,7 @@ static const std::vector<std::string> retryableErrorMessages{
 
 TEST(ConnectionTest, testCheckServerError) {
     MockClientConnection conn;
-    EXPECT_CALL(conn, close(ResultDisconnected)).Times(0);
+    EXPECT_CALL(conn, close(::testing::_)).Times(0);
     for (auto&& msg : retryableErrorMessages) {
         conn.checkServerError(pulsar::proto::ServiceNotReady, msg);
     }
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index 53cb76e..e98d972 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -91,7 +91,7 @@ TEST(LookupServiceTest, basicLookup) {
 
     TopicNamePtr topicName = TopicName::get("topic");
 
-    Future<Result, LookupDataResultPtr> partitionFuture = lookupService.getPartitionMetadataAsync(topicName);
+    Future<Error, LookupDataResultPtr> partitionFuture = lookupService.getPartitionMetadataAsync(topicName);
     LookupDataResultPtr lookupData;
     partitionFuture.get(lookupData);
     ASSERT_TRUE(lookupData != NULL);
@@ -129,9 +129,9 @@ static void testMultiAddresses(LookupService& lookupService) {
     results.clear();
     for (int i = 0; i < numRequests; i++) {
         LookupDataResultPtr data;
-        const auto result = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data);
-        LOG_INFO("getPartitionMetadataAsync [" << i << "] " << result);
-        results.emplace_back(result);
+        const auto error = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data);
+        LOG_INFO("getPartitionMetadataAsync [" << i << "] " << error.result);
+        results.emplace_back(error.result);
     }
     verifySuccessCount();
 
@@ -186,7 +186,7 @@ TEST(LookupServiceTest, testRetry) {
     PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
     auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
     LookupDataResultPtr lookupDataResultPtr;
-    ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr));
+    ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr).result);
     LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions");
 
     PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
@@ -234,7 +234,7 @@ TEST(LookupServiceTest, testTimeout) {
     beforeMethod();
     auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
     LookupDataResultPtr lookupDataResultPtr;
-    ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr));
+    ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr).result);
     afterMethod("getPartitionMetadataAsync");
 
     beforeMethod();
@@ -307,7 +307,7 @@ TEST_P(LookupServiceTest, testGetSchema) {
 
     SchemaInfo schemaInfo;
     auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
-    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo).result);
     ASSERT_EQ(jsonSchema, schemaInfo.getSchema());
     ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType());
     ASSERT_EQ(properties, schemaInfo.getProperties());
@@ -322,7 +322,7 @@ TEST_P(LookupServiceTest, testGetSchemaNotFound) {
 
     SchemaInfo schemaInfo;
     auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
-    ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo));
+    ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo).result);
 }
 
 TEST_P(LookupServiceTest, testGetKeyValueSchema) {
@@ -344,7 +344,7 @@ TEST_P(LookupServiceTest, testGetKeyValueSchema) {
 
     SchemaInfo schemaInfo;
     auto future = PulsarFriend::getClientImplPtr(client_)->getSchema(TopicName::get(topic));
-    ASSERT_EQ(ResultOk, future.get(schemaInfo));
+    ASSERT_EQ(ResultOk, future.get(schemaInfo).result);
     ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema());
     ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType());
     ASSERT_FALSE(schemaInfo.getProperties().empty());
@@ -510,13 +510,13 @@ class MockLookupService : public BinaryProtoLookupService {
    public:
     using BinaryProtoLookupService::BinaryProtoLookupService;
 
-    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
+    Future<Error, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
         bool expected = true;
         if (firstTime_.compare_exchange_strong(expected, false)) {
             // Trigger the retry
             LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally");
-            Promise<Result, LookupDataResultPtr> promise;
-            promise.setFailed(ResultRetryable);
+            Promise<Error, LookupDataResultPtr> promise;
+            promise.setFailed(Error{ResultRetryable, ""});
             return promise.getFuture();
         }
         return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
@@ -564,7 +564,7 @@ TEST(LookupServiceTest, testRetryAfterDestroyed) {
     lookupService->close();
     Result result = ResultOk;
     
lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
-        .addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; });
+        .addListener([&result](const auto& error, const LookupDataResultPtr&) { result = error.result; });
     EXPECT_EQ(ResultAlreadyClosed, result);
     pool.close();
     executorProvider->close();
diff --git a/tests/MockClientImpl.h b/tests/MockClientImpl.h
index 074808f..7fa902b 100644
--- a/tests/MockClientImpl.h
+++ b/tests/MockClientImpl.h
@@ -24,6 +24,7 @@
 #include <chrono>
 #include <future>
 #include <memory>
+#include <variant>
 
 #include "lib/ClientImpl.h"
 
@@ -46,9 +47,13 @@ class MockClientImpl : public ClientImpl {
         using namespace std::chrono;
         auto start = high_resolution_clock::now();
         auto promise = createPromise();
-        createProducerAsync(topic, {}, [&start, promise](Result result, Producer) {
+        createProducerAsync(topic, {}, [&start, promise](const auto& v) {
             auto timeMs = duration_cast<milliseconds>(high_resolution_clock::now() - start).count();
-            promise->set_value({result, timeMs});
+            if (const auto* error = std::get_if<Error>(&v)) {
+                promise->set_value({error->result, timeMs});
+            } else {
+                promise->set_value({ResultOk, timeMs});
+            }
         });
         return wait(promise);
     }
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
index 57407fb..07e0c50 100644
--- a/tests/MultiTopicsConsumerTest.cc
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -167,7 +167,7 @@ TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) {
         return PulsarFriend::getPendingConsumerStatsRequests(*connection) == expectedRequests;
     }));
 
-    connection->close(ResultDisconnected);
+    connection->close(Error{ResultDisconnected, ""});
     ASSERT_EQ(ResultDisconnected, future.get());
 
     mockServer->close();
diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc
index 2daaf3f..a1bb285 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -27,13 +27,13 @@
 
 namespace pulsar {
 
-using IntFuture = Future<Result, int>;
+using IntFuture = Future<Error, int>;
 
 static int wait(IntFuture future) {
     int value;
-    auto result = future.get(value);
-    if (result != ResultOk) {
-        throw std::runtime_error(strResult(result));
+    auto error = future.get(value);
+    if (error.result != ResultOk) {
+        throw std::runtime_error(strResult(error.result));
     }
     return value;
 }
@@ -50,9 +50,9 @@ class CountdownFunc {
         : result_(rhs.result_), totalRetryCount_(rhs.totalRetryCount_), current_(rhs.current_.load()) {}
 
     IntFuture operator()() {
-        Promise<Result, int> promise;
+        Promise<Error, int> promise;
         if (++current_ < totalRetryCount_) {
-            promise.setFailed(ResultRetryable);
+            promise.setFailed({ResultRetryable, ""});
         } else {
             promise.setValue(result_);
         }
@@ -141,7 +141,7 @@ TEST_F(RetryableOperationCacheTest, testClose) {
     for (auto&& future : futures_) {
         int value;
         // All cancelled futures complete with ResultDisconnected and the default int value
-        ASSERT_EQ(ResultDisconnected, future.get(value));
+        ASSERT_EQ(ResultDisconnected, future.get(value).result);
         ASSERT_EQ(value, 0);
     }
     ASSERT_EQ(getSize(*cache), 0);
diff --git a/tests/SchemaTest.cc b/tests/SchemaTest.cc
index 1fad081..8b17514 100644
--- a/tests/SchemaTest.cc
+++ b/tests/SchemaTest.cc
@@ -172,10 +172,18 @@ TEST(SchemaTest, testAutoDownloadSchema) {
 
     auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
 
-    Promise<Result, Producer> promise;
-    clientImplPtr->createProducerAsync(topic, producerConfiguration, WaitForCallbackValue<Producer>(promise),
-                                       true);
-    ASSERT_EQ(ResultOk, promise.getFuture().get(producer));
+    Promise<Error, Producer> promise;
+    clientImplPtr->createProducerAsync(
+        topic, producerConfiguration,
+        [promise](const auto& v) {
+            if (const auto* error = std::get_if<Error>(&v)) {
+                promise.setFailed(*error);
+            } else {
+                promise.setValue(std::get<Producer>(v));
+            }
+        },
+        true);
+    ASSERT_EQ(ResultOk, promise.getFuture().get(producer).result);
 
     Message msg = MessageBuilder().setContent("content").build();
     ASSERT_EQ(ResultOk, producer.send(msg));
diff --git a/tests/VariantHelper.h b/tests/VariantHelper.h
new file mode 100644
index 0000000..4959e0a
--- /dev/null
+++ b/tests/VariantHelper.h
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+// Overload set for std::visit; see https://en.cppreference.com/cpp/utility/variant/visit2
+template <typename... Ts>
+struct overloaded : Ts... {
+    using Ts::operator()...;
+};
+// Deduction guide so overloaded{f, g} deduces Ts from its arguments (not needed as of C++20)
+template <typename... Ts>
+overloaded(Ts...) -> overloaded<Ts...>;


Reply via email to