This is an automated email from the ASF dual-hosted git repository.

shibd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ce3dfa8  feat: add v2 APIs to create Consumer, Reader or TableView 
(#581)
ce3dfa8 is described below

commit ce3dfa814fbc1aaddfe4f8c61f3b1d7954c99462
Author: Yunze Xu <[email protected]>
AuthorDate: Thu May 28 14:23:48 2026 +0800

    feat: add v2 APIs to create Consumer, Reader or TableView (#581)
    
    * feat: add v2 APIs to create Consumer, Reader or TableView
    
    * improve tests for other v2 methods
    
    * fix lint
---
 include/pulsar/Client.h |  36 +++++++++
 lib/Client.cc           |  91 ++++++++++++++++++----
 lib/ClientImpl.cc       | 195 +++++++++++++++++++++++++++++++++---------------
 lib/ClientImpl.h        |  30 ++++++--
 lib/TableViewImpl.cc    |  20 ++---
 lib/TableViewImpl.h     |   4 +-
 tests/AuthTokenTest.cc  |  21 ++++--
 7 files changed, 296 insertions(+), 101 deletions(-)

diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index 4114075..63a2515 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -36,7 +36,9 @@
 
 #include <memory>
 #include <string>
+#include <utility>
 #include <variant>
+#include <vector>
 
 namespace pulsar {
 typedef std::function<void(Result, Producer)> CreateProducerCallback;
@@ -47,6 +49,21 @@ typedef std::function<void(Result, const 
std::vector<std::string>&)> GetPartitio
 typedef std::function<void(Result)> CloseCallback;
 
 using CreateProducerV2Callback = std::function<void(std::variant<Error, 
Producer>)>;
+using CreateConsumerV2Callback = std::function<void(std::variant<Error, 
Consumer>)>;
+using SubscribeV2Callback = CreateConsumerV2Callback;
+using ReaderV2Callback = std::function<void(std::variant<Error, Reader>)>;
+using TableViewV2Callback = std::function<void(std::variant<Error, 
TableView>)>;
+
+/**
+ * Use TopicRegex with subscribeV2/subscribeAsyncV2 to distinguish a regex 
pattern from a single topic name.
+ */
+struct TopicRegex {
+    explicit TopicRegex(std::string pattern) : pattern(std::move(pattern)) {}
+
+    std::string pattern;
+};
+
+using SubscribeTopics = std::variant<std::string, std::vector<std::string>, 
TopicRegex>;
 
 class ClientImpl;
 class PulsarFriend;
@@ -188,6 +205,13 @@ class PULSAR_PUBLIC Client {
     void subscribeAsync(const std::string& topic, const std::string& 
subscriptionName,
                         const ConsumerConfiguration& conf, const 
SubscribeCallback& callback);
 
+    void subscribeAsyncV2(const SubscribeTopics& topics, const std::string& 
subscriptionName,
+                          const ConsumerConfiguration& conf, 
SubscribeV2Callback callback);
+
+    std::variant<Error, Consumer> subscribeV2(const SubscribeTopics& topics,
+                                              const std::string& 
subscriptionName,
+                                              const ConsumerConfiguration& 
conf);
+
     /**
      * Subscribe to multiple topics under the same namespace.
      *
@@ -332,6 +356,12 @@ class PULSAR_PUBLIC Client {
     void createReaderAsync(const std::string& topic, const MessageId& 
startMessageId,
                            const ReaderConfiguration& conf, const 
ReaderCallback& callback);
 
+    void createReaderAsyncV2(const std::string& topic, const MessageId& 
startMessageId,
+                             const ReaderConfiguration& conf, ReaderV2Callback 
callback);
+
+    std::variant<Error, Reader> createReaderV2(const std::string& topic, const 
MessageId& startMessageId,
+                                               const ReaderConfiguration& 
conf);
+
     /**
      * Create a table view with given {@code TableViewConfiguration} for 
specified topic.
      *
@@ -362,6 +392,12 @@ class PULSAR_PUBLIC Client {
     void createTableViewAsync(const std::string& topic, const 
TableViewConfiguration& conf,
                               const TableViewCallback& callBack);
 
+    void createTableViewAsyncV2(const std::string& topic, const 
TableViewConfiguration& conf,
+                                TableViewV2Callback callback);
+
+    std::variant<Error, TableView> createTableViewV2(const std::string& topic,
+                                                     const 
TableViewConfiguration& conf);
+
     /**
      * Get the list of partitions for a given topic.
      *
diff --git a/lib/Client.cc b/lib/Client.cc
index 32ab87c..11dfe26 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -26,15 +26,35 @@
 
 #include "ClientImpl.h"
 #include "Int64SerDes.h"
-#include "LogUtils.h"
 #include "LookupService.h"
 #include "TopicName.h"
 #include "Utils.h"
 
-DECLARE_LOG_OBJECT()
-
 namespace pulsar {
 
+namespace {
+
+template <typename T>
+void setPromiseValue(std::promise<std::variant<Error, T>>& promise, const 
std::variant<Error, T>& value) {
+    if (const auto* error = std::get_if<Error>(&value)) {
+        promise.set_value(*error);
+    } else {
+        promise.set_value(std::get<T>(value));
+    }
+}
+
+template <typename T>
+void invokeLegacyCallback(const std::function<void(Result, T)>& callback,
+                          const std::variant<Error, T>& value) {
+    if (const auto* error = std::get_if<Error>(&value)) {
+        callback(error->result, T());
+    } else {
+        callback(ResultOk, std::get<T>(value));
+    }
+}
+
+}  // namespace
+
 Client::Client(const std::shared_ptr<ClientImpl>& impl) : impl_(impl) { 
impl_->initialize(); }
 
 Client::Client(const std::string& serviceUrl) : Client(serviceUrl, 
ClientConfiguration()) {}
@@ -83,13 +103,8 @@ void Client::createProducerAsyncV2(const std::string& 
topic, const ProducerConfi
 std::variant<Error, Producer> Client::createProducerV2(const std::string& 
topic,
                                                        const 
ProducerConfiguration& conf) {
     std::promise<std::variant<Error, Producer>> promise;
-    createProducerAsyncV2(topic, conf, [&promise](const auto& v) mutable {
-        if (const auto* error = std::get_if<Error>(&v)) {
-            promise.set_value(*error);
-        } else {
-            promise.set_value(std::get<Producer>(v));
-        }
-    });
+    createProducerAsyncV2(topic, conf,
+                          [&promise](const auto& v) mutable { 
setPromiseValue<Producer>(promise, v); });
     return promise.get_future().get();
 }
 
@@ -113,8 +128,22 @@ void Client::subscribeAsync(const std::string& topic, 
const std::string& subscri
 
 void Client::subscribeAsync(const std::string& topic, const std::string& 
subscriptionName,
                             const ConsumerConfiguration& conf, const 
SubscribeCallback& callback) {
-    LOG_INFO("Subscribing on Topic :" << topic);
-    impl_->subscribeAsync(topic, subscriptionName, conf, callback);
+    subscribeAsyncV2(topic, subscriptionName, conf,
+                     [callback](const auto& value) { 
invokeLegacyCallback<Consumer>(callback, value); });
+}
+
+void Client::subscribeAsyncV2(const SubscribeTopics& topics, const 
std::string& subscriptionName,
+                              const ConsumerConfiguration& conf, 
SubscribeV2Callback callback) {
+    impl_->subscribeAsyncV2(topics, subscriptionName, conf, 
std::move(callback));
+}
+
+std::variant<Error, Consumer> Client::subscribeV2(const SubscribeTopics& 
topics,
+                                                  const std::string& 
subscriptionName,
+                                                  const ConsumerConfiguration& 
conf) {
+    std::promise<std::variant<Error, Consumer>> promise;
+    subscribeAsyncV2(topics, subscriptionName, conf,
+                     [&promise](const auto& v) mutable { 
setPromiseValue<Consumer>(promise, v); });
+    return promise.get_future().get();
 }
 
 Result Client::subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
@@ -138,7 +167,8 @@ void Client::subscribeAsync(const std::vector<std::string>& 
topics, const std::s
 
 void Client::subscribeAsync(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
                             const ConsumerConfiguration& conf, const 
SubscribeCallback& callback) {
-    impl_->subscribeAsync(topics, subscriptionName, conf, callback);
+    subscribeAsyncV2(topics, subscriptionName, conf,
+                     [callback](const auto& value) { 
invokeLegacyCallback<Consumer>(callback, value); });
 }
 
 Result Client::subscribeWithRegex(const std::string& regexPattern, const 
std::string& subscriptionName,
@@ -162,7 +192,8 @@ void Client::subscribeWithRegexAsync(const std::string& 
regexPattern, const std:
 
 void Client::subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& subscriptionName,
                                      const ConsumerConfiguration& conf, const 
SubscribeCallback& callback) {
-    impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, 
callback);
+    subscribeAsyncV2(TopicRegex{regexPattern}, subscriptionName, conf,
+                     [callback](const auto& value) { 
invokeLegacyCallback<Consumer>(callback, value); });
 }
 
 Result Client::createReader(const std::string& topic, const MessageId& 
startMessageId,
@@ -176,7 +207,21 @@ Result Client::createReader(const std::string& topic, 
const MessageId& startMess
 
 void Client::createReaderAsync(const std::string& topic, const MessageId& 
startMessageId,
                                const ReaderConfiguration& conf, const 
ReaderCallback& callback) {
-    impl_->createReaderAsync(topic, startMessageId, conf, callback);
+    createReaderAsyncV2(topic, startMessageId, conf,
+                        [callback](const auto& value) { 
invokeLegacyCallback<Reader>(callback, value); });
+}
+
+void Client::createReaderAsyncV2(const std::string& topic, const MessageId& 
startMessageId,
+                                 const ReaderConfiguration& conf, 
ReaderV2Callback callback) {
+    impl_->createReaderAsyncV2(topic, startMessageId, conf, 
std::move(callback));
+}
+
+std::variant<Error, Reader> Client::createReaderV2(const std::string& topic, 
const MessageId& startMessageId,
+                                                   const ReaderConfiguration& 
conf) {
+    std::promise<std::variant<Error, Reader>> promise;
+    createReaderAsyncV2(topic, startMessageId, conf,
+                        [&promise](const auto& v) mutable { 
setPromiseValue<Reader>(promise, v); });
+    return promise.get_future().get();
 }
 
 Result Client::createTableView(const std::string& topic, const 
TableViewConfiguration& conf,
@@ -190,7 +235,21 @@ Result Client::createTableView(const std::string& topic, 
const TableViewConfigur
 
 void Client::createTableViewAsync(const std::string& topic, const 
TableViewConfiguration& conf,
                                   const TableViewCallback& callback) {
-    impl_->createTableViewAsync(topic, conf, callback);
+    createTableViewAsyncV2(
+        topic, conf, [callback](const auto& value) { 
invokeLegacyCallback<TableView>(callback, value); });
+}
+
+void Client::createTableViewAsyncV2(const std::string& topic, const 
TableViewConfiguration& conf,
+                                    TableViewV2Callback callback) {
+    impl_->createTableViewAsyncV2(topic, conf, std::move(callback));
+}
+
+std::variant<Error, TableView> Client::createTableViewV2(const std::string& 
topic,
+                                                         const 
TableViewConfiguration& conf) {
+    std::promise<std::variant<Error, TableView>> promise;
+    createTableViewAsyncV2(topic, conf,
+                           [&promise](const auto& v) mutable { 
setPromiseValue<TableView>(promise, v); });
+    return promise.get_future().get();
 }
 
 Result Client::getPartitionsForTopic(const std::string& topic, 
std::vector<std::string>& partitions) {
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 1e4f022..dc79fce 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -62,6 +62,20 @@ DECLARE_LOG_OBJECT()
 
 namespace pulsar {
 
+namespace {
+
+template <typename T>
+void invokeLegacyCallback(const std::function<void(Result, T)>& callback,
+                          const std::variant<Error, T>& value) {
+    if (const auto* error = std::get_if<Error>(&value)) {
+        callback(error->result, T());
+    } else {
+        callback(ResultOk, std::get<T>(value));
+    }
+}
+
+}  // namespace
+
 static const char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7',
                                  '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
 static std::uniform_int_distribution<> hexDigitsDist(0, sizeof(hexDigits) - 1);
@@ -286,72 +300,93 @@ void ClientImpl::handleProducerCreated(const Error& 
error, const ProducerImplBas
 
 void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& 
startMessageId,
                                    const ReaderConfiguration& conf, const 
ReaderCallback& callback) {
+    createReaderAsyncV2(topic, startMessageId, conf,
+                        [callback](const auto& value) { 
invokeLegacyCallback<Reader>(callback, value); });
+}
+
+void ClientImpl::createReaderAsyncV2(const std::string& topic, const 
MessageId& startMessageId,
+                                     const ReaderConfiguration& conf, 
ReaderV2Callback callback) {
     TopicNamePtr topicName;
     {
         std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
-            callback(ResultAlreadyClosed, Reader());
+            callback(Error{ResultAlreadyClosed, ""});
             return;
         } else if (!(topicName = TopicName::get(topic))) {
             lock.unlock();
-            callback(ResultInvalidTopicName, Reader());
+            callback(Error{ResultInvalidTopicName, ""});
             return;
         }
     }
 
-    getPartitionMetadataAsync(topicName).addListener([this, 
self{shared_from_this()}, topicName,
-                                                      startMessageId, conf,
-                                                      callback](const auto& 
error, const auto& metadata) {
-        handleReaderMetadataLookup(error.result, metadata, topicName, 
startMessageId, conf, callback);
-    });
+    getPartitionMetadataAsync(topicName).addListener(
+        [this, self{shared_from_this()}, topicName, startMessageId, conf, 
callback{std::move(callback)}](
+            const auto& error, const auto& metadata) {
+            handleReaderMetadataLookup(error, metadata, topicName, 
startMessageId, conf, callback);
+        });
 }
 
 void ClientImpl::createTableViewAsync(const std::string& topic, const 
TableViewConfiguration& conf,
                                       const TableViewCallback& callback) {
+    createTableViewAsyncV2(
+        topic, conf, [callback](const auto& value) { 
invokeLegacyCallback<TableView>(callback, value); });
+}
+
+void ClientImpl::createTableViewAsyncV2(const std::string& topic, const 
TableViewConfiguration& conf,
+                                        TableViewV2Callback callback) {
     TopicNamePtr topicName;
     {
         std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
-            callback(ResultAlreadyClosed, TableView());
+            callback(Error{ResultAlreadyClosed, ""});
             return;
         } else if (!(topicName = TopicName::get(topic))) {
             lock.unlock();
-            callback(ResultInvalidTopicName, TableView());
+            callback(Error{ResultInvalidTopicName, ""});
             return;
         }
     }
 
     TableViewImplPtr tableViewPtr =
         std::make_shared<TableViewImpl>(shared_from_this(), 
topicName->toString(), conf);
-    tableViewPtr->start().addListener([callback](Result result, const 
TableViewImplPtr& tableViewImplPtr) {
-        if (result == ResultOk) {
-            callback(result, TableView{tableViewImplPtr});
-        } else {
-            callback(result, {});
-        }
-    });
+    tableViewPtr->start().addListener(
+        [callback{std::move(callback)}](const Error& error, const 
TableViewImplPtr& tableViewImplPtr) {
+            if (error.result == ResultOk) {
+                callback(TableView{tableViewImplPtr});
+            } else {
+                callback(error);
+            }
+        });
 }
 
-void ClientImpl::handleReaderMetadataLookup(Result result, const 
LookupDataResultPtr& partitionMetadata,
+void ClientImpl::handleReaderMetadataLookup(const Error& error, const 
LookupDataResultPtr& partitionMetadata,
                                             const TopicNamePtr& topicName, 
const MessageId& startMessageId,
-                                            const ReaderConfiguration& conf, 
const ReaderCallback& callback) {
-    if (result != ResultOk) {
+                                            const ReaderConfiguration& conf,
+                                            const ReaderV2Callback& callback) {
+    if (error.result != ResultOk) {
         LOG_ERROR("Error Checking/Getting Partition Metadata while creating 
readeron "
-                  << topicName->toString() << " -- " << result);
-        callback(result, Reader());
+                  << topicName->toString() << " -- " << error.result);
+        callback(error);
         return;
     }
 
     ReaderImplPtr reader;
     try {
+        ReaderCallback readerCreatedCallback = [callback](Result result, const 
Reader& reader) {
+            if (result == ResultOk) {
+                callback(reader);
+            } else {
+                callback(Error{result, ""});
+            }
+        };
         reader.reset(new ReaderImpl(shared_from_this(), topicName->toString(),
                                     partitionMetadata->getPartitions(), conf,
-                                    getListenerExecutorProvider()->get(), 
callback));
+                                    getListenerExecutorProvider()->get(), 
readerCreatedCallback));
     } catch (const std::runtime_error& e) {
         LOG_ERROR("Failed to create reader: " << e.what());
-        callback(ResultConnectError, {});
+        callback(Error{ResultConnectError, e.what()});
         return;
     }
     ConsumerImplBasePtr consumer = reader->getConsumer();
@@ -375,18 +410,34 @@ void ClientImpl::handleReaderMetadataLookup(Result 
result, const LookupDataResul
 void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, 
const std::string& subscriptionName,
                                          const ConsumerConfiguration& conf,
                                          const SubscribeCallback& callback) {
+    subscribeAsyncV2(TopicRegex{regexPattern}, subscriptionName, conf,
+                     [callback](const auto& value) { 
invokeLegacyCallback<Consumer>(callback, value); });
+}
+
+void ClientImpl::subscribeAsyncV2(const SubscribeTopics& topics, const 
std::string& subscriptionName,
+                                  const ConsumerConfiguration& conf, 
SubscribeV2Callback callback) {
+    std::visit(
+        [this, &subscriptionName, &conf, callback{std::move(callback)}](const 
auto& topicSpec) mutable {
+            subscribeToTopicsAsyncV2(topicSpec, subscriptionName, conf, 
std::move(callback));
+        },
+        topics);
+}
+
+void ClientImpl::subscribeToTopicsAsyncV2(const TopicRegex& topicRegex, const 
std::string& subscriptionName,
+                                          const ConsumerConfiguration& conf, 
SubscribeV2Callback callback) {
+    const auto regexPattern = topicRegex.pattern;
     TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
 
     std::shared_lock lock(mutex_);
     if (state_ != Open) {
         lock.unlock();
-        callback(ResultAlreadyClosed, Consumer());
+        callback(Error{ResultAlreadyClosed, ""});
         return;
     } else {
         lock.unlock();
         if (!topicNamePtr) {
             LOG_ERROR("Topic pattern not valid: " << regexPattern);
-            callback(ResultInvalidTopicName, Consumer());
+            callback(Error{ResultInvalidTopicName, ""});
             return;
         }
     }
@@ -411,14 +462,16 @@ void ClientImpl::subscribeWithRegexAsync(const 
std::string& regexPattern, const
             break;
         default:
             LOG_ERROR("RegexSubscriptionMode not valid: " << 
regexSubscriptionMode);
-            callback(ResultInvalidConfiguration, Consumer());
+            callback(Error{ResultInvalidConfiguration, ""});
             return;
     }
 
     getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
-        .addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, 
shared_from_this(),
-                               std::placeholders::_1, std::placeholders::_2, 
regexPattern, mode,
-                               subscriptionName, conf, callback));
+        .addListener([self{shared_from_this()}, regexPattern, mode, 
subscriptionName, conf,
+                      callback{std::move(callback)}](Result result, const 
NamespaceTopicsPtr& topics) {
+            self->createPatternMultiTopicsConsumer(result, topics, 
regexPattern, mode, subscriptionName, conf,
+                                                   callback);
+        });
 }
 
 void ClientImpl::createPatternMultiTopicsConsumer(Result result, const 
NamespaceTopicsPtr& topics,
@@ -426,7 +479,7 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result 
result, const Namespace
                                                   
CommandGetTopicsOfNamespace_Mode mode,
                                                   const std::string& 
subscriptionName,
                                                   const ConsumerConfiguration& 
conf,
-                                                  const SubscribeCallback& 
callback) {
+                                                  SubscribeV2Callback 
callback) {
     if (result == ResultOk) {
         ConsumerImplBasePtr consumer;
 
@@ -441,18 +494,27 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result 
result, const Namespace
             shared_from_this(), regexPattern, mode, *matchTopics, 
subscriptionName, conf, interceptors);
 
         consumer->getConsumerCreatedFuture().addListener(
-            std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), 
std::placeholders::_1,
-                      std::placeholders::_2, callback, consumer));
+            [self{shared_from_this()}, callback{std::move(callback)}, 
consumer](
+                Result result, const ConsumerImplBaseWeakPtr& 
consumerImplBaseWeakPtr) {
+                self->handleConsumerCreated(result, consumerImplBaseWeakPtr, 
callback, consumer);
+            });
         consumer->start();
     } else {
         LOG_ERROR("Error Getting topicsOfNameSpace while 
createPatternMultiTopicsConsumer:  " << result);
-        callback(result, Consumer());
+        callback(Error{result, ""});
     }
 }
 
 void ClientImpl::subscribeAsync(const std::vector<std::string>& originalTopics,
                                 const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
                                 const SubscribeCallback& callback) {
+    subscribeAsyncV2(originalTopics, subscriptionName, conf,
+                     [callback](const auto& value) { 
invokeLegacyCallback<Consumer>(callback, value); });
+}
+
+void ClientImpl::subscribeToTopicsAsyncV2(const std::vector<std::string>& 
originalTopics,
+                                          const std::string& subscriptionName,
+                                          const ConsumerConfiguration& conf, 
SubscribeV2Callback callback) {
     TopicNamePtr topicNamePtr;
 
     // Remove duplicates from the list of topics
@@ -464,12 +526,12 @@ void ClientImpl::subscribeAsync(const 
std::vector<std::string>& originalTopics,
     std::shared_lock lock(mutex_);
     if (state_ != Open) {
         lock.unlock();
-        callback(ResultAlreadyClosed, Consumer());
+        callback(Error{ResultAlreadyClosed, ""});
         return;
     } else {
         if (!topics.empty() && !(topicNamePtr = 
MultiTopicsConsumerImpl::topicNamesValid(topics))) {
             lock.unlock();
-            callback(ResultInvalidTopicName, Consumer());
+            callback(Error{ResultInvalidTopicName, ""});
             return;
         }
     }
@@ -487,45 +549,54 @@ void ClientImpl::subscribeAsync(const 
std::vector<std::string>& originalTopics,
     ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
         shared_from_this(), topics, subscriptionName, topicNamePtr, conf, 
interceptors);
 
-    
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
-                                                               
shared_from_this(), std::placeholders::_1,
-                                                               
std::placeholders::_2, callback, consumer));
+    consumer->getConsumerCreatedFuture().addListener(
+        [self{shared_from_this()}, callback{std::move(callback)}, consumer](
+            Result result, const ConsumerImplBaseWeakPtr& 
consumerImplBaseWeakPtr) {
+            self->handleConsumerCreated(result, consumerImplBaseWeakPtr, 
callback, consumer);
+        });
     consumer->start();
 }
 
 void ClientImpl::subscribeAsync(const std::string& topic, const std::string& 
subscriptionName,
                                 const ConsumerConfiguration& conf, const 
SubscribeCallback& callback) {
+    subscribeAsyncV2(topic, subscriptionName, conf,
+                     [callback](const auto& value) { 
invokeLegacyCallback<Consumer>(callback, value); });
+}
+
+void ClientImpl::subscribeToTopicsAsyncV2(const std::string& topic, const 
std::string& subscriptionName,
+                                          const ConsumerConfiguration& conf, 
SubscribeV2Callback callback) {
+    LOG_INFO("Subscribing on Topic :" << topic);
     TopicNamePtr topicName;
     {
         std::shared_lock lock(mutex_);
         if (state_ != Open) {
             lock.unlock();
-            callback(ResultAlreadyClosed, Consumer());
+            callback(Error{ResultAlreadyClosed, ""});
             return;
         } else if (!(topicName = TopicName::get(topic))) {
             lock.unlock();
-            callback(ResultInvalidTopicName, Consumer());
+            callback(Error{ResultInvalidTopicName, ""});
             return;
         } else if (conf.isReadCompacted() && 
(topicName->getDomain().compare("persistent") != 0 ||
                                               (conf.getConsumerType() != 
ConsumerExclusive &&
                                                conf.getConsumerType() != 
ConsumerFailover))) {
             lock.unlock();
-            callback(ResultInvalidConfiguration, Consumer());
+            callback(Error{ResultInvalidConfiguration, ""});
             return;
         }
     }
 
-    getPartitionMetadataAsync(topicName).addListener([this, 
self{shared_from_this()}, topicName,
-                                                      subscriptionName, conf,
-                                                      callback](const auto& 
error, const auto& metadata) {
-        handleSubscribe(error.result, metadata, topicName, subscriptionName, 
conf, callback);
-    });
+    getPartitionMetadataAsync(topicName).addListener(
+        [this, self{shared_from_this()}, topicName, subscriptionName, conf, 
callback{std::move(callback)}](
+            const auto& error, const auto& metadata) {
+            handleSubscribe(error, metadata, topicName, subscriptionName, 
conf, callback);
+        });
 }
 
-void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& 
partitionMetadata,
+void ClientImpl::handleSubscribe(const Error& error, const 
LookupDataResultPtr& partitionMetadata,
                                  const TopicNamePtr& topicName, const 
std::string& subscriptionName,
-                                 ConsumerConfiguration conf, const 
SubscribeCallback& callback) {
-    if (result == ResultOk) {
+                                 ConsumerConfiguration conf, 
SubscribeV2Callback callback) {
+    if (error.result == ResultOk) {
         // generate random name if not supplied by the customer.
         if (conf.getConsumerName().empty()) {
             conf.setConsumerName(generateRandomName());
@@ -537,7 +608,7 @@ void ClientImpl::handleSubscribe(Result result, const 
LookupDataResultPtr& parti
             if (partitionMetadata->getPartitions() > 0) {
                 if (conf.getReceiverQueueSize() == 0) {
                     LOG_ERROR("Can't use partitioned topic if the queue size 
is 0.");
-                    callback(ResultInvalidConfiguration, Consumer());
+                    callback(Error{ResultInvalidConfiguration, ""});
                     return;
                 }
                 consumer = 
std::make_shared<MultiTopicsConsumerImpl>(shared_from_this(), topicName,
@@ -552,22 +623,24 @@ void ClientImpl::handleSubscribe(Result result, const 
LookupDataResultPtr& parti
             }
         } catch (const std::runtime_error& e) {
             LOG_ERROR("Failed to create consumer: " << e.what());
-            callback(ResultConnectError, {});
+            callback(Error{ResultConnectError, e.what()});
             return;
         }
         consumer->getConsumerCreatedFuture().addListener(
-            std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), 
std::placeholders::_1,
-                      std::placeholders::_2, callback, consumer));
+            [self{shared_from_this()}, callback{std::move(callback)}, 
consumer](
+                Result result, const ConsumerImplBaseWeakPtr& 
consumerImplBaseWeakPtr) {
+                self->handleConsumerCreated(result, consumerImplBaseWeakPtr, 
callback, consumer);
+            });
         consumer->start();
     } else {
-        LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing 
on " << topicName->toString()
-                                                                               
     << " -- " << result);
-        callback(result, Consumer());
+        LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing 
on "
+                  << topicName->toString() << " -- " << error.result);
+        callback(error);
     }
 }
 
 void ClientImpl::handleConsumerCreated(Result result, const 
ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr,
-                                       const SubscribeCallback& callback,
+                                       const SubscribeV2Callback& callback,
                                        const ConsumerImplBasePtr& consumer) {
     if (result == ResultOk) {
         auto address = consumer.get();
@@ -576,18 +649,18 @@ void ClientImpl::handleConsumerCreated(Result result, 
const ConsumerImplBaseWeak
             auto consumer = existingConsumer.value().lock();
             LOG_ERROR("Unexpected existing consumer at the same address: "
                       << address << ", consumer: " << (consumer ? 
consumer->getName() : "(null)"));
-            callback(ResultUnknownError, {});
+            callback(Error{ResultUnknownError, ""});
             return;
         }
-        callback(result, Consumer(consumer));
+        callback(Consumer(consumer));
     } else {
         // In order to be compatible with the current broker error code 
confusion.
         // 
https://github.com/apache/pulsar/blob/cd2aa550d0fe4e72b5ff88c4f6c1c2795b3ff2cd/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java#L240-L241
         if (result == ResultProducerBusy) {
             LOG_ERROR("Failed to create consumer: SubscriptionName cannot be 
empty.");
-            callback(ResultInvalidConfiguration, {});
+            callback(Error{ResultInvalidConfiguration, "SubscriptionName 
cannot be empty."});
         } else {
-            callback(result, {});
+            callback(Error{result, ""});
         }
     }
 }
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index c046d8e..cdabf1f 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -98,6 +98,9 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     void subscribeAsync(const std::string& topic, const std::string& 
subscriptionName,
                         const ConsumerConfiguration& conf, const 
SubscribeCallback& callback);
 
+    void subscribeAsyncV2(const SubscribeTopics& topics, const std::string& 
subscriptionName,
+                          const ConsumerConfiguration& conf, 
SubscribeV2Callback callback);
+
     void subscribeAsync(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
                         const ConsumerConfiguration& conf, const 
SubscribeCallback& callback);
 
@@ -107,9 +110,15 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     void createReaderAsync(const std::string& topic, const MessageId& 
startMessageId,
                            const ReaderConfiguration& conf, const 
ReaderCallback& callback);
 
+    void createReaderAsyncV2(const std::string& topic, const MessageId& 
startMessageId,
+                             const ReaderConfiguration& conf, ReaderV2Callback 
callback);
+
     void createTableViewAsync(const std::string& topic, const 
TableViewConfiguration& conf,
                               const TableViewCallback& callback);
 
+    void createTableViewAsyncV2(const std::string& topic, const 
TableViewConfiguration& conf,
+                                TableViewV2Callback callback);
+
     void getPartitionsForTopicAsync(const std::string& topic, const 
GetPartitionsCallback& callback);
 
     // Use virtual method to test
@@ -175,13 +184,22 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
                               const TopicNamePtr& topicName, const 
ProducerConfiguration& conf,
                               CreateProducerV2Callback callback);
 
-    void handleSubscribe(Result result, const LookupDataResultPtr& 
partitionMetadata,
+    void handleSubscribe(const Error& error, const LookupDataResultPtr& 
partitionMetadata,
                          const TopicNamePtr& topicName, const std::string& 
consumerName,
-                         ConsumerConfiguration conf, const SubscribeCallback& 
callback);
+                         ConsumerConfiguration conf, SubscribeV2Callback 
callback);
+
+    void subscribeToTopicsAsyncV2(const std::string& topic, const std::string& 
subscriptionName,
+                                  const ConsumerConfiguration& conf, 
SubscribeV2Callback callback);
+
+    void subscribeToTopicsAsyncV2(const std::vector<std::string>& topics, 
const std::string& subscriptionName,
+                                  const ConsumerConfiguration& conf, 
SubscribeV2Callback callback);
+
+    void subscribeToTopicsAsyncV2(const TopicRegex& topicRegex, const 
std::string& subscriptionName,
+                                  const ConsumerConfiguration& conf, 
SubscribeV2Callback callback);
 
-    void handleReaderMetadataLookup(Result result, const LookupDataResultPtr& 
partitionMetadata,
+    void handleReaderMetadataLookup(const Error& error, const 
LookupDataResultPtr& partitionMetadata,
                                     const TopicNamePtr& topicName, const 
MessageId& startMessageId,
-                                    const ReaderConfiguration& conf, const 
ReaderCallback& callback);
+                                    const ReaderConfiguration& conf, const 
ReaderV2Callback& callback);
 
     void handleGetPartitions(Result result, const LookupDataResultPtr& 
partitionMetadata,
                              const TopicNamePtr& topicName, const 
GetPartitionsCallback& callback);
@@ -189,7 +207,7 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
     void handleProducerCreated(const Error& error, const 
ProducerImplBaseWeakPtr& producerWeakPtr,
                                const CreateProducerV2Callback& callback, const 
ProducerImplBasePtr& producer);
     void handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& 
consumerWeakPtr,
-                               const SubscribeCallback& callback, const 
ConsumerImplBasePtr& consumer);
+                               const SubscribeV2Callback& callback, const 
ConsumerImplBasePtr& consumer);
 
     typedef std::shared_ptr<int> SharedInt;
 
@@ -199,7 +217,7 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
                                           const std::string& regexPattern,
                                           CommandGetTopicsOfNamespace_Mode 
mode,
                                           const std::string& consumerName, 
const ConsumerConfiguration& conf,
-                                          const SubscribeCallback& callback);
+                                          SubscribeV2Callback callback);
 
     const std::string& getPhysicalAddress(const std::string& 
redirectedClusterURI,
                                           const std::string& logicalAddress);
diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc
index f634fa5..686cd55 100644
--- a/lib/TableViewImpl.cc
+++ b/lib/TableViewImpl.cc
@@ -32,23 +32,23 @@ TableViewImpl::TableViewImpl(const ClientImplPtr& client, 
const std::string& top
                              const TableViewConfiguration& conf)
     : client_(client), topic_(topic), conf_(conf) {}
 
-Future<Result, TableViewImplPtr> TableViewImpl::start() {
-    Promise<Result, TableViewImplPtr> promise;
+Future<Error, TableViewImplPtr> TableViewImpl::start() {
+    Promise<Error, TableViewImplPtr> promise;
     ReaderConfiguration readerConfiguration;
     readerConfiguration.setSchema(conf_.schemaInfo);
     readerConfiguration.setReadCompacted(true);
     readerConfiguration.setInternalSubscriptionName(conf_.subscriptionName);
 
     TableViewImplPtr self = shared_from_this();
-    ReaderCallback readerCallback = [self, promise](Result res, const Reader& 
reader) {
-        if (res == ResultOk) {
-            self->reader_ = reader.impl_;
+    ReaderV2Callback readerCallback = [self, promise](const 
std::variant<Error, Reader>& result) {
+        if (const auto* reader = std::get_if<Reader>(&result)) {
+            self->reader_ = reader->impl_;
             self->readAllExistingMessages(promise, 
TimeUtils::currentTimeMillis(), 0);
         } else {
-            promise.setFailed(res);
+            promise.setFailed(std::get<Error>(result));
         }
     };
-    client_->createReaderAsync(topic_, MessageId::earliest(), 
readerConfiguration, readerCallback);
+    client_->createReaderAsyncV2(topic_, MessageId::earliest(), 
readerConfiguration, readerCallback);
     return promise.getFuture();
 }
 
@@ -118,14 +118,14 @@ void TableViewImpl::handleMessage(const Message& msg) {
     }
 }
 
-void TableViewImpl::readAllExistingMessages(const Promise<Result, 
TableViewImplPtr>& promise, long startTime,
+void TableViewImpl::readAllExistingMessages(const Promise<Error, 
TableViewImplPtr>& promise, long startTime,
                                             long messagesRead) {
     auto weakSelf = weak_from_this();
     reader_->hasMessageAvailableAsync(
         [weakSelf, promise, startTime, messagesRead](Result result, bool 
hasMessage) {
             auto self = weakSelf.lock();
             if (!self || result != ResultOk) {
-                promise.setFailed(result);
+                promise.setFailed(Error{result, ""});
                 return;
             }
             if (hasMessage) {
@@ -135,7 +135,7 @@ void TableViewImpl::readAllExistingMessages(const 
Promise<Result, TableViewImplP
                     [weakSelf, promise, startTime, messagesRead, topic](Result 
res, const Message& msg) {
                         auto self = weakSelf.lock();
                         if (!self || res != ResultOk) {
-                            promise.setFailed(res);
+                            promise.setFailed(Error{res, ""});
                             LOG_ERROR("Start table view failed, reader msg for 
"
                                       << topic << " error: " << 
strResult(res));
                         } else {
diff --git a/lib/TableViewImpl.h b/lib/TableViewImpl.h
index eda5247..a968fa8 100644
--- a/lib/TableViewImpl.h
+++ b/lib/TableViewImpl.h
@@ -45,7 +45,7 @@ class TableViewImpl : public 
std::enable_shared_from_this<TableViewImpl> {
 
     ~TableViewImpl(){};
 
-    Future<Result, TableViewImplPtr> start();
+    Future<Error, TableViewImplPtr> start();
 
     bool retrieveValue(const std::string& key, std::string& value);
 
@@ -77,7 +77,7 @@ class TableViewImpl : public 
std::enable_shared_from_this<TableViewImpl> {
     SynchronizedHashMap<std::string, std::string> data_;
 
     void handleMessage(const Message& msg);
-    void readAllExistingMessages(const Promise<Result, TableViewImplPtr>& 
promise, long startTime,
+    void readAllExistingMessages(const Promise<Error, TableViewImplPtr>& 
promise, long startTime,
                                  long messagesRead);
     void readTailMessage();
 };
diff --git a/tests/AuthTokenTest.cc b/tests/AuthTokenTest.cc
index d38e62c..e2c4c16 100644
--- a/tests/AuthTokenTest.cc
+++ b/tests/AuthTokenTest.cc
@@ -54,6 +54,15 @@ std::string getToken() {
     return str;
 }
 
+template <typename T>
+void assertV2Error(const std::variant<Error, T>& result, Result expectedResult,
+                   const std::string& expectedMessage) {
+    const auto* error = std::get_if<Error>(&result);
+    ASSERT_NE(nullptr, error);
+    EXPECT_EQ(expectedResult, error->result);
+    EXPECT_EQ(expectedMessage, error->message);
+}
+
 TEST(AuthPluginToken, testToken) {
     ClientConfiguration config = ClientConfiguration();
     std::string token = getToken();
@@ -181,17 +190,17 @@ TEST(AuthPluginToken, testNoAuth) {
 
     std::string topicName = "persistent://private/auth/test-token";
     std::string subName = "subscription-name";
+    const std::string expectedErrorMessage = "Client is not authorized to Get 
Partition Metadata";
 
     Producer producer;
     Result result = client.createProducer(topicName, producer);
     ASSERT_EQ(ResultAuthorizationError, result);
 
-    std::visit(overloaded{[](Error&& error) {
-                              ASSERT_EQ(ResultAuthorizationError, 
error.result);
-                              ASSERT_EQ("Client is not authorized to Get 
Partition Metadata", error.message);
-                          },
-                          [](auto&&) { FAIL(); }},
-               client.createProducerV2(topicName, {}));
+    assertV2Error(client.createProducerV2(topicName, {}), 
ResultAuthorizationError, expectedErrorMessage);
+    assertV2Error(client.subscribeV2(topicName, subName, {}), 
ResultAuthorizationError, expectedErrorMessage);
+    assertV2Error(client.createReaderV2(topicName, MessageId::earliest(), {}), 
ResultAuthorizationError,
+                  expectedErrorMessage);
+    assertV2Error(client.createTableViewV2(topicName, {}), 
ResultAuthorizationError, expectedErrorMessage);
 
     Consumer consumer;
     result = client.subscribe(topicName, subName, consumer);


Reply via email to